Talk is cheap,show me the code

开源,协作,共享,进步

0%

Operation FAQ

1 RocketMQ’s mqadmin command error.

Problem: Sometimes after deploying the RocketMQ cluster, when you try to execute some commands of “mqadmin”, the following exception will appear:

1
org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <null> failed

Solution: Execute export NAMESRV_ADDR=ip:9876 (ip refers to the address of NameServer deployed in the cluster) on the VM that deploys the RocketMQ cluster.Then you will execute commands of “mqadmin” successfully.

2 The inconsistent version of RocketMQ between the producer and consumer leads to the problem that message can’t be consumed normally.

Problem: The same producer sends a message, consumer A can consume, but consumer B can’t consume, and the RocketMQ Console appears:

1
Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message

Solution: The jar package of RocketMQ, such as rocketmq-client, should be the same version on the consumer and producer.

3 When adding a new topic consumer group, historical messages can’t be consumed.

Problem: When a new consumer group of the same topic is started, the consumed message is the current offset message, and the historical message is not obtained.

Solution: The default policy of rocketmq is to start from the end of the message queue and skip the historical message. If you want to consume historical message, you need to set:

1
org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#setConsumeFromWhere

There are three common configurations:

  • By default, a new subscription group starts to consume from the end of the queue for the first time, and then restarts and continue to consume from the last consume position, that is, to skip the historical message.
1
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  • A new subscription group starts to consume from the head of the queue for the first time, and then restarts and continue to consume from the last consume position, that is, to consume the historical message that is not expired on Broker.
1
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  • A new subscription group starts to consume from the specified time point for the first time, and then restarts and continue to consume from the last consume position. It is used together with consumer.setConsumeTimestamp(). The default is half an hour ago.
1
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);

4 How to enable reading data from Slave

In some cases, the Consumer needs to reset the consume position to 1-2 days ago. At this time, on the Master Broker with limited memory, the CommitLog will carry a relatively heavy IO pressure, affecting the reading and writing of other messages on that Broker. You can enable slaveReadEnable=true. When Master Broker finds that the difference between the Consumer’s consume position and the latest value of CommitLog exceeds the percentage of machine’s memory (accessMessageInMemoryMaxRatio=40%), it will recommend Consumer to read from Slave Broker and relieve Master Broker’s IO.

5 Performance

Asynchronous flush disk is recommended to use spin lock.

Synchronous flush disk is recommended to use reentrant lock. Adjust the Broker configuration item useReentrantLockWhenPutMessage, and the default value is false.

Asynchronous flush disk is recommended to open TransientStorePoolEnable and close transferMsgByHeap to improve the efficiency of pulling message;

Synchronous flush disk is recommended to increase the sendMessageThreadPoolNums appropriately. The specific configuration needs to be tested.

6 The meaning and difference between msgId and offsetMsgId in RocketMQ

After sending message with RocketMQ, you will usually see the following log:

1
SendResult [sendStatus=SEND_OK, msgId=0A42333A0DC818B4AAC246C290FD0000, offsetMsgId=0A42333A00002A9F000000000134F1F5, messageQueue=MessageQueue [topic=topicTest1, BrokerName=mac.local, queueId=3], queueOffset=4]
  • msgId,for the client, the msgId is generated by the producer instance. Specifically, the method MessageClientIDSetter.createUniqIDBuffer() is called to generate a unique Id.
  • offsetMsgId, offsetMsgId is generated by the Broker server when writing a message ( string concating “IP address + port” and “CommitLog’s physical offset address”), and offsetMsgId is the messageId used to query in the RocketMQ console.

Best practices

1 Producer

2 Consumer

3 Broker

3.1 Broker Role

3.2 FlushDiskType

3.3 Broker Configuration

Parameter name Default Description
listenPort 10911 listen port for client
namesrvAddr null name server address
brokerIP1 InetAddress for network interface Should be configured if having multiple addresses
brokerIP2 InetAddress for network interface If configured for the Master broker in the Master/Slave cluster, slave broker will connect to this port for data synchronization
brokerName null broker name
brokerClusterName DefaultCluster this broker belongs to which cluster
brokerId 0 broker id, 0 means master, positive integers mean slave
storePathCommitLog $HOME/store/commitlog/ file path for commit log
storePathConsumerQueue $HOME/store/consumequeue/ file path for consume queue
mappedFileSizeCommitLog 1024 * 1024 * 1024(1G) mapped file size for commit log
deleteWhen 04 When to delete the commitlog which is out of the reserve time
fileReserverdTime 72 The number of hours to keep a commitlog before deleting it
brokerRole ASYNC_MASTER SYNC_MASTER/ASYNC_MASTER/SLAVE
flushDiskType ASYNC_FLUSH {SYNC_FLUSH/ASYNC_FLUSH}. Broker of SYNC_FLUSH mode flushes each message onto disk before acknowledging producer. Broker of ASYNC_FLUSH mode, on the other hand, takes advantage of group-committing, achieving better performance.

Architecture design

Technology Architecture

The RocketMQ architecture is divided into four parts, as shown in the figure above:

  • Producer:The role of message publishing supports distributed cluster mode deployment. Producer selects the corresponding Broker cluster queue for message delivery through MQ’s load balancing module. The delivery process supports fast failure and low latency.

  • Consumer:The role of message consumption supports distributed cluster deployment. Support push, pull two modes to consume messages. It also supports cluster mode and broadcast mode consumption, and it provides a real-time message subscription mechanism to meet the needs of most users.

  • NameServer:NameServer is a very simple Topic routing registry with a role similar to ZooKeeper in Dubbo, which supports dynamic registration and discovery of Broker. It mainly includes two functions: Broker management, NameServer accepts the registration information of the Broker cluster and saves it as the basic data of the routing information. Then provide a heartbeat detection mechanism to check whether the broker is still alive; routing information management, each NameServer will save the entire routing information about the Broker cluster and the queue information for the client query. Then the Producer and Consumer can know the routing information of the entire Broker cluster through the NameServer, so as to deliver and consume the message. The NameServer is usually deployed in a cluster mode, and each instance does not communicate with each other. Broker registers its own routing information with each NameServer, so each NameServer instance stores a complete routing information. When a NameServer is offline for some reason, the Broker can still synchronize its routing information with other NameServers. The Producer and Consumer can still dynamically sense the information of the Broker’s routing.

  • BrokerServer:Broker is responsible for the storage, delivery and query of messages and high availability guarantees. In order to achieve these functions, Broker includes the following important sub-modules.

  1. Remoting Module:The entire broker entity handles requests from the clients side.
  2. Client Manager:Topic subscription information for managing the client (Producer/Consumer) and maintaining the Consumer
  3. Store Service:Provides a convenient and simple API interface for handling message storage to physical hard disks and query functions.
  4. HA Service:Highly available service that provides data synchronization between Master Broker and Slave Broker.
  5. Index Service:The message delivered to the Broker is indexed according to a specific Message key to provide a quick query of the message.

Deployment architecture

RocketMQ Network deployment features

  • NameServer is an almost stateless node that can be deployed in a cluster without any information synchronization between nodes.

  • The broker deployment is relatively complex. The Broker is divided into the Master and the Slave. One Master can correspond to multiple Slaves. However, one Slave can only correspond to one Master. The correspondence between the Master and the Slave is defined by specifying the same BrokerName and different BrokerId. The BrokerId is 0. Indicates Master, non-zero means Slave. The Master can also deploy multiple. Each broker establishes a long connection with all nodes in the NameServer cluster, and periodically registers Topic information to all NameServers. Note: The current RocketMQ version supports a Master Multi Slave on the deployment architecture, but only the slave server with BrokerId=1 will participate in the read load of the message.

  • The Producer establishes a long connection with one of the nodes in the NameServer cluster (randomly selected), periodically obtains Topic routing information from the NameServer, and establishes a long connection to the Master that provides the Topic service, and periodically sends a heartbeat to the Master. Producer is completely stateless and can be deployed in a cluster.

  • The Consumer establishes a long connection with one of the nodes in the NameServer cluster (randomly selected), periodically obtains Topic routing information from the NameServer, and establishes a long connection to the Master and Slave that provides the Topic service, and periodically sends heartbeats to the Master and Slave. The Consumer can subscribe to the message from the Master or subscribe to the message from the Slave. When the consumer pulls the message to the Master, the Master server will generate a read according to the distance between the offset and the maximum offset. I/O), and whether the server is readable or not, the next time it is recommended to pull from the Master or Slave.

Describe the cluster workflow in conjunction with the deployment architecture diagram:

  • Start the NameServer, listen to the port after the NameServer, and wait for the Broker, Producer, and Consumer to connect, which is equivalent to a routing control center.
  • The Broker starts, keeps a long connection with all NameServers, and sends heartbeat packets periodically. The heartbeat packet contains the current broker information (IP+ port, etc.) and stores all Topic information. After the registration is successful, there is a mapping relationship between Topic and Broker in the NameServer cluster.
  • Before sending and receiving a message, create a Topic. When creating a Topic, you need to specify which Brokers the Topic should be stored on, or you can automatically create a Topic when sending a message.
  • Producer sends a message. When starting, it first establishes a long connection with one of the NameServer clusters, and obtains from the NameServer which Brokers are currently sent by the Topic. Polling selects a queue from the queue list and then establishes with the broker where the queue is located. Long connection to send a message to the broker.
  • The Consumer is similar to the Producer. It establishes a long connection with one of the NameServers, obtains which Brokers the current Topic exists on, and then directly establishes a connection channel with the Broker to start consuming messages.

权限控制


1.权限控制特性介绍

权限控制(ACL)主要为RocketMQ提供Topic资源级别的用户访问控制。用户在使用RocketMQ权限控制时,可以在Client客户端通过 RPCHook注入AccessKey和SecretKey签名;同时,将对应的权限控制属性(包括Topic访问权限、IP白名单和AccessKey和SecretKey签名等)设置在distribution/conf/plain_acl.yml的配置文件中。Broker端对AccessKey所拥有的权限进行校验,校验不过,抛出异常;
ACL客户端可以参考:org.apache.rocketmq.example.simple包下面的AclClient代码。

2. 权限控制的定义与属性值

2.1权限定义

对RocketMQ的Topic资源访问权限控制定义主要如下表所示,分为以下四种

权限 含义
DENY 拒绝
ANY PUB 或者 SUB 权限
PUB 发送权限
SUB 订阅权限

2.2 权限定义的关键属性

字段 取值 含义
globalWhiteRemoteAddresses *;192.168.*.*;192.168.0.1 全局IP白名单
accessKey 字符串 Access Key
secretKey 字符串 Secret Key
whiteRemoteAddress *;192.168.*.*;192.168.0.1 用户IP白名单
admin true;false 是否管理员账户
defaultTopicPerm DENY;PUB;SUB;PUB|SUB 默认的Topic权限
defaultGroupPerm DENY;PUB;SUB;PUB|SUB 默认的ConsumerGroup权限
topicPerms topic=权限 各个Topic的权限
groupPerms group=权限 各个ConsumerGroup的权限

具体可以参考distribution/conf/plain_acl.yml配置文件

3. 支持权限控制的集群部署

distribution/conf/plain_acl.yml配置文件中按照上述说明定义好权限属性后,打开aclEnable开关变量即可开启RocketMQ集群的ACL特性。这里贴出Broker端开启ACL特性的properties配置文件内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
storePathRootDir=/data/rocketmq/rootdir-a-m
storePathCommitLog=/data/rocketmq/commitlog-a-m
autoCreateSubscriptionGroup=true
## if acl is open,the flag will be true
aclEnable=true
listenPort=10911
brokerIP1=XX.XX.XX.XX1
namesrvAddr=XX.XX.XX.XX:9876

4. 权限控制主要流程

ACL主要流程分为两部分,主要包括权限解析和权限校验。

4.1 权限解析

Broker端对客户端的RequestCommand请求进行解析,拿到需要鉴权的属性字段。
主要包括:
(1)AccessKey:类似于用户名,代指用户主体,权限数据与之对应;
(2)Signature:客户根据 SecretKey 签名得到的串,服务端再用SecretKey进行签名验证;

4.2 权限校验

Broker端对权限的校验逻辑主要分为以下几步:
(1)检查是否命中全局 IP 白名单;如果是,则认为校验通过;否则走 2;
(2)检查是否命中用户 IP 白名单;如果是,则认为校验通过;否则走 3;
(3)校验签名,校验不通过,抛出异常;校验通过,则走 4;
(4)对用户请求所需的权限 和 用户所拥有的权限进行校验;不通过,抛出异常;
用户所需权限的校验需要注意已下内容:
(1)特殊的请求例如 UPDATE_AND_CREATE_TOPIC 等,只能由 admin 账户进行操作;
(2)对于某个资源,如果有显性配置权限,则采用配置的权限;如果没有显性配置权限,则采用默认的权限;

5. 热加载修改后权限控制定义

RocketMQ的权限控制存储的默认实现是基于yml配置文件。用户可以动态修改权限控制定义的属性,而不需重新启动Broker服务节点。

6. 权限控制的使用限制

(1)如果ACL与高可用部署(Master/Slave架构)同时启用,那么需要在Broker Master节点的distribution/conf/plain_acl.yml配置文件中
设置全局白名单信息,即为将Slave节点的ip地址设置至Master节点plain_acl.yml配置文件的全局白名单中。

(2)如果ACL与高可用部署(多副本Dledger架构)同时启用,由于出现节点宕机时,Dledger Group组内会自动选主,那么就需要将Dledger Group组
内所有Broker节点的plain_acl.yml配置文件的白名单设置所有Broker节点的ip地址。

7. ACL mqadmin配置管理命令

7.1 更新ACL配置文件中“account”的属性值

该命令的示例如下:

sh mqadmin updateAclConfig -n 192.168.1.2:9876 -b 192.168.12.134:10911 -a RocketMQ -s 1234567809123
-t topicA=DENY,topicD=SUB -g groupD=DENY,groupB=SUB

说明:如果不存在则会在ACL Config YAML配置文件中创建;若存在,则会更新对应的“accounts”的属性值;
如果指定的是集群名称,则会在集群中各个broker节点执行该命令;否则会在单个broker节点执行该命令。

参数 取值 含义
n eg:192.168.1.2:9876 namesrv地址(必填)
c eg:DefaultCluster 指定集群名称(与broker地址二选一)
b eg:192.168.12.134:10911 指定broker地址(与集群名称二选一)
a eg:RocketMQ Access Key值(必填)
s eg:1234567809123 Secret Key值(可选)
m eg:true 是否管理员账户(可选)
w eg:192.168.0.* whiteRemoteAddress,用户IP白名单(可选)
i eg:DENY;PUB;SUB;PUB|SUB defaultTopicPerm,默认Topic权限(可选)
u eg:DENY;PUB;SUB;PUB|SUB defaultGroupPerm,默认ConsumerGroup权限(可选)
t eg:topicA=DENY,topicD=SUB topicPerms,各个Topic的权限(可选)
g eg:groupD=DENY,groupB=SUB groupPerms,各个ConsumerGroup的权限(可选)

7.2 删除ACL配置文件里面的对应“account”

该命令的示例如下:

sh mqadmin deleteAccessConfig -n 192.168.1.2:9876 -c DefaultCluster -a RocketMQ

说明:如果指定的是集群名称,则会在集群中各个broker节点执行该命令;否则会在单个broker节点执行该命令。
其中,参数”a”为Access Key的值,用以标识唯一账户id,因此该命令的参数中指定账户id即可。

参数 取值 含义
n eg:192.168.1.2:9876 namesrv地址(必填)
c eg:DefaultCluster 指定集群名称(与broker地址二选一)
b eg:192.168.12.134:10911 指定broker地址(与集群名称二选一)
a eg:RocketMQ Access Key的值(必填)

7.3 更新ACL配置文件里面中的全局白名单

该命令的示例如下:

sh mqadmin updateGlobalWhiteAddr -n 192.168.1.2:9876 -b 192.168.12.134:10911 -g 10.10.154.1,10.10.154.2

说明:如果指定的是集群名称,则会在集群中各个broker节点执行该命令;否则会在单个broker节点执行该命令。
其中,参数”g”为全局IP白名的值,用以更新ACL配置文件中的“globalWhiteRemoteAddresses”字段的属性值。

参数 取值 含义
n eg:192.168.1.2:9876 namesrv地址(必填)
c eg:DefaultCluster 指定集群名称(与broker地址二选一)
b eg:192.168.12.134:10911 指定broker地址(与集群名称二选一)
g eg:10.10.154.1,10.10.154.2 全局IP白名单(必填)

7.4 查询集群/Broker的ACL配置文件版本信息

该命令的示例如下:

sh mqadmin clusterAclConfigVersion -n 192.168.1.2:9876 -c DefaultCluster

说明:如果指定的是集群名称,则会在集群中各个broker节点执行该命令;否则会在单个broker节点执行该命令。

参数 取值 含义
n eg:192.168.1.2:9876 namesrv地址(必填)
c eg:DefaultCluster 指定集群名称(与broker地址二选一)
b eg:192.168.12.134:10911 指定broker地址(与集群名称二选一)

特别注意开启Acl鉴权认证后导致Master/Slave和Dledger模式下Broker同步数据异常的问题,
在社区[4.5.1]版本中已经修复,具体的PR链接为:https://github.com/apache/rocketmq/pull/1149;

消息轨迹


1. 消息轨迹数据关键属性

Producer端 Consumer端 Broker端
生产实例信息 消费实例信息 消息的Topic
发送消息时间 投递时间,投递轮次 消息存储位置
消息是否发送成功 消息是否消费成功 消息的Key值
发送耗时 消费耗时 消息的Tag值

2. 支持消息轨迹集群部署

2.1 Broker端配置文件

这里贴出Broker端开启消息轨迹特性的properties配置文件内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
storePathRootDir=/data/rocketmq/rootdir-a-m
storePathCommitLog=/data/rocketmq/commitlog-a-m
autoCreateSubscriptionGroup=true
## if msg tracing is open,the flag will be true
traceTopicEnable=true
listenPort=10911
brokerIP1=XX.XX.XX.XX1
namesrvAddr=XX.XX.XX.XX:9876

2.2 普通模式

RocketMQ集群中每一个Broker节点均用于存储Client端收集并发送过来的消息轨迹数据。因此,对于RocketMQ集群中的Broker节点数量并无要求和限制。

2.3 物理IO隔离模式

对于消息轨迹数据量较大的场景,可以在RocketMQ集群中选择其中一个Broker节点专用于存储消息轨迹,使得用户普通的消息数据与消息轨迹数据的物理IO完全隔离,互不影响。在该模式下,RockeMQ集群中至少有两个Broker节点,其中一个Broker节点定义为存储消息轨迹数据的服务端。

2.4 启动开启消息轨迹的Broker

nohup sh mqbroker -c ../conf/2m-noslave/broker-a.properties &

3. 保存消息轨迹的Topic定义

RocketMQ的消息轨迹特性支持两种存储轨迹数据的方式:

3.1 系统级的TraceTopic

在默认情况下,消息轨迹数据是存储于系统级的TraceTopic中(其名称为:RMQ_SYS_TRACE_TOPIC)。该Topic在Broker节点启动时,会自动创建出来(如上所叙,需要在Broker端的配置文件中将traceTopicEnable的开关变量设置为true)。

3.2 用户自定义的TraceTopic

如果用户不准备将消息轨迹的数据存储于系统级的默认TraceTopic,也可以自己定义并创建用户级的Topic来保存轨迹(即为创建普通的Topic用于保存消息轨迹数据)。下面一节会介绍Client客户端的接口如何支持用户自定义的TraceTopic。

4. 支持消息轨迹的Client客户端实践

为了尽可能地减少用户业务系统使用RocketMQ消息轨迹特性的改造工作量,作者在设计时候采用对原来接口增加一个开关参数(enableMsgTrace)来实现消息轨迹是否开启;并新增一个自定义参(customizedTraceTopic)数来实现用户存储消息轨迹数据至自己创建的用户级Topic。

4.1 发送消息时开启消息轨迹

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true);
producer.setNamesrvAddr("XX.XX.XX.XX1");
producer.start();
try {
{
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}

} catch (Exception e) {
e.printStackTrace();
}

4.2 订阅消息时开启消息轨迹

1
2
3
4
5
6
7
8
9
10
11
12
13
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true);
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");

4.3 支持自定义存储消息轨迹Topic

在上面的发送和订阅消息时候分别将DefaultMQProducer和DefaultMQPushConsumer实例的初始化修改为如下即可支持自定义存储消息轨迹Topic。

1
2
3
4
5
6
##其中Topic_test11111需要用户自己预先创建,来保存消息轨迹;
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true,"Topic_test11111");
......

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true,"Topic_test11111");
......

Dledger集群搭建


前言

该文档主要介绍如何部署自动容灾切换的 RocketMQ-on-DLedger Group。

RocketMQ-on-DLedger Group 是指一组相同名称的 Broker,至少需要 3 个节点,通过 Raft 自动选举出一个 Leader,其余节点 作为 Follower,并在 Leader 和 Follower 之间复制数据以保证高可用。
RocketMQ-on-DLedger Group 能自动容灾切换,并保证数据一致。
RocketMQ-on-DLedger Group 是可以水平扩展的,也即可以部署任意多个 RocketMQ-on-DLedger Group 同时对外提供服务。

1. 新集群部署

1.1 编写配置

每个 RocketMQ-on-DLedger Group 至少准备三台机器(本文假设为 3)。
编写 3 个配置文件,建议参考 conf/dledger 目录下的配置文件样例。
关键配置介绍:

name 含义 举例
enableDLegerCommitLog 是否启动 DLedger true
dLegerGroup DLedger Raft Group的名字,建议和 brokerName 保持一致 RaftNode00
dLegerPeers DLedger Group 内各节点的端口信息,同一个 Group 内的各个节点配置必须要保证一致 n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
dLegerSelfId 节点 id, 必须属于 dLegerPeers 中的一个;同 Group 内各个节点要唯一 n0
sendMessageThreadPoolNums 发送线程个数,建议配置成 Cpu 核数 16

这里贴出 conf/dledger/broker-n0.conf 的配置举例。

1
2
3
4
5
6
7
8
9
10
11
12
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
## must be unique
dLegerSelfId=n0
sendMessageThreadPoolNums=16

1.2 启动 Broker

与老版本的启动方式一致。

nohup sh bin/mqbroker -c conf/dledger/xxx-n0.conf &
nohup sh bin/mqbroker -c conf/dledger/xxx-n1.conf &
nohup sh bin/mqbroker -c conf/dledger/xxx-n2.conf &

2. 旧集群升级

如果旧集群采用 Master 方式部署,则每个 Master 都需要转换成一个 RocketMQ-on-DLedger Group。
如果旧集群采用 Master-Slave 方式部署,则每个 Master-Slave 组都需要转换成一个 RocketMQ-on-DLedger Group。

2.1 杀掉旧的 Broker

可以通过 kill 命令来完成,也可以调用 bin/mqshutdown broker

2.2 检查旧的 Commitlog

RocketMQ-on-DLedger 组中的每个节点,可以兼容旧的 Commitlog ,但其 Raft 复制过程,只能针对新增加的消息。因此,为了避免出现异常,需要保证 旧的 Commitlog 是一致的。
如果旧的集群是采用 Master-Slave 方式部署,有可能在shutdown时,其数据并不是一致的,建议通过md5sum 的方式,检查最近的最少 2 个 Commmitlog 文件,如果发现不一致,则通过拷贝的方式进行对齐。

虽然 RocketMQ-on-DLedger Group 也可以以 2 节点方式部署,但其会丧失容灾切换能力(2n + 1 原则,至少需要3个节点才能容忍其中 1 个宕机)。
所以在对齐了 Master 和 Slave 的 Commitlog 之后,还需要准备第 3 台机器,并把旧的 Commitlog 从 Master 拷贝到 第 3 台机器(记得同时拷贝一下 config 文件夹)。

在 3 台机器准备好了之后,旧 Commitlog 文件也保证一致之后,就可以开始走下一步修改配置了。

2.3 修改配置

参考新集群部署。

2.4 重新启动 Broker

参考新集群部署。

Access control list

Overview

This document focuses on how to quickly deploy and use a RocketMQ cluster that supports the privilege control feature.

1. Access control features

Access Control (ACL) mainly provides Topic resource level user access control for RocketMQ.If you want to enable RocketMQ permission control, you can inject the AccessKey and SecretKey signatures through the RPCHook on the Client side.And then, the corresponding permission control attributes (including Topic access rights, IP whitelist and AccessKey and SecretKey signature) are set in the configuration file of distribution/conf/plain_acl.yml.The Broker side will check the permissions owned by the AccessKey, and if the verification fails, an exception is thrown;
The source code about ACL on the Client side can be find in org.apache.rocketmq.example.simple.AclClient.java

2. Access control definition and attribute values

2.1 Access control definition

The definition of Topic resource access control for RocketMQ is mainly as shown in the following table.

Permission explanation
DENY permission deny
ANY PUB or SUB permission
PUB Publishing permission
SUB Subscription permission

2.2 Main properties

key value explanation
globalWhiteRemoteAddresses string Global IP whitelist,example:
*;
192.168.*.*;
192.168.0.1
accessKey string Access Key
secretKey string Secret Key
whiteRemoteAddress string User IP whitelist,example:
*;
192.168.*.*;
192.168.0.1
admin true;false Whether an administrator account
defaultTopicPerm DENY;PUB;SUB;PUB|SUB Default Topic permission
defaultGroupPerm DENY;PUB;SUB;PUB|SUB Default ConsumerGroup permission
topicPerms topic=permission Topic only permission
groupPerms group=permission ConsumerGroup only permission

For details, please refer to the distribution/conf/plain_acl.yml configuration file.

3. Cluster deployment with permission control

After defining the permission attribute in the distribution/conf/plain_acl.yml configuration file as described above, open the aclEnable switch variable to enable the ACL feature of the RocketMQ cluster.The configuration file of the ACL feature enabled on the broker is as follows:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
storePathRootDir=/data/rocketmq/rootdir-a-m
storePathCommitLog=/data/rocketmq/commitlog-a-m
autoCreateSubscriptionGroup=true
## if acl is open,the flag will be true
aclEnable=true
listenPort=10911
brokerIP1=XX.XX.XX.XX1
namesrvAddr=XX.XX.XX.XX:9876

4. Main process of access control

The main ACL process is divided into two parts, including privilege resolution and privilege check.

4.1 Privilege resolution

The Broker side parses the client’s RequestCommand request and obtains the attribute field that needs to be authenticated.
main attributes:
(1) AccessKey:Similar to the user name, on behalf of the user entity, the permission data corresponds to it;
(2) Signature:The client obtains the string according to the signature of the SecretKey, and the server uses the SecretKey to perform signature verification.

4.2 Privilege check

The check logic of the right side of the broker is mainly divided into the following steps:
(1) Check if the global IP whitelist is hit; if yes, the check passes; otherwise, go to step (2);
(2) Check if the user IP whitelist is hit; if yes, the check passes; otherwise, go to step (3);
(3) Check the signature, if the verification fails, throw an exception; if the verification passes, go to step (4);
(4) Check the permissions required by the user request and the permissions owned by the user; if not, throw an exception;

The verification of the required permissions of the user requires attention to the following points:
(1) Special requests such as UPDATE_AND_CREATE_TOPIC can only be operated by the admin account;
(2) For a resource, if there is explicit configuration permission, the configured permission is used; if there is no explicit configuration permission, the default permission is adopted;

5. Hot loading modified Access control

The default implementation of RocketrMQ’s permission control store is based on the yml configuration file. Users can dynamically modify the properties defined by the permission control without restarting the Broker service node.

Dledger快速搭建


前言

该文档主要介绍如何快速构建和部署基于 DLedger 的可以自动容灾切换的 RocketMQ 集群。

详细的新集群部署和旧集群升级指南请参考 部署指南

1. 源码构建

构建分为两个部分,需要先构建 DLedger,然后 构建 RocketMQ

1.1 构建 DLedger

git clone https://github.com/openmessaging/openmessaging-storage-dledger.git

cd openmessaging-storage-dledger

mvn clean install -DskipTests

1.2 构建 RocketMQ

git clone https://github.com/apache/rocketmq.git

cd rocketmq

git checkout -b store_with_dledger origin/store_with_dledger

mvn -Prelease-all -DskipTests clean install -U

2. 快速部署

在构建成功后

cd distribution/target/apache-rocketmq

sh bin/dledger/fast-try.sh start

如果上面的步骤执行成功,可以通过 mqadmin 运维命令查看集群状态。

sh bin/mqadmin clusterList -n 127.0.0.1:9876

顺利的话,会看到如下内容:

ClusterList

(BID 为 0 的表示 Master,其余都是 Follower)

启动成功,现在可以向集群收发消息,并进行容灾切换测试了。

关闭快速集群,可以执行:

sh bin/dledger/fast-try.sh stop

快速部署,默认配置在 conf/dledger 里面,默认的存储路径在 /tmp/rmqstore。

3. 容灾切换

部署成功,杀掉 Leader 之后(在上面的例子中,杀掉端口 30931 所在的进程),等待约 10s 左右,用 clusterList 命令查看集群,就会发现 Leader 切换到另一个节点了。

DefaultMQProducer


类简介

public class DefaultMQProducer extends ClientConfig implements MQProducer

DefaultMQProducer类是应用用来投递消息的入口,开箱即用,可通过无参构造方法快速创建一个生产者。主要负责消息的发送,支持同步/异步/oneway的发送方式,这些发送方式均支持批量发送。可以通过该类提供的getter/setter方法,调整发送者的参数。DefaultMQProducer提供了多个send方法,每个send方法略有不同,在使用前务必详细了解其意图。下面给出一个生产者示例代码,点击查看更多示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class Producer {
public static void main(String[] args) throws MQClientException {
// 创建指定分组名的生产者
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

// 启动生产者
producer.start();

for (int i = 0; i < 128; i++)
try {
// 构建消息
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

// 同步发送
SendResult sendResult = producer.send(msg);

// 打印发送结果
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}

producer.shutdown();
}
}

注意:该类是线程安全的。在配置并启动完成后可在多个线程间安全共享。

字段摘要

类型 字段名称 描述
DefaultMQProducerImpl defaultMQProducerImpl 生产者的内部默认实现
String producerGroup 生产者分组
String createTopicKey 在发送消息时,自动创建服务器不存在的topic
int defaultTopicQueueNums 创建topic时默认的队列数量
int sendMsgTimeout 发送消息的超时时间
int compressMsgBodyOverHowmuch 压缩消息体的阈值
int retryTimesWhenSendFailed 同步模式下内部尝试发送消息的最大次数
int retryTimesWhenSendAsyncFailed 异步模式下内部尝试发送消息的最大次数
boolean retryAnotherBrokerWhenNotStoreOK 是否在内部发送失败时重试另一个broker
int maxMessageSize 消息的最大长度
TraceDispatcher traceDispatcher 消息追踪器。使用rcpHook来追踪消息

构造方法摘要

方法名称 方法描述
DefaultMQProducer() 由默认参数值创建一个生产者
DefaultMQProducer(final String producerGroup) 使用指定的分组名创建一个生产者
DefaultMQProducer(final String producerGroup, boolean enableMsgTrace) 使用指定的分组名创建一个生产者,并设置是否开启消息追踪
DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic) 使用指定的分组名创建一个生产者,并设置是否开启消息追踪及追踪topic的名称
DefaultMQProducer(RPCHook rpcHook) 使用指定的hook创建一个生产者
DefaultMQProducer(final String producerGroup, RPCHook rpcHook) 使用指定的分组名及自定义hook创建一个生产者
DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) 使用指定的分组名及自定义hook创建一个生产者,并设置是否开启消息追踪及追踪topic的名称

使用方法摘要

返回值 方法名称 方法描述
void createTopic(String key, String newTopic, int queueNum) 在broker上创建指定的topic
void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) 在broker上创建指定的topic
long earliestMsgStoreTime(MessageQueue mq) 查询最早的消息存储时间
List fetchPublishMessageQueues(String topic) 获取topic的消息队列
long maxOffset(MessageQueue mq) 查询给定消息队列的最大offset
long minOffset(MessageQueue mq) 查询给定消息队列的最小offset
QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) 按关键字查询消息
long searchOffset(MessageQueue mq, long timestamp) 查找指定时间的消息队列的物理offset
SendResult send(Collection msgs) 同步批量发送消息
SendResult send(Collection msgs, long timeout) 同步批量发送消息
SendResult send(Collection msgs, MessageQueue messageQueue) 向指定的消息队列同步批量发送消息
SendResult send(Collection msgs, MessageQueue messageQueue, long timeout) 向指定的消息队列同步批量发送消息,并指定超时时间
SendResult send(Message msg) 同步单条发送消息
SendResult send(Message msg, long timeout) 同步发送单条消息,并指定超时时间
SendResult send(Message msg, MessageQueue mq) 向指定的消息队列同步发送单条消息
SendResult send(Message msg, MessageQueue mq, long timeout) 向指定的消息队列同步单条发送消息,并指定超时时间
void send(Message msg, MessageQueue mq, SendCallback sendCallback) 向指定的消息队列异步单条发送消息,并指定回调方法
void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout) 向指定的消息队列异步单条发送消息,并指定回调方法和超时时间
SendResult send(Message msg, MessageQueueSelector selector, Object arg) 向消息队列同步单条发送消息,并指定发送队列选择器
SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout) 向消息队列同步单条发送消息,并指定发送队列选择器与超时时间
void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback) 向指定的消息队列异步单条发送消息
void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout) 向指定的消息队列异步单条发送消息,并指定超时时间
void send(Message msg, SendCallback sendCallback) 异步发送消息
void send(Message msg, SendCallback sendCallback, long timeout) 异步发送消息,并指定回调方法和超时时间
TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, final Object arg) 发送事务消息,并指定本地执行事务实例
TransactionSendResult sendMessageInTransaction(Message msg, Object arg) 发送事务消息
void sendOneway(Message msg) 单向发送消息,不等待broker响应
void sendOneway(Message msg, MessageQueue mq) 单向发送消息到指定队列,不等待broker响应
void sendOneway(Message msg, MessageQueueSelector selector, Object arg) 单向发送消息到队列选择器的选中的队列,不等待broker响应
void shutdown() 关闭当前生产者实例并释放相关资源
void start() 启动生产者
MessageExt viewMessage(String offsetMsgId) 根据给定的msgId查询消息
MessageExt public MessageExt viewMessage(String topic, String msgId) 根据给定的msgId查询消息,并指定topic

字段详细信息

  • producerGroup

    private String producerGroup

    生产者的分组名称。相同的分组名称表明生产者实例在概念上归属于同一分组。这对事务消息十分重要,如果原始生产者在事务之后崩溃,那么broker可以联系同一生产者分组的不同生产者实例来提交或回滚事务。

    默认值:DEFAULT_PRODUCER

    注意: 由数字、字母、下划线、横杠(-)、竖线(|)或百分号组成;不能为空;长度不能超过255。

  • defaultMQProducerImpl

    protected final transient DefaultMQProducerImpl defaultMQProducerImpl

    生产者的内部默认实现,在构造生产者时内部自动初始化,提供了大部分方法的内部实现。

  • createTopicKey

    private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC

    在发送消息时,自动创建服务器不存在的topic,需要指定Key,该Key可用于配置发送消息所在topic的默认路由。

    默认值:TBW102

    建议:测试或者demo使用,生产环境下不建议打开自动创建配置。

  • defaultTopicQueueNums

    private volatile int defaultTopicQueueNums = 4

    创建topic时默认的队列数量。

    默认值:4

  • sendMsgTimeout

    private int sendMsgTimeout = 3000

    发送消息时的超时时间。

    默认值:3000,单位:毫秒

    建议:不建议修改该值,该值应该与broker配置中的sendTimeout一致,发送超时,可临时修改该值,建议解决超时问题,提高broker集群的Tps。

  • compressMsgBodyOverHowmuch

    private int compressMsgBodyOverHowmuch = 1024 * 4

    压缩消息体阈值。大于4K的消息体将默认进行压缩。

    默认值:1024 * 4,单位:字节

    建议:可通过DefaultMQProducerImpl.setZipCompressLevel方法设置压缩率(默认为5,可选范围[0,9]);可通过DefaultMQProducerImpl.tryToCompressMessage方法测试出compressLevel与compressMsgBodyOverHowmuch最佳值。

  • retryTimesWhenSendFailed

    private int retryTimesWhenSendFailed = 2

    同步模式下,在返回发送失败之前,内部尝试重新发送消息的最大次数。

    默认值:2,即:默认情况下一条消息最多会被投递3次。

    注意:在极端情况下,这可能会导致消息的重复。

  • retryTimesWhenSendAsyncFailed

    private int retryTimesWhenSendAsyncFailed = 2

    异步模式下,在发送失败之前,内部尝试重新发送消息的最大次数。

    默认值:2,即:默认情况下一条消息最多会被投递3次。

    注意:在极端情况下,这可能会导致消息的重复。

  • retryAnotherBrokerWhenNotStoreOK

    private boolean retryAnotherBrokerWhenNotStoreOK = false

    同步模式下,消息保存失败时是否重试其他broker。

    默认值:false

    注意:此配置关闭时,非投递时产生异常情况下,会忽略retryTimesWhenSendFailed配置。

  • maxMessageSize

    private int maxMessageSize = 1024 * 1024 * 4

    消息的最大大小。当消息题的字节数超过maxMessageSize就发送失败。

    默认值:1024 * 1024 * 4,单位:字节

  • traceDispatcher

    private TraceDispatcher traceDispatcher = null

    在开启消息追踪后,该类通过hook的方式把消息生产者,消息存储的broker和消费者消费消息的信息像链路一样记录下来。在构造生产者时根据构造入参enableMsgTrace来决定是否创建该对象。

构造方法详细信息

  1. DefaultMQProducer

    public DefaultMQProducer()

    创建一个新的生产者。

  2. DefaultMQProducer

    DefaultMQProducer(final String producerGroup)

    使用指定的分组名创建一个生产者。

    • 入参描述:

      参数名 类型 是否必须 缺省值 描述
      producerGroup String DEFAULT_PRODUCER 生产者的分组名称
  3. DefaultMQProducer

    DefaultMQProducer(final String producerGroup, boolean enableMsgTrace)

    使用指定的分组名创建一个生产者,并设置是否开启消息追踪。

    • 入参描述:

      参数名 类型 是否必须 缺省值 描述
      producerGroup String DEFAULT_PRODUCER 生产者的分组名称
      enableMsgTrace boolean false 是否开启消息追踪
  4. DefaultMQProducer

    DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic)

    使用指定的分组名创建一个生产者,并设置是否开启消息追踪及追踪topic的名称。

    • 入参描述:

      参数名 类型 是否必须 缺省值 描述
      producerGroup String DEFAULT_PRODUCER 生产者的分组名称
      rpcHook RPCHook null 每个远程命令执行后会回调rpcHook
      enableMsgTrace boolean false 是否开启消息追踪
      customizedTraceTopic String RMQ_SYS_TRACE_TOPIC 消息跟踪topic的名称
  5. DefaultMQProducer

    DefaultMQProducer(RPCHook rpcHook)

    使用指定的hook创建一个生产者。

    • 入参描述:

      参数名 类型 是否必须 缺省值 描述
      rpcHook RPCHook null 每个远程命令执行后会回调rpcHook
  6. DefaultMQProducer

    DefaultMQProducer(final String producerGroup, RPCHook rpcHook)

    使用指定的分组名及自定义hook创建一个生产者。

    • 入参描述:

      参数名 类型 是否必须 缺省值 描述
      producerGroup String DEFAULT_PRODUCER 生产者的分组名称
      rpcHook RPCHook null 每个远程命令执行后会回调rpcHook
  7. DefaultMQProducer

    DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic)

    使用指定的分组名及自定义hook创建一个生产者,并设置是否开启消息追踪及追踪topic的名称。

    • 入参描述:
    参数名 类型 是否必须 缺省值 描述
    producerGroup String DEFAULT_PRODUCER 生产者的分组名称
    rpcHook RPCHook null 每个远程命令执行后会回调rpcHook
    enableMsgTrace boolean false 是否开启消息追踪
    customizedTraceTopic String RMQ_SYS_TRACE_TOPIC 消息跟踪topic的名称

使用方法详细信息

  1. createTopic

    public void createTopic(String key, String newTopic, int queueNum)

    在broker上创建一个topic。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      key String 访问密钥。
      newTopic String 新建topic的名称。由数字、字母、下划线(_)、横杠(-)、竖线(&#124;)或百分号(%)组成;
      长度小于255;不能为TBW102或空。
      queueNum int 0 (0, maxIntValue] topic的队列数量。
    • 返回值描述:

      void

    • 异常描述:

      MQClientException - 生产者状态非Running;未找到broker等客户端异常。

  2. createTopic

    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)

    在broker上创建一个topic。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      key String 访问密钥。
      newTopic String 新建topic的名称。
      queueNum int 0 (0, maxIntValue] topic的队列数量。
      topicSysFlag int 0 保留字段,暂未使用。
    • 返回值描述:

      void

    • 异常描述:

      MQClientException - 生产者状态非Running;未找到broker等客户端异常。

  3. earliestMsgStoreTime

    public long earliestMsgStoreTime(MessageQueue mq)

    查询最早的消息存储时间。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      mq MessageQueue 要查询的消息队列
    • 返回值描述:

      指定队列最早的消息存储时间。单位:毫秒。

    • 异常描述:

      MQClientException - 生产者状态非Running;没有找到broker;broker返回失败;网络异常;线程中断等客户端异常。

  4. fetchPublishMessageQueues

    public List<MessageQueue> fetchPublishMessageQueues(String topic)

    获取topic的消息队列。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      topic String topic名称
    • 返回值描述:

      传入topic下的消息队列。

    • 异常描述:

      MQClientException - 生产者状态非Running;没有找到broker;broker返回失败;网络异常;线程中断等客户端异常。

  5. maxOffset

    public long maxOffset(MessageQueue mq)

    查询消息队列的最大物理偏移量。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      mq MessageQueue 要查询的消息队列
    • 返回值描述:

      给定消息队列的最大物理偏移量。

    • 异常描述:

      MQClientException - 生产者状态非Running;没有找到broker;broker返回失败;网络异常;线程中断等客户端异常。

  6. minOffset

    public long minOffset(MessageQueue mq)

    查询给定消息队列的最小物理偏移量。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      mq MessageQueue 要查询的消息队列
    • 返回值描述:

      给定消息队列的最小物理偏移量。

    • 异常描述:

      MQClientException - 生产者状态非Running;没有找到broker;broker返回失败;网络异常;线程中断等客户端异常。

  7. queryMessage

    public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)

    按关键字查询消息。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      topic String topic名称
      key String null 查找的关键字
      maxNum int 返回消息的最大数量
      begin long 开始时间戳,单位:毫秒
      end long 结束时间戳,单位:毫秒
    • 返回值描述:

      查询到的消息集合。

    • 异常描述:

      MQClientException - 生产者状态非Running;没有找到broker;broker返回失败;网络异常等客户端异常客户端异常。

      InterruptedException - 线程中断。

  8. searchOffset

    public long searchOffset(MessageQueue mq, long timestamp)

    查找指定时间的消息队列的物理偏移量。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      mq MessageQueue 要查询的消息队列。
      timestamp long 指定要查询时间的时间戳。单位:毫秒。
    • 返回值描述:

      指定时间的消息队列的物理偏移量。

    • 异常描述:

      MQClientException - 生产者状态非Running;没有找到broker;broker返回失败;网络异常;线程中断等客户端异常。

  9. send

    public SendResult send(Collection<Message> msgs)

    同步批量发送消息。在返回发送失败之前,内部尝试重新发送消息的最大次数(参见retryTimesWhenSendFailed属性)。未明确指定发送队列,默认采取轮询策略发送。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      msgs Collection 待发送的消息集合。集合内的消息必须属同一个topic。
    • 返回值描述:

      批量消息的发送结果,包含msgId,发送状态等信息。

    • 异常描述:

      MQClientException - broker不存在或未找到;namesrv地址为空;未找到topic的路由信息等客户端异常。

      RemotingException - 网络异常。

      MQBrokerException - broker发生错误。

      InterruptedException - 发送线程中断。

      RemotingTooMuchRequestException - 发送超时。

  10. send

    public SendResult send(Collection<Message> msgs, long timeout)

    同步批量发送消息,如果在指定的超时时间内未完成消息投递,会抛出RemotingTooMuchRequestException
    在返回发送失败之前,内部尝试重新发送消息的最大次数(参见retryTimesWhenSendFailed属性)。未明确指定发送队列,默认采取轮询策略发送。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      msgs Collection 待发送的消息集合。集合内的消息必须属同一个topic。
      timeout long 参见sendMsgTimeout属性 发送超时时间,单位:毫秒。
    • 返回值描述:

      批量消息的发送结果,包含msgId,发送状态等信息。

    • 异常描述:

      MQClientException - broker不存在或未找到;namesrv地址为空;未找到topic的路由信息等客户端异常。

      RemotingException - 网络异常。

      MQBrokerException - broker发生错误。

      InterruptedException - 发送线程中断。

      RemotingTooMuchRequestException - 发送超时。

  11. send

    public SendResult send(Collection<Message> msgs, MessageQueue messageQueue)

    向给定队列同步批量发送消息。

    注意:指定队列意味着所有消息均为同一个topic。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      msgs Collection 待发送的消息集合。集合内的消息必须属同一个topic。
      messageQueue MessageQueue 待投递的消息队列。指定队列意味着待投递消息均为同一个topic。
    • 返回值描述:

      批量消息的发送结果,包含msgId,发送状态等信息。

    • 异常描述:

      MQClientException - broker不存在或未找到;namesrv地址为空;未找到topic的路由信息等客户端异常。

      RemotingException - 网络异常。

      MQBrokerException - broker发生错误。

      InterruptedException - 发送线程中断。

      RemotingTooMuchRequestException - 发送超时。

  12. send

    public SendResult send(Collection<Message> msgs, MessageQueue messageQueue, long timeout)

    向给定队列同步批量发送消息,如果在指定的超时时间内未完成消息投递,会抛出RemotingTooMuchRequestException

    注意:指定队列意味着所有消息均为同一个topic。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      msgs Collection 待发送的消息集合。集合内的消息必须属同一个topic。
      timeout long 参见sendMsgTimeout属性 发送超时时间,单位:毫秒。
      messageQueue MessageQueue 待投递的消息队列。指定队列意味着待投递消息均为同一个topic。
    • 返回值描述:

      批量消息的发送结果,包含msgId,发送状态等信息。

    • 异常描述:

      MQClientException - broker不存在或未找到;namesrv地址为空;未找到topic的路由信息等客户端异常。

      RemotingException - 网络异常。

      MQBrokerException - broker发生错误。

      InterruptedException - 发送线程中断。

      RemotingTooMuchRequestException - 发送超时。

  13. send

    public SendResult send(Message msg)

    以同步模式发送消息,仅当发送过程完全完成时,此方法才会返回。
    在返回发送失败之前,内部尝试重新发送消息的最大次数(参见retryTimesWhenSendFailed属性)。未明确指定发送队列,默认采取轮询策略发送。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      msg Message 待发送的消息。
    • 返回值描述:

      消息的发送结果,包含msgId,发送状态等信息。

    • 异常描述:

      MQClientException - broker不存在或未找到;namesrv地址为空;未找到topic的路由信息等客户端异常。

      RemotingException - 网络异常。

      MQBrokerException - broker发生错误。

      InterruptedException - 发送线程中断。

      RemotingTooMuchRequestException - 发送超时。

  14. send

    public SendResult send(Message msg, long timeout)

    以同步模式发送消息,如果在指定的超时时间内未完成消息投递,会抛出RemotingTooMuchRequestException。仅当发送过程完全完成时,此方法才会返回。
    在返回发送失败之前,内部尝试重新发送消息的最大次数(参见retryTimesWhenSendFailed属性)。未明确指定发送队列,默认采取轮询策略发送。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      msg Message 待发送的消息。
      timeout long 参见sendMsgTimeout属性 发送超时时间,单位:毫秒。
    • 返回值描述:

      消息的发送结果,包含msgId,发送状态等信息。

    • 异常描述:

      MQClientException - broker不存在或未找到;namesrv地址为空;未找到topic的路由信息等客户端异常。

      RemotingException - 网络异常。

      MQBrokerException - broker发生错误。

      InterruptedException - 发送线程中断。

      RemotingTooMuchRequestException - 发送超时。

  15. send

    public SendResult send(Message msg, MessageQueue mq)

    向指定的消息队列同步发送单条消息。仅当发送过程完全完成时,此方法才会返回。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      msg Message 待发送的消息。
      mq MessageQueue 待投递的消息队列。
    • 返回值描述:

      消息的发送结果,包含msgId,发送状态等信息。

    • 异常描述:

      MQClientException - broker不存在或未找到;namesrv地址为空;未找到topic的路由信息等客户端异常。

      RemotingException - 网络异常。

      MQBrokerException - broker发生错误。

      InterruptedException - 发送线程中断。

      RemotingTooMuchRequestException - 发送超时。

  16. send

    public SendResult send(Message msg, MessageQueue mq, long timeout)

    向指定的消息队列同步发送单条消息,如果在指定的超时时间内未完成消息投递,会抛出RemotingTooMuchRequestException。仅当发送过程完全完成时,此方法才会返回。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      msg Message 待发送的消息。
      timeout long 参见sendMsgTimeout属性 发送超时时间,单位:毫秒。
      mq MessageQueue 待投递的消息队列。指定队列意味着待投递消息均为同一个topic。
    • 返回值描述:

      消息的发送结果,包含msgId,发送状态等信息。

    • 异常描述:

      MQClientException - broker不存在或未找到;namesrv地址为空;未找到topic的路由信息等客户端异常。

      RemotingException - 网络异常。

      MQBrokerException - broker发生错误。

      InterruptedException - 发送线程中断。

      RemotingTooMuchRequestException - 发送超时。

  17. send

    public void send(Message msg, MessageQueue mq, SendCallback sendCallback)

    向指定的消息队列异步发送单条消息,异步发送调用后直接返回,并在在发送成功或者异常时回调sendCallback,所以异步发送时sendCallback参数不能为null,否则在回调时会抛出NullPointerException
    异步发送时,在成功发送前,其内部会尝试重新发送消息的最大次数(参见retryTimesWhenSendAsyncFailed属性)。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      msg Message 待发送的消息。
      mq MessageQueue 待投递的消息队列。指定队列意味着待投递消息均为同一个topic。
      sendCallback SendCallback 回调接口的实现。
    • 返回值描述:

      void

    • 异常描述:

      MQClientException - broker不存在或未找到;namesrv地址为空;未找到topic的路由信息等客户端异常。

      RemotingException - 网络异常。

      InterruptedException - 发送线程中断。

  18. send

    public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)

    向指定的消息队列异步发送单条消息,异步发送调用后直接返回,并在在发送成功或者异常时回调sendCallback,所以异步发送时sendCallback参数不能为null,否则在回调时会抛出NullPointerException
    若在指定时间内消息未发送成功,回调方法会收到RemotingTooMuchRequestException异常。
    异步发送时,在成功发送前,其内部会尝试重新发送消息的最大次数(参见retryTimesWhenSendAsyncFailed属性)。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      msg Message 待发送的消息。
      mq MessageQueue 待投递的消息队列。
      sendCallback SendCallback 回调接口的实现。
      timeout long 参见sendMsgTimeout属性 发送超时时间,单位:毫秒。
    • 返回值描述:
      void

    • 异常描述:

      MQClientException - broker不存在或未找到;namesrv地址为空;未找到topic的路由信息等客户端异常。

      RemotingException - 网络异常。

      InterruptedException - 发送线程中断。

  19. send

    public SendResult send(Message msg, MessageQueueSelector selector, Object arg)

    向通过MessageQueueSelector计算出的队列同步发送消息。

    可以通过自实现MessageQueueSelector接口,将某一类消息发送至固定的队列。比如:将同一个订单的状态变更消息投递至固定的队列。

    注意:此消息发送失败内部不会重试。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      msg Message 待发送的消息。
      selector MessageQueueSelector 队列选择器。
      arg Object 供队列选择器使用的参数对象。
    • 返回值描述:

      消息的发送结果,包含msgId,发送状态等信息。

    • 异常描述:

      MQClientException - broker不存在或未找到;namesrv地址为空;未找到topic的路由信息等客户端异常。

      RemotingException - 网络异常。

      MQBrokerException - broker发生错误。

      InterruptedException - 发送线程中断。

      RemotingTooMuchRequestException - 发送超时。

  20. send

    public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)

    向通过MessageQueueSelector计算出的队列同步发送消息,并指定发送超时时间。

    可以通过自实现MessageQueueSelector接口,将某一类消息发送至固定的队列。比如:将同一个订单的状态变更消息投递至固定的队列。

    注意:此消息发送失败内部不会重试。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      msg Message 待发送的消息。
      selector MessageQueueSelector 队列选择器。
      arg Object 供队列选择器使用的参数对象。
      timeout long 参见sendMsgTimeout属性 发送超时时间,单位:毫秒。
    • 返回值描述:

      消息的发送结果,包含msgId,发送状态等信息。

    • 异常描述:
      MQClientException - broker不存在或未找到;namesrv地址为空;未找到topic的路由信息等客户端异常。

      RemotingException - 网络异常。

      MQBrokerException - broker发生错误。

      InterruptedException - 发送线程中断。

      RemotingTooMuchRequestException - 发送超时。

  21. send

    public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)

    向通过MessageQueueSelector计算出的队列异步发送单条消息,异步发送调用后直接返回,并在在发送成功或者异常时回调sendCallback,所以异步发送时sendCallback参数不能为null,否则在回调时会抛出NullPointerException
    异步发送时,在成功发送前,其内部会尝试重新发送消息的最大次数(参见retryTimesWhenSendAsyncFailed属性)。

    可以通过自实现MessageQueueSelector接口,将某一类消息发送至固定的队列。比如:将同一个订单的状态变更消息投递至固定的队列。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      msg Message 待发送的消息。
      selector MessageQueueSelector 队列选择器。
      arg Object 供队列选择器使用的参数对象。
      sendCallback SendCallback 回调接口的实现。
    • 返回值描述:

      void

    • 异常描述:

      MQClientException - broker不存在或未找到;namesrv地址为空;未找到topic的路由信息等客户端异常。

      RemotingException - 网络异常。

      InterruptedException - 发送线程中断。

  22. send

    public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)

    向通过MessageQueueSelector计算出的队列异步发送单条消息,异步发送调用后直接返回,并在在发送成功或者异常时回调sendCallback,所以异步发送时sendCallback参数不能为null,否则在回调时会抛出NullPointerException
    异步发送时,在成功发送前,其内部会尝试重新发送消息的最大次数(参见retryTimesWhenSendAsyncFailed属性)。

    可以通过自实现MessageQueueSelector接口,将某一类消息发送至固定的队列。比如:将同一个订单的状态变更消息投递至固定的队列。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      msg Message 待发送的消息。
      selector MessageQueueSelector 队列选择器。
      arg Object 供队列选择器使用的参数对象。
      sendCallback SendCallback 回调接口的实现。
      timeout long 参见sendMsgTimeout属性 发送超时时间,单位:毫秒。
    • 返回值描述:

      void

    • 异常描述:

      MQClientException - broker不存在或未找到;namesrv地址为空;未找到topic的路由信息等客户端异常。

      RemotingException - 网络异常。

      InterruptedException - 发送线程中断。

  23. send

    public void send(Message msg, SendCallback sendCallback)

    异步发送单条消息,异步发送调用后直接返回,并在在发送成功或者异常时回调sendCallback,所以异步发送时sendCallback参数不能为null,否则在回调时会抛出NullPointerException
    异步发送时,在成功发送前,其内部会尝试重新发送消息的最大次数(参见retryTimesWhenSendAsyncFailed属性)。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      msg Message 待发送的消息。
      sendCallback SendCallback 回调接口的实现。
    • 返回值描述:

      void

    • 异常描述:

      MQClientException - broker不存在或未找到;namesrv地址为空;未找到topic的路由信息等客户端异常。

      RemotingException - 网络异常。

      InterruptedException - 发送线程中断。

  24. send

    public void send(Message msg, SendCallback sendCallback, long timeout)

    异步发送单条消息,异步发送调用后直接返回,并在在发送成功或者异常时回调sendCallback,所以异步发送时sendCallback参数不能为null,否则在回调时会抛出NullPointerException
    异步发送时,在成功发送前,其内部会尝试重新发送消息的最大次数(参见retryTimesWhenSendAsyncFailed属性)。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      msg Message 待发送的消息。
      sendCallback SendCallback 回调接口的实现。
      timeout long 参见sendMsgTimeout属性 发送超时时间,单位:毫秒。
    • 返回值描述:

      void

    • 异常描述:

      MQClientException - broker不存在或未找到;namesrv地址为空;未找到topic的路由信息等客户端异常。

      RemotingException - 网络异常。

      InterruptedException - 发送线程中断。

  25. sendMessageInTransaction

    public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, final Object arg)

    发送事务消息。该类不做默认实现,抛出RuntimeException异常。参见:TransactionMQProducer类。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      msg Message 待投递的事务消息
      tranExecuter LocalTransactionExecuter 本地事务执行器。该类已过期,将在5.0.0版本中移除。请勿使用该方法。
      arg Object 供本地事务执行程序使用的参数对象
    • 返回值描述:

      事务结果,参见:LocalTransactionState类。

    • 异常描述:

      RuntimeException - 永远抛出该异常。

  26. sendMessageInTransaction

    public TransactionSendResult sendMessageInTransaction(Message msg, final Object arg)

    发送事务消息。该类不做默认实现,抛出RuntimeException异常。参见:TransactionMQProducer类。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      msg Message 待投递的事务消息
      arg Object 供本地事务执行程序使用的参数对象
    • 返回值描述:

      事务结果,参见:LocalTransactionState类。

    • 异常描述:

      RuntimeException - 永远抛出该异常。

  27. sendOneway

    public void sendOneway(Message msg)

    以oneway形式发送消息,broker不会响应任何执行结果,和UDP类似。它具有最大的吞吐量但消息可能会丢失。

    可在消息量大,追求高吞吐量并允许消息丢失的情况下使用该方式。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      msg Message 待投递的消息
    • 返回值描述:

      void

    • 异常描述:

      MQClientException - broker不存在或未找到;namesrv地址为空;未找到topic的路由信息等客户端异常。

      RemotingException - 网络异常。

      InterruptedException - 发送线程中断。

  28. sendOneway

    public void sendOneway(Message msg, MessageQueue mq)

    向指定队列以oneway形式发送消息,broker不会响应任何执行结果,和UDP类似。它具有最大的吞吐量但消息可能会丢失。

    可在消息量大,追求高吞吐量并允许消息丢失的情况下使用该方式。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      msg Message 待投递的消息
      mq MessageQueue 待投递的消息队列
    • 返回值描述:
      void

    • 异常描述:
      MQClientException - broker不存在或未找到;namesrv地址为空;未找到topic的路由信息等客户端异常。

      RemotingException - 网络异常。

      InterruptedException - 发送线程中断。

  29. sendOneway

    public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)

    向通过MessageQueueSelector计算出的队列以oneway形式发送消息,broker不会响应任何执行结果,和UDP类似。它具有最大的吞吐量但消息可能会丢失。

    可在消息量大,追求高吞吐量并允许消息丢失的情况下使用该方式。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      msg Message 待发送的消息。
      selector MessageQueueSelector 队列选择器。
      arg Object 供队列选择器使用的参数对象。
    • 返回值描述:

      void

    • 异常描述:

      MQClientException - broker不存在或未找到;namesrv地址为空;未找到topic的路由信息等客户端异常。

      RemotingException - 网络异常。

      InterruptedException - 发送线程中断。

  30. shutdown

    public void shutdown()

    关闭当前生产者实例并释放相关资源。

    • 入参描述:

      无。

    • 返回值描述:

      void

    • 异常描述:

  31. start

    public void start()

    启动生产者实例。在发送或查询消息之前必须调用此方法。它执行了许多内部初始化,比如:检查配置、与namesrv建立连接、启动一系列心跳等定时任务等。

    • 入参描述:

      无。

    • 返回值描述:

      void

    • 异常描述:

      MQClientException - 初始化过程中出现失败。

  32. viewMessage

    public MessageExt viewMessage(String offsetMsgId)

    根据给定的msgId查询消息。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      offsetMsgId String offsetMsgId
    • 返回值描述:

      返回MessageExt,包含:topic名称,消息题,消息ID,消费次数,生产者host等信息。

    • 异常描述:

      RemotingException - 网络层发生错误。

      MQBrokerException - broker发生错误。

      InterruptedException - 线程被中断。

      MQClientException - 生产者状态非Running;msgId非法等。

  33. viewMessage

    public MessageExt viewMessage(String topic, String msgId)

    根据给定的msgId查询消息,并指定topic。

    • 入参描述:

      参数名 类型 是否必须 默认值 值范围 说明
      msgId String msgId
      topic String topic名称
    • 返回值描述:

      返回MessageExt,包含:topic名称,消息题,消息ID,消费次数,生产者host等信息。

    • 异常描述:

      RemotingException - 网络层发生错误。

      MQBrokerException - broker发生错误。

      InterruptedException - 线程被中断。

      MQClientException - 生产者状态非Running;msgId非法等。