Transaction Message

1 Transaction Message

Apache RocketMQ has supported distributed transaction messages since version 4.3.0. RocketMQ implements transaction messages with a two-phase commit (2PC) protocol, and adds compensation logic to handle timeouts and failures in the commit phase, as shown below.

1.1 The Process of RocketMQ Transaction Message

The picture above shows the overall architecture of transaction messages, including sending the message (commit-request phase), sending Commit/Rollback (commit phase), and the compensation process.

1 Sending the message and Commit/Rollback
(1) The producer sends the message (called a Half message in RocketMQ).
(2) The server responds with the write result (success or failure) of the Half message.
(3) The producer handles the local transaction according to that result (the local transaction is not executed if the write failed).
(4) The producer sends Commit or Rollback to the broker according to the result of the local transaction (Commit generates the message index and makes the message visible to consumers).

2 Compensation process
(1) For a transaction message without a Commit/Rollback (that is, a message in pending status), the broker initiates a “back-check” request.
(2) The Producer receives the “back-check” request and checks the status of the local transaction that corresponds to the message being checked.
(3) The Producer redoes the Commit or Rollback based on the local transaction status.
The compensation phase resolves the cases where the Commit or Rollback of a message timed out or failed.
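The two phases and the compensation step can be modeled as a small state machine. The following is a minimal sketch, not RocketMQ code: `ToyBroker`, `LocalTransactionChecker` and the transaction ids are hypothetical names introduced only for illustration, and the broker is reduced to an in-memory map.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of the half-message flow described above; not RocketMQ code. */
class TransactionFlowSketch {
    enum State { PENDING, COMMITTED, ROLLED_BACK }

    /** Hypothetical in-memory broker that tracks the state of each half message. */
    static class ToyBroker {
        final Map<String, State> halfMessages = new HashMap<>();

        // Steps (1)-(2): store the half message and acknowledge the write.
        boolean storeHalf(String txId) {
            halfMessages.put(txId, State.PENDING);
            return true;
        }

        // Step (4): second phase. Only a pending message can be resolved.
        void endTransaction(String txId, boolean commit) {
            if (halfMessages.get(txId) == State.PENDING) {
                halfMessages.put(txId, commit ? State.COMMITTED : State.ROLLED_BACK);
            }
        }

        // A message becomes visible to consumers only after Commit.
        boolean visibleToConsumers(String txId) {
            return halfMessages.get(txId) == State.COMMITTED;
        }
    }

    /** Producer-side answer to a back-check: did the local transaction commit? */
    interface LocalTransactionChecker {
        boolean committed(String txId);
    }

    // Compensation: the broker asks the producer about every still-pending message.
    static void backCheck(ToyBroker broker, LocalTransactionChecker checker) {
        for (Map.Entry<String, State> e : new HashMap<>(broker.halfMessages).entrySet()) {
            if (e.getValue() == State.PENDING) {
                broker.endTransaction(e.getKey(), checker.committed(e.getKey()));
            }
        }
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.storeHalf("tx-1");              // its Commit is lost on the network
        broker.storeHalf("tx-2");
        broker.endTransaction("tx-2", false);  // explicit Rollback arrives
        backCheck(broker, txId -> txId.equals("tx-1"));
        System.out.println("tx-1 visible: " + broker.visibleToConsumers("tx-1"));
        System.out.println("tx-2 visible: " + broker.visibleToConsumers("tx-2"));
    }
}
```

In this model a lost Commit leaves the message pending until the back-check resolves it, which is the role the compensation phase plays.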

1.2 The design of RocketMQ Transaction Message

1 The transaction message is invisible to users in the first phase (commit-request phase)

In the main process of a transaction message, the first-phase message is invisible to the user. This is also the biggest difference from a normal message. So how is the message written while staying invisible to the user? RocketMQ's solution is as follows: if the message is a Half message, the topic and queueId of the original message are backed up, and the topic is then changed to RMQ_SYS_TRANS_HALF_TOPIC. Since no consumer group subscribes to that topic, consumers cannot consume the Half message. RocketMQ then starts a scheduled task that pulls messages from RMQ_SYS_TRANS_HALF_TOPIC, obtains a channel according to the producer group, sends a back-check to query the local transaction status, and decides whether to commit or roll back the message according to that status.

In RocketMQ, the storage structure of a message in the broker is as follows. Each message has corresponding index information, and the Consumer reads the content of the message through the secondary index of the ConsumeQueue. The flow is as follows:

The specific implementation strategy of RocketMQ is: when a transaction message is written, the topic and queueId of the message are replaced, and the original topic and queueId are stored in the properties of the message. Because the topic has been replaced, the message is not forwarded to the ConsumeQueue of the original topic, so the consumer cannot perceive the message and will not consume it. In fact, changing the topic is a conventional technique in RocketMQ (recall how delay messages are implemented).

2 Commit/Rollback operation and the introduction of the Op message

After a message that is invisible to the user has been written in the first phase, there are two cases in the second phase. One is Commit, after which the message needs to become visible to the user; the other is Rollback, after which the first-phase message (Half message) needs to be revoked. For Rollback, since the first-phase message is invisible to the user anyway, there is no need to actually revoke it (in fact, RocketMQ cannot really delete a message, because the file is written sequentially). But some operation is still needed to identify the final status of the message and to distinguish it from a message in pending status. For this purpose the concept of the “Op message” is introduced: it means that the message has a definite status (Commit or Rollback). If a transaction message has no corresponding Op message, the status of the transaction is still undetermined (probably the second phase failed). RocketMQ records an Op message for every Half message, regardless of whether it is committed or rolled back. The only difference between Commit and Rollback is that, for Commit, the index of the Half message is created before the Op message is written.

3 How the Op message is stored and how it corresponds to the Half message

RocketMQ writes the Op message to a specific system topic (RMQ_SYS_TRANS_OP_HALF_TOPIC), which is created via the method TransactionalMessageUtil.buildOpTopic(). This topic is an internal topic (like RMQ_SYS_TRANS_HALF_TOPIC) and is not consumed by the user. The content of the Op message is the physical offset of the corresponding Half message, so the Op message can be used to locate the Half message for the subsequent back-check operation.

4 Index construction of Half messages

When the Commit operation of the second phase is performed, the index of the Half message needs to be built. Since the Half message was written to a special topic (RMQ_SYS_TRANS_HALF_TOPIC) in the first phase of 2PC, it has to be read back from that topic, its topic and queueId have to be replaced with the real target topic and queueId, and it is then written again as a normal message that is visible to the user. In short, the second phase restores a complete normal message from the content of the Half message stored in the first phase and then goes through the normal message-writing process.
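The topic swap of the first phase and its reversal at Commit time can be sketched as follows. This is a simplified illustration rather than the broker's implementation: the `Msg` class, the property keys `REAL_TOPIC` and `REAL_QID`, and the choice of queue 0 for the half topic are assumptions made for the sketch.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of backing up and restoring topic/queueId; names are illustrative. */
class HalfMessageSwapSketch {
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    // Property keys assumed for this sketch.
    static final String PROP_REAL_TOPIC = "REAL_TOPIC";
    static final String PROP_REAL_QUEUE_ID = "REAL_QID";

    /** Minimal stand-in for a message. */
    static class Msg {
        final String topic;
        final int queueId;
        final String body;
        final Map<String, String> properties = new HashMap<>();

        Msg(String topic, int queueId, String body) {
            this.topic = topic;
            this.queueId = queueId;
            this.body = body;
        }
    }

    // First phase: back up the real destination in the properties, swap the topic.
    static Msg toHalf(Msg original) {
        Msg half = new Msg(HALF_TOPIC, 0, original.body); // queue 0 is an assumption
        half.properties.putAll(original.properties);
        half.properties.put(PROP_REAL_TOPIC, original.topic);
        half.properties.put(PROP_REAL_QUEUE_ID, String.valueOf(original.queueId));
        return half;
    }

    // Commit: rebuild a normal message from the half message's backed-up fields.
    static Msg restore(Msg half) {
        Msg real = new Msg(
                half.properties.get(PROP_REAL_TOPIC),
                Integer.parseInt(half.properties.get(PROP_REAL_QUEUE_ID)),
                half.body);
        real.properties.putAll(half.properties);
        real.properties.remove(PROP_REAL_TOPIC);
        real.properties.remove(PROP_REAL_QUEUE_ID);
        return real;
    }

    public static void main(String[] args) {
        Msg order = new Msg("OrderTopic", 3, "order-created");
        Msg half = toHalf(order);
        Msg restored = restore(half);
        System.out.println(half.topic + " -> " + restored.topic + "/" + restored.queueId);
    }
}
```

Because consumers of `OrderTopic` never subscribe to the half topic, the message stays invisible until `restore` produces a message that is written through the normal path.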

5 How to handle a message that failed in the second phase?

The commit/rollback phase can fail; for example, a network problem may cause a Commit to fail. A strategy is then required to make sure the message is eventually committed. RocketMQ uses a compensation mechanism called “back-check”. The broker initiates a back-check request for each message in pending status and sends the request to the corresponding producer side (a producer in the same producer group as the one that sent the Half message). The producer checks the status of the local transaction and redoes the Commit or Rollback. The broker performs the back-check by comparing the messages in RMQ_SYS_TRANS_HALF_TOPIC with the messages in RMQ_SYS_TRANS_OP_HALF_TOPIC, and advances the checkpoint (which records the transaction messages whose status is already determined).

RocketMQ does not back-check the status of a transaction message endlessly. The default limit is 15 checks. If the transaction status is still unknown after 15 checks, RocketMQ rolls back the message by default.
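The comparison between Half messages and Op messages, and the check limit, can be illustrated with a small sketch. It assumes that each Op message simply carries the physical offset of its Half message, as described above; the class and method names are hypothetical, and the real broker logic is considerably more involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Sketch of finding pending half messages and limiting back-checks. */
class BackCheckScanSketch {
    static final int MAX_CHECK_TIMES = 15; // default limit quoted in the text

    // A half message is pending if no Op message refers to its physical offset.
    static List<Long> pendingOffsets(List<Long> halfOffsets, Set<Long> opOffsets) {
        List<Long> pending = new ArrayList<>();
        for (Long offset : halfOffsets) {
            if (!opOffsets.contains(offset)) {
                pending.add(offset);
            }
        }
        return pending;
    }

    // Counts one more check for the offset; false means the limit is exhausted.
    static boolean shouldCheckAgain(Map<Long, Integer> checkCounts, long offset) {
        int times = checkCounts.merge(offset, 1, Integer::sum);
        return times <= MAX_CHECK_TIMES;
    }

    public static void main(String[] args) {
        List<Long> half = Arrays.asList(100L, 200L, 300L);
        Set<Long> op = new HashSet<>(Arrays.asList(200L));
        Map<Long, Integer> counts = new HashMap<>();
        for (Long offset : pendingOffsets(half, op)) {
            System.out.println(offset + " back-check: " + shouldCheckAgain(counts, offset));
        }
    }
}
```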

Instructions for using the mqadmin management tool

Before introducing the mqadmin management tool, the following points should be noted:

  • A command is executed as: ./mqadmin {command} {args}
  • Almost all commands need the -n option, which gives the nameServer address in ip:port format;
  • Almost all commands print help information with the -h option;
  • If both the broker address (-b) and the cluster name (-c) are given, the command uses the broker address specified by -b. The -b option accepts only a single address in ip:port format; the default port is 10911. If -b is not given, the command is applied to all brokers in the cluster;
  • Many commands can be found under tools, but not all of them can be used: only the commands initialized in MQAdminStartup are available. You can also modify this class to add or customize commands;
  • Because of version updates, a small number of commands may not have been updated in time; if you run into an error, read the source code of the command in question to resolve it.

1 Topic-related commands

updateTopic: Create or update the configuration of a topic.
  -b  Address of the broker on which the topic is located; only a single broker is supported, and the format is ip:port.
  -c  Name of the cluster in which the topic is located (cluster names can be queried with clusterList).
  -h  Print help information.
  -n  Service address of the nameServer, in ip:port format.
  -p  Read and write permission for the new topic (W=2 | R=4 | WR=6).
  -r  Number of readable queues (default 8).
  -w  Number of writable queues (default 8).
  -t  Name of the topic (the name may only use characters matching ^[a-zA-Z0-9_-]+$).

deleteTopic: Delete a topic.
  -c  Name of the cluster; the topic is deleted from the specified cluster (cluster names can be queried with clusterList).
  -h  Print help information.
  -n  Service address of the nameServer, in ip:port format.
  -t  Name of the topic (the name may only use characters matching ^[a-zA-Z0-9_-]+$).

topicList: View the topic list.
  -h  Print help information.
  -c  Without -c only the topic list is returned; with -c, clusterName, topic and consumerGroup are returned as well, that is, the cluster and the subscriptions the topic belongs to. The option takes no value.
  -n  Service address of the nameServer, in ip:port format.

topicRoute: View the routing information of a topic.
  -t  Name of the topic.
  -h  Print help information.
  -n  Service address of the nameServer, in ip:port format.

topicStatus: View the offsets of the message queues of a topic.
  -t  Name of the topic.
  -h  Print help information.
  -n  Service address of the nameServer, in ip:port format.

topicClusterList: View the list of clusters to which a topic belongs.
  -t  Name of the topic.
  -h  Print help information.
  -n  Service address of the nameServer, in ip:port format.

updateTopicPerm: Update the read and write permissions of a topic.
  -t  Name of the topic.
  -h  Print help information.
  -n  Service address of the nameServer, in ip:port format.
  -b  Address of the broker on which the topic is located; only a single broker is supported, and the format is ip:port.
  -p  Read and write permission for the topic (W=2 | R=4 | WR=6).
  -c  Name of the cluster in which the topic is located (can be queried with clusterList). The -b option has higher priority; if -b is not given, the command is executed on all brokers in the cluster.

updateOrderConf: Create, delete, or retrieve a key-value configuration in a specific namespace on the nameServer; not yet enabled.
  -h  Print help information.
  -n  Service address of the nameServer, in ip:port format.
  -t  topic, used as the key.
  -v  orderConf, used as the value.
  -m  method; available values are get, put, delete.

allocateMQ: Compute how the message queues of a topic would be assigned to a list of consumers with the average allocation algorithm.
  -t  Name of the topic.
  -h  Print help information.
  -n  Service address of the nameServer, in ip:port format.
  -i  ipList, separated by commas; computes which message queues of the topic each of these IPs would be assigned.

statsAll: Print topic subscription relationships, TPS, accumulated message count, 24-hour read and write totals, and so on.
  -h  Print help information.
  -n  Service address of the nameServer, in ip:port format.
  -a  Whether to print only active topics.
  -t  Name of the topic.
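
The permission values accepted by -p (W=2, R=4, WR=6) combine as bit flags. The following sketch only illustrates that arithmetic; the class and method names are made up for the example and are not part of mqadmin.

```java
/** Sketch of how the -p values W=2, R=4 and WR=6 combine as bit flags. */
class TopicPermSketch {
    static final int PERM_WRITE = 2; // W
    static final int PERM_READ = 4;  // R

    static boolean isReadable(int perm) {
        return (perm & PERM_READ) == PERM_READ;
    }

    static boolean isWritable(int perm) {
        return (perm & PERM_WRITE) == PERM_WRITE;
    }

    public static void main(String[] args) {
        int wr = PERM_WRITE | PERM_READ; // 6, the value passed as "-p 6"
        System.out.println("WR=" + wr
                + " readable=" + isReadable(wr)
                + " writable=" + isWritable(wr));
    }
}
```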

2 Cluster-related commands

clusterList: View cluster information: cluster, brokerName, brokerId, TPS, and so on.
  -m  Print more information (adds #InTotalYest, #OutTotalYest, #InTotalToday, #OutTotalToday).
  -h  Print help information.
  -n  Service address of the nameServer, in ip:port format.
  -i  Print interval, in seconds.

clusterRT: Send messages to measure the RT of each broker in the cluster; the messages are sent to the topic ${BrokerName}.
  -a  amount: total number of messages per probe; RT = total time / amount.
  -s  Message size, in bytes.
  -c  Which cluster to probe.
  -p  Whether to print a formatted log, separated by "|"; not printed by default.
  -h  Print help information.
  -m  Machine room the cluster belongs to, used for printing.
  -i  Interval, in seconds, at which messages are sent.
  -n  Service address of the nameServer, in ip:port format.

3 Broker-related commands

updateBrokerConfig: Update the configuration of the broker; the contents of the Broker.conf file are modified.
  -b  Address of the broker, in ip:port format.
  -c  Name of the cluster.
  -k  The key.
  -v  The value.
  -h  Print help information.
  -n  Service address of the nameServer, in ip:port format.

brokerStatus: View broker statistics and running status (almost all the information you want is in here).
  -b  Address of the broker, in ip:port format.
  -h  Print help information.
  -n  Service address of the nameServer, in ip:port format.

brokerConsumeStats: Get the consumption status of each consumer on the broker; returns Consume Offset, Broker Offset, Diff, Timestamp and so on, per message queue.
  -b  Address of the broker, in ip:port format.
  -t  Timeout of the request.
  -l  Diff threshold; entries are printed only beyond this value.
  -o  Whether the topic is an ordered topic; typically false.
  -h  Print help information.
  -n  Service address of the nameServer, in ip:port format.

getBrokerConfig: Get the configuration of the broker.
  -b  Address of the broker, in ip:port format.
  -n  Service address of the nameServer, in ip:port format.

wipeWritePerm: Clear the write permission of the broker on the nameServer.
  -b  Address of the broker, in ip:port format.
  -n  Service address of the nameServer, in ip:port format.
  -h  Print help information.

cleanExpiredCQ: Clean up expired ConsumeQueues on the broker. Expired queues may be produced if the number of queues is reduced manually.
  -n  Service address of the nameServer, in ip:port format.
  -h  Print help information.
  -b  Address of the broker, in ip:port format.
  -c  Name of the cluster.

cleanUnusedTopic: Clean up unused topics on the broker and release their ConsumeQueues from memory. Unused topics are produced if a topic is removed manually.
  -n  Service address of the nameServer, in ip:port format.
  -h  Print help information.
  -b  Address of the broker, in ip:port format.
  -c  Name of the cluster.

sendMsgStatus: Send messages to the broker and return the send status and RT.
  -n  Service address of the nameServer, in ip:port format.
  -h  Print help information.
  -b  brokerName; note that this is not the broker's address.
  -s  Message size, in bytes.
  -c  Number of messages to send.

4 Message-related commands

queryMsgById: Query a message by offsetMsgId. If you use the open-source console, you should use offsetMsgId. The command has further parameters; for details, read QueryMsgByIdSubCommand.
  -i  msgId.
  -h  Print help information.
  -n  Service address of the nameServer, in ip:port format.

queryMsgByKey: Query messages by message key.
  -k  msgKey.
  -t  Name of the topic.
  -h  Print help information.
  -n  Service address of the nameServer, in ip:port format.

queryMsgByOffset: Query messages by offset.
  -b  Name of the broker (note: this is the broker name, not the broker address; broker names can be found with clusterList).
  -i  Queue id to query.
  -o  Offset value.
  -t  Name of the topic.
  -h  Print help information.
  -n  Service address of the nameServer, in ip:port format.

queryMsgByUniqueKey: Query by msgId. msgId is different from offsetMsgId; the differences are described in the common operational and maintenance problems. The -g and -d options are used together: once the message is found, a particular consumer is asked to consume it and the consumption result is returned.
  -h  Print help information.
  -n  Service address of the nameServer, in ip:port format.
  -i  Unique msg id.
  -g  consumerGroup.
  -d  clientId.
  -t  Name of the topic.

checkMsgSendRT: Measure the RT of sending messages to a topic; similar to clusterRT.
  -h  Print help information.
  -n  Service address of the nameServer, in ip:port format.
  -t  Name of the topic.
  -a  Number of probes.
  -s  Message size.

sendMessage: Send a message; depending on the options, it is sent to a particular message queue or sent normally.
  -h  Print help information.
  -n  Service address of the nameServer, in ip:port format.
  -t  Name of the topic.
  -p  body: the message body.
  -k  keys.
  -c  tags.
  -b  brokerName.
  -i  queueId.

consumeMessage: Consume messages. Messages can be consumed by offset, start timestamp, end timestamp, or message queue, with different consumption logic for different combinations; see ConsumeMessageCommand for details.
  -h  Print help information.
  -n  Service address of the nameServer, in ip:port format.
  -t  Name of the topic.
  -b  brokerName.
  -o  Offset from which to start consuming.
  -i  queueId.
  -g  Consumer group.
  -s  Start timestamp; see -h for the format.
  -d  End timestamp.
  -c  Number of messages to consume.

printMsg: Consume messages from the broker and print them; a time range is optional.
  -h  Print help information.
  -n  Service address of the nameServer, in ip:port format.
  -t  Name of the topic.
  -c  Character set, for example UTF-8.
  -s  subExpress: the filter expression.
  -b  Start timestamp; see -h for the format.
  -e  End timestamp.
  -d  Whether to print the message body.

printMsgByQueue: Similar to printMsg, but for a specified message queue.
  -h  Print help information.
  -n  Service address of the nameServer, in ip:port format.
  -t  Name of the topic.
  -i  queueId.
  -a  brokerName.
  -c  Character set, for example UTF-8.
  -s  subExpress: the filter expression.
  -b  Start timestamp; see -h for the format.
  -e  End timestamp.
  -p  Whether to print the message.
  -d  Whether to print the message body.
  -f  Whether to count and print the number of tags.

resetOffsetByTime: Reset the offset by timestamp, on both the broker and the consumer.
  -h  Print help information.
  -n  Service address of the nameServer, in ip:port format.
  -g  Consumer group.
  -t  Name of the topic.
  -s  Timestamp; the offset is reset to the one corresponding to it.
  -f  Whether to force the reset. If false, only moving the offset backwards is supported; if true, the offset is reset regardless of how the offset for the timestamp relates to the consume offset.
  -c  Whether to reset the offset of C++ clients.

5 Consumer and consumer group commands

consumerProgress: View the consumption status of a subscription group; the amount of accumulated messages can be shown per client IP.
  -g  Name of the consumer group.
  -s  Whether to print the client IP.
  -h  Print help information.
  -n  Service address of the nameServer, in ip:port format.

consumerStatus: View the consumer status, including whether all consumers in the group have the same subscription, whether the process queues are piling up, and the consumer's jstack result. There is more; see ConsumerStatusSubCommand.
  -h  Print help information.
  -n  Service address of the nameServer, in ip:port format.
  -g  Consumer group.
  -i  clientId.
  -s  Whether to execute jstack.

getConsumerStatus: Get the consumption progress of a consumer.
  -g  Name of the consumer group.
  -t  Topic to query.
  -i  IP address of the consumer client.
  -n  Service address of the nameServer, in ip:port format.
  -h  Print help information.

updateSubGroup: Update or create a subscription group.
  -n  Service address of the nameServer, in ip:port format.
  -h  Print help information.
  -b  Address of the broker.
  -c  Name of the cluster.
  -g  Name of the consumer group.
  -s  Whether the group is allowed to consume.
  -m  Whether to start consuming from the minimum offset.
  -d  Whether the group uses broadcast mode.
  -q  Number of retry queues.
  -r  Maximum number of retries.
  -i  Takes effect when slaveReadEnable is on: the brokerId to consume from; a slave's brokerId can be configured to consume from that slave proactively.
  -w  If the broker recommends consuming from a slave, this setting determines which slave to consume from; configure a specific brokerId, such as 1.
  -a  Whether to notify other consumers to rebalance when the number of consumers changes.

deleteSubGroup: Remove a subscription group from the broker.
  -n  Service address of the nameServer, in ip:port format.
  -h  Print help information.
  -b  Address of the broker.
  -c  Name of the cluster.
  -g  Name of the consumer group.

cloneGroupOffset: Use the offset of the source group in the target group.
  -n  Service address of the nameServer, in ip:port format.
  -h  Print help information.
  -s  Source consumer group.
  -d  Target consumer group.
  -t  Name of the topic.
  -o  Not used yet.

6 Connection-related commands

consumerConnection: Query the network connections of a consumer.
  -g  Name of the consumer group.
  -n  Service address of the nameServer, in ip:port format.
  -h  Print help information.

producerConnection: Query the network connections of a producer.
  -g  Name of the producer group.
  -t  Name of the topic.
  -n  Service address of the nameServer, in ip:port format.
  -h  Print help information.

7 NameServer-related commands

updateKvConfig: Update the kv configuration of the nameServer; not currently used.
  -s  Namespace.
  -k  key.
  -v  value.
  -n  Service address of the nameServer, in ip:port format.
  -h  Print help information.

deleteKvConfig: Delete a kv configuration of the nameServer.
  -s  Namespace.
  -k  key.
  -n  Service address of the nameServer, in ip:port format.
  -h  Print help information.

getNamesrvConfig: Get the configuration of the nameServer.
  -n  Service address of the nameServer, in ip:port format.
  -h  Print help information.

updateNamesrvConfig: Modify the configuration of the nameServer.
  -n  Service address of the nameServer, in ip:port format.
  -h  Print help information.
  -k  The key.
  -v  The value.

8 Other relevant command notes

startMonitoring: Start the monitoring process, which monitors mistaken message deletion, retry queue messages, and so on.
  -n  Service address of the nameServer, in ip:port format.
  -h  Print help information.

Load Balancing

Load balancing in RocketMQ is done on the client side. It can be divided into load balancing on the Producer side when sending messages and load balancing on the Consumer side when subscribing to messages.

1 Producer Load Balancing

When the Producer sends a message, it first looks up the TopicPublishInfo for the topic. With the routing information in TopicPublishInfo, the RocketMQ client by default selects one queue (MessageQueue) from the messageQueueList in TopicPublishInfo to send the message to. The specific fault-tolerance strategy is defined in the MQFaultStrategy class.
This class has a switch variable, sendLatencyFaultEnable. If it is turned on, queues are selected from a random starting index that is incremented and taken modulo the queue count, and brokers that are currently not available are filtered out. The so-called “latency fault tolerance” means avoiding a broker that failed previously for a certain period of time. For example, if the latency of the last request exceeds 550 ms, the broker is avoided for 3000 ms; if it exceeds 1000 ms, it is avoided for 60000 ms. If the switch is off, a queue (MessageQueue) is chosen by the incrementing modulo selection alone. The latency fault tolerance mechanism is the key to high availability of message sending.
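
The selection rule can be sketched as follows. This is a simplified model, not the MQFaultStrategy source: the class name and methods are hypothetical, only the two latency thresholds quoted above are modeled (actual defaults may differ between versions), and the counter starts at 0 instead of a random value so that the behavior is reproducible.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of latency fault tolerance; not the MQFaultStrategy source. */
class LatencyFaultSketch {
    // Thresholds as quoted in the text above; real defaults may vary by version.
    static final long[] LATENCY_MS = {550L, 1000L};
    static final long[] AVOID_MS = {3000L, 60000L};

    private final Map<String, Long> avoidUntil = new HashMap<>();
    private int counter = 0; // the real client starts from a random value

    // How long a broker should be avoided after a request with this latency.
    static long avoidanceFor(long latencyMs) {
        for (int i = LATENCY_MS.length - 1; i >= 0; i--) {
            if (latencyMs > LATENCY_MS[i]) {
                return AVOID_MS[i];
            }
        }
        return 0L;
    }

    void recordLatency(String broker, long latencyMs, long nowMs) {
        long avoid = avoidanceFor(latencyMs);
        if (avoid > 0) {
            avoidUntil.put(broker, nowMs + avoid);
        } else {
            avoidUntil.remove(broker);
        }
    }

    boolean isAvailable(String broker, long nowMs) {
        Long until = avoidUntil.get(broker);
        return until == null || nowMs >= until;
    }

    // Incrementing modulo selection; avoided brokers are skipped when the switch is on.
    String select(List<String> brokers, boolean sendLatencyFaultEnable, long nowMs) {
        for (int i = 0; i < brokers.size(); i++) {
            String candidate = brokers.get(Math.floorMod(counter++, brokers.size()));
            if (!sendLatencyFaultEnable || isAvailable(candidate, nowMs)) {
                return candidate;
            }
        }
        // Every broker is being avoided: fall back to plain modulo selection.
        return brokers.get(Math.floorMod(counter++, brokers.size()));
    }

    public static void main(String[] args) {
        LatencyFaultSketch strategy = new LatencyFaultSketch();
        List<String> brokers = Arrays.asList("broker-a", "broker-b");
        strategy.recordLatency("broker-a", 600, 0); // slow request at t=0
        System.out.println(strategy.select(brokers, true, 1000));
        System.out.println(strategy.select(brokers, true, 3000));
    }
}
```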

2 Consumer Load Balancing

In RocketMQ, both consumption modes on the Consumer side (Push and Pull) obtain messages by pulling. Push mode is only a wrapper around pull mode: in essence, the message-pulling thread pulls a batch of messages from the server, submits them to the message-consuming thread pool, and then immediately tries to pull from the server again. If no message is pulled, the next pull is delayed and then retried. In both modes the Consumer needs to know which message queue on the Broker side to get messages from. Load balancing is therefore necessary on the Consumer side: it decides which Consumers in the same ConsumerGroup the multiple MessageQueues on the Broker side are allocated to.

1. Heartbeat packets sent by the Consumer
After a Consumer starts, a scheduled task continuously sends heartbeat packets to all Broker instances in the RocketMQ cluster. Each packet contains the consumer group name, the set of subscription relationships, the message communication mode, the client id, and so on. After receiving a heartbeat from a Consumer, the Broker maintains it in the ConsumerManager's local cache variable consumerTable. At the same time, the encapsulated client network channel information is stored in the local cache variable channelInfoTable. Both provide the metadata for the later load balancing of Consumers.
2. The core class for load balancing on the Consumer side: RebalanceImpl
Starting the MQClientInstance during the startup of a Consumer instance also starts the load balancing service thread RebalanceService (executed every 20 s). The source code shows that the run() method of the RebalanceService thread ends up calling the rebalanceByTopic() method of the RebalanceImpl class, which is the core of load balancing on the Consumer side. rebalanceByTopic() does different processing depending on whether the consumer communication type is “broadcast mode” or “cluster mode”. Here we mainly look at the processing flow in cluster mode:
(1) Get the set of message consumption queues (mqSet) under the Topic from the local cache variable topicSubscribeInfoTable of the rebalanceImpl instance.
(2) Call the mQClientFactory.findConsumerIdList() method with the topic and the consumer group as parameters to send an RPC request to the Broker and obtain the list of consumer ids in the consumer group (the Broker answers from the consumerTable it built from the heartbeat data reported by the consumers; request code: GET_CONSUMER_LIST_BY_GROUP);
(3) First sort the message consumption queues under the Topic and the consumer ids, then use the message queue allocation strategy (default: the average allocation algorithm) to compute the message queues to pull from. The average allocation algorithm is similar to pagination: all MessageQueues are ordered like records and all consumers are ordered like pages. The algorithm computes the average size of each page and the range of records on each page, and finally traverses that range to find the records (here: MessageQueues) that belong to the current consumer.
(4) Then the updateProcessQueueTableInRebalance() method is called. It first compares the allocated message queue set (mqSet) with processQueueTable and filters the entries.

  • The red part of processQueueTable in the figure above marks the
    queues that are not contained in the assigned message queue set
    mqSet. The Dropped attribute of these queues is set to true, and
    then each queue is checked to see whether it can be removed from the
    processQueueTable cache variable. This is done by the
    removeUnnecessaryMessageQueue() method, which tries for up to 1 s to
    acquire the lock of the current consumption processing queue. It
    returns true if the lock is acquired and false if the lock is still
    not available after waiting for 1 s. If true is returned, the
    corresponding Entry is removed from the processQueueTable cache
    variable.
  • The green part of processQueueTable in the figure above is the
    intersection with the assigned message queue set mqSet. For these
    queues, check whether the ProcessQueue has expired. In Pull mode
    nothing is done; in Push mode the Dropped attribute is set to true
    and the removeUnnecessaryMessageQueue() method is called to try to
    remove the Entry as above.

Finally, a ProcessQueue object is created for each MessageQueue in the filtered message queue set (mqSet) and stored in the processQueueTable of RebalanceImpl. In this step the computePullFromWhere(MessageQueue mq) method of the RebalanceImpl instance is called to obtain the next consumption offset of the MessageQueue, which is then set on the pullRequest object that is about to be created. Each pull request object (pullRequest) is added to the pull list (pullRequestList), and finally the dispatchPullRequest() method is executed. The PullRequest objects are put one by one into the blocking queue pullRequestQueue of the PullMessageService thread, and the service thread takes them out and sends the pull requests to the Broker. Note the contrast between the two implementation classes: the dispatchPullRequest() method differs between RebalancePushImpl and RebalancePullImpl, and in RebalancePullImpl the method is empty, which answers the last question in the previous article.
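
The table update described in step (4) and the paragraph above can be reduced to a set difference. The sketch below is a loose approximation under strong simplifications: queues are plain strings, and the lock acquisition, the expiry check and the creation of pull requests are all left out. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/** Loose sketch of reconciling processQueueTable with the newly assigned mqSet. */
class ProcessQueueTableSketch {
    static class ProcessQueue {
        boolean dropped;
    }

    // Returns true if the table changed. Locking and expiry checks are omitted.
    static boolean update(Map<String, ProcessQueue> table, Set<String> mqSet) {
        boolean changed = false;
        Iterator<Map.Entry<String, ProcessQueue>> it = table.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, ProcessQueue> entry = it.next();
            if (!mqSet.contains(entry.getKey())) {
                entry.getValue().dropped = true; // queue no longer assigned to us
                it.remove();
                changed = true;
            }
        }
        for (String mq : mqSet) {
            if (!table.containsKey(mq)) {
                table.put(mq, new ProcessQueue()); // newly assigned queue
                changed = true;
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        Map<String, ProcessQueue> table = new HashMap<>();
        table.put("q0", new ProcessQueue());
        table.put("q1", new ProcessQueue());
        Set<String> assigned = new HashSet<>();
        assigned.add("q1");
        assigned.add("q2");
        System.out.println("changed: " + update(table, assigned));
        System.out.println("now holding: " + table.keySet());
    }
}
```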

The core design idea of the message consumption queue is that, at any given time, a message queue can be consumed by only one consumer within the same consumer group, while one consumer can consume multiple message queues at the same time.
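
The paging analogy of the average allocation algorithm, and the one-consumer-per-queue rule, can be made concrete with a short sketch. It follows the description in step (3) rather than reproducing the RocketMQ source; the class name and the queue and consumer ids are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Sketch of average allocation: queues are "records", consumers are "pages". */
class AverageAllocationSketch {
    static List<String> allocate(String currentCid, List<String> mqAll, List<String> cidAll) {
        // Every consumer sorts both lists the same way, so all agree on the result.
        List<String> queues = new ArrayList<>(mqAll);
        List<String> consumers = new ArrayList<>(cidAll);
        Collections.sort(queues);
        Collections.sort(consumers);

        List<String> result = new ArrayList<>();
        int index = consumers.indexOf(currentCid);
        if (index < 0 || queues.isEmpty()) {
            return result;
        }
        int base = queues.size() / consumers.size(); // minimum page size
        int mod = queues.size() % consumers.size();  // first 'mod' pages get one extra
        int size = base + (index < mod ? 1 : 0);
        int start = index * base + Math.min(index, mod);
        for (int i = 0; i < size; i++) {
            result.add(queues.get(start + i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("q0", "q1", "q2", "q3", "q4", "q5", "q6", "q7");
        List<String> consumers = Arrays.asList("c1", "c2", "c3");
        for (String c : consumers) {
            System.out.println(c + " -> " + allocate(c, queues, consumers));
        }
        // c1 -> [q0, q1, q2]
        // c2 -> [q3, q4, q5]
        // c3 -> [q6, q7]
    }
}
```

Each queue appears in exactly one consumer's result, while a consumer may hold several queues. If there are more consumers than queues, the surplus consumers receive an empty list.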

Message Filter

RocketMQ, a distributed message queue, differs from other MQ middleware in how it filters messages: filtering is done when the Consumer subscribes to messages. This follows from RocketMQ's storage mechanism, which separates the Producer writing messages from the Consumer subscribing to them. When subscribing, the Consumer first gets an index entry from the logical message queue ConsumeQueue and then uses it to read the message entity from the CommitLog. So in the end, filtering cannot get around this storage structure. The storage structure of ConsumeQueue is as follows; it contains an 8-byte message tag hashcode, and tag-based message filtering uses exactly this hashcode.

RocketMQ has two main filter types:

  • Tag filtering: Consumer can specify not only the message topic but also the message tag values, when subscribing. Multiple tag values need to be separated by ‘||’. When consumer subscribing a message, it builds the subscription request into a SubscriptionData object and sends a pull message request to the Broker side. Before the Broker side reads data from the RocketMQ file storage layer - Store, it will construct a MessageFilter using the SubscriptionData object and then pass it to the Store. Store get a record from ConsumeQueue, and it will filter the message by the saved tag hashcode, it is unable to filter the messages exactly in the server side because of only the hashcode will be used when filtering, Therefore, after the Consumer pulls the message, it also needs to compare the original tag string of the message. If the original tag string is not same with the expected, the message will be ignored.

  • SQL92 filtering: This filter behavior is almost same with the above Tag filtering method. The only difference is on the way how Store works. The rocketmq-filter module is responsible for the construction and execution of the real SQL expression. Executing an SQL expression every time a filter is executed affects efficiency, so RocketMQ uses BloomFilter to avoid doing it every time. The expression context of SQL92 is a property of the message.
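
The two-stage tag check described above can be sketched in plain Java. This is a minimal illustration only: it uses String.hashCode() as a stand-in for the stored tag hash code, and the class and method names are hypothetical, not RocketMQ APIs. The tags "Aa" and "BB" are chosen because they share the same Java hash code, which shows why the second, exact comparison is needed.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of hash-based pre-filtering followed by an exact tag comparison.
class TagFilterSketch {

    // Splits a subscription expression such as "Aa || TagB" into individual tags.
    static Set<String> parseTags(String expression) {
        Set<String> tags = new HashSet<>();
        for (String tag : expression.split("\\|\\|")) {
            tags.add(tag.trim());
        }
        return tags;
    }

    // Hash codes of the subscribed tags, as used by the coarse first stage.
    static Set<Integer> hashCodes(Set<String> tags) {
        Set<Integer> hashes = new HashSet<>();
        for (String tag : tags) {
            hashes.add(tag.hashCode());
        }
        return hashes;
    }

    // Stage 1 (server side in the description above): only hash codes are compared.
    static boolean passesBrokerStage(String messageTag, Set<Integer> subscribedHashes) {
        return subscribedHashes.contains(messageTag.hashCode());
    }

    // Stage 2 (consumer side): the original tag string is compared.
    static boolean passesConsumerStage(String messageTag, Set<String> subscribedTags) {
        return subscribedTags.contains(messageTag);
    }

    public static void main(String[] args) {
        Set<String> tags = parseTags("Aa || TagB");
        Set<Integer> hashes = hashCodes(tags);
        for (String messageTag : new String[] {"Aa", "BB", "TagC"}) {
            System.out.println(messageTag
                + " broker=" + passesBrokerStage(messageTag, hashes)
                + " consumer=" + passesConsumerStage(messageTag, tags));
        }
    }
}
```

A message tagged "BB" passes the hash stage because of the collision with "Aa", and is then dropped by the exact comparison.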

Basic Concept

1 Message Model

RocketMQ message model is mainly composed of Producer, Broker and Consumer. The producer is responsible for producing messages and the consumer is for consuming messages, while the broker stores messages.
The broker is an independent server during actual deployment, and each broker can store messages from multiple topics. Even messages from the same topic can be stored in the different brokers by sharding strategy.
The message queue is used to store physical offsets of messages, and the message addresses are stored in separate queues. The consumer group consists of multiple consumer instances.

2 Producer

The Producer is responsible for producing messages, typically on behalf of business systems. It sends the messages generated by those systems to brokers. RocketMQ provides multiple sending paradigms: synchronous, asynchronous, sequential and one-way. Both the synchronous and the asynchronous methods require a confirmation to be returned from the Broker, but the one-way method does not.

3 Consumer

The Consumer is responsible for consuming messages; typically a background system consumes them asynchronously. The consumer pulls messages from brokers and feeds them into the application. From the user's perspective, two types of consumers are provided: pull consumer and push consumer.

4 Topic

The Topic refers to a collection of one kind of message. Each topic contains several messages and one message can only belong to one topic. The topic is the basic unit of RocketMQ for message subscription.

5 Broker Server

As the role of the transfer station, the Broker Server stores and forwards messages. In RocketMQ, the broker server is responsible for receiving messages sent from producers, storing them and preparing to handle pull requests. It also stores the related message meta data, including consumer groups, consuming progress, topics, queues info and so on.

6 Name Server

The Name Server serves as the provider of routing service. The producer or the consumer can find the list of broker IP addresses for each topic through name server. Multiple name servers can be deployed in one cluster, but they are independent of each other and do not exchange information.

7 Pull Consumer

A type of Consumer. The application pulls messages from brokers by actively invoking the consumer's pull method, which gives the application control over the timing and frequency of pulling. Once a batch of messages has been pulled, the user application starts the consuming process.

8 Push Consumer

A type of Consumer. In this mode, which offers high real-time performance, the Broker actively pushes messages to the consumer as soon as it receives the data.

9 Producer Group

A collection of the same type of Producer, which sends the same type of messages with consistent logic. If a transaction message is sent and the original producer crashes after sending, the broker server will contact other producers in the same producer group to commit or rollback the transactional message.

10 Consumer Group

A collection of the same type of Consumer, which consume the same type of messages with consistent logic. The consumer group makes load-balance and fault-tolerance super easy in terms of message consuming.
Warning: consumer instances of one consumer group must have exactly the same topic subscription(s).

RocketMQ supports two consumption modes: Clustering and Broadcasting.

11 Consumption Mode - Clustering

Under the Clustering mode, the messages of one topic are distributed across the consumer instances of the same consumer group as evenly as possible. That is, one message is consumed by only one consumer instance.

12 Consumption Mode - Broadcasting

Under the Broadcasting mode, each consumer instance of the same consumer group receives every message published to the corresponding topic.
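
The difference between the two modes can be shown with a small simulation. This is a hypothetical sketch: the round-robin assignment below only stands in for the real load balancing, and none of the names are RocketMQ APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of the two consumption modes within one consumer group.
class ConsumptionModeSketch {

    // Clustering: each message goes to exactly one consumer instance of the group.
    static Map<String, List<String>> clustering(List<String> messages, List<String> consumers) {
        Map<String, List<String>> delivered = emptyInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            String owner = consumers.get(i % consumers.size());
            delivered.get(owner).add(messages.get(i));
        }
        return delivered;
    }

    // Broadcasting: every consumer instance receives every message.
    static Map<String, List<String>> broadcasting(List<String> messages, List<String> consumers) {
        Map<String, List<String>> delivered = emptyInboxes(consumers);
        for (String consumer : consumers) {
            delivered.get(consumer).addAll(messages);
        }
        return delivered;
    }

    private static Map<String, List<String>> emptyInboxes(List<String> consumers) {
        Map<String, List<String>> inboxes = new LinkedHashMap<>();
        for (String consumer : consumers) {
            inboxes.put(consumer, new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m0", "m1", "m2", "m3");
        List<String> consumers = Arrays.asList("c0", "c1");
        System.out.println("clustering:   " + clustering(messages, consumers));
        System.out.println("broadcasting: " + broadcasting(messages, consumers));
    }
}
```

In the Java client the mode is chosen on the consumer itself, for example with consumer.setMessageModel(MessageModel.BROADCASTING); Clustering is the default.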

13 Normal Ordered Message

Under the Normal Ordered Message mode, the messages received by consumers from the same ConsumeQueue are sequential, but the messages received from the different message queues may be non-sequential.

14 Strictly Ordered Message

Under the Strictly Ordered Message mode, all messages received by the consumers from the same topic are sequential as the order they are stored.
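
Because ordering is only guaranteed within a single queue, a common way to keep related messages in order is to route all messages that share a business key to the same queue. The sketch below is a hypothetical illustration of that idea; the class name, the method name and the use of an order id as the key are assumptions.

```java
// Hypothetical sketch: map a business key (here an order id) to a fixed queue index.
class OrderedQueueSelectorSketch {

    // floorMod keeps the result in [0, queueCount) even for negative ids.
    static int selectQueue(long orderId, int queueCount) {
        return (int) Math.floorMod(orderId, (long) queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4;
        long[] orderIds = {1001L, 1002L, 1001L, 1003L, 1001L};
        for (long orderId : orderIds) {
            // Every message of order 1001 lands in the same queue, so its steps stay in order.
            System.out.println("order " + orderId + " -> queue " + selectQueue(orderId, queueCount));
        }
    }
}
```

Messages of different orders may land in different queues and can therefore be consumed in a different relative order, which matches the Normal Ordered Message description.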

15 Message

The physical carrier of the information transmitted by a messaging system, and the smallest unit of data for production and consumption. Each message must belong to one topic.
Each Message in RocketMQ has a unique message id and can carry a key used to store business-related value. The system has the function to query messages by its id or key.

16 Tag

Flags set on messages to distinguish different types of messages under the same topic, functioning as a “sub-topic”. Messages from the same business unit can carry different tags under the same topic according to their business purpose. Tags help keep the code clear and consistent and optimize the query system provided by RocketMQ. Consumers can handle each “sub-topic” differently by using tags, which gives better extensibility.

Batch Message Sample


Sending messages in batches improves the performance of delivering small messages. Messages in the same batch must have the same topic and the same waitStoreMsgOK setting, and they cannot be scheduled messages. You can send up to 4MiB at a time; if you need to send more than that, it is recommended to divide the messages into multiple smaller batches of no more than 1MiB each.

1 Send Batch Messages

If you just send messages of no more than 4MiB at a time, it is easy to use batch:

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
producer.send(messages);
} catch (Exception e) {
e.printStackTrace();
//handle the error
}

2 Split into Lists

Complexity grows only when you send a large batch and cannot be sure whether it exceeds the size limit (4MiB). In that case, it is better to split the list:

public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1000 * 1000;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override public boolean hasNext() {
return currIndex < messages.size();
}
@Override public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; //for log overhead
if (tmpSize > SIZE_LIMIT) {
//it is unexpected that single message exceeds the SIZE_LIMIT
//here just let it go, otherwise it will block the splitting process
if (nextIndex - currIndex == 0) {
//if the next sublist has no element, add this one and then break, otherwise just break
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}

}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}

// then you could split the large list into small ones:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
// handle the error
}
}

OpenMessaging Example

OpenMessaging includes the establishment of industry guidelines and of messaging and streaming specifications that provide a common framework for the finance, e-commerce, IoT and big-data areas. Its design principles are cloud orientation, simplicity, flexibility and language independence in distributed heterogeneous environments. Conformance to these specifications makes it possible to develop heterogeneous messaging applications across all major platforms and operating systems.

RocketMQ provides a partial implementation of OpenMessaging 0.1.0-alpha. The following examples demonstrate how to access RocketMQ based on OpenMessaging.

OMSProducer

The following example shows how to send messages to a RocketMQ broker in synchronous, asynchronous, or one-way mode.

public class OMSProducer {
public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

final Producer producer = messagingAccessPoint.createProducer();

messagingAccessPoint.startup();
System.out.printf("MessagingAccessPoint startup OK%n");

producer.startup();
System.out.printf("Producer startup OK%n");

{
Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
SendResult sendResult = producer.send(message);
System.out.printf("Send sync message OK, msgId: %s%n", sendResult.messageId());
}

{
final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
result.addListener(new PromiseListener<SendResult>() {
@Override
public void operationCompleted(Promise<SendResult> promise) {
System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());
}

@Override
public void operationFailed(Promise<SendResult> promise) {
System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage());
}
});
}

{
producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
System.out.printf("Send oneway message OK%n");
}

producer.shutdown();
messagingAccessPoint.shutdown();
}
}

OMSPullConsumer

Use OMS PullConsumer to poll messages from a specified queue.

public class OMSPullConsumer {
public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

messagingAccessPoint.startup();
System.out.printf("MessagingAccessPoint startup OK%n");

consumer.startup();
System.out.printf("Consumer startup OK%n");

Message message = consumer.poll();
if (message != null) {
String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
System.out.printf("Received one message: %s%n", msgId);
consumer.ack(msgId);
}

consumer.shutdown();
messagingAccessPoint.shutdown();
}
}

OMSPushConsumer

Attach an OMS PushConsumer to a specified queue and consume messages with a MessageListener.

public class OMSPushConsumer {
public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

final PushConsumer consumer = messagingAccessPoint.
createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

messagingAccessPoint.startup();
System.out.printf("MessagingAccessPoint startup OK%n");

Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
consumer.shutdown();
messagingAccessPoint.shutdown();
}
}));

consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
@Override
public void onMessage(final Message message, final ReceivedMessageContext context) {
System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID));
context.ack();
}
});

}
}

Filter Example


In most cases, tag is a simple and useful design to select messages you want. For example:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

The consumer will receive messages that carry TAGA, TAGB or TAGC. The limitation is that one message can have only one tag, which may not be enough for sophisticated scenarios. In that case, you can use an SQL expression to filter messages.
The SQL feature can perform calculations on the properties you set when sending messages. Within the grammar defined by RocketMQ, you can implement some interesting logic. Here is an example:

------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 10 | --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 1 | --------------------> Missed
| b = 'abc'|
| c = true |
------------
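
The two cases in the diagram can be mimicked with a hand-written predicate over message properties. This is only a sketch of what the expression a > 5 AND b = 'abc' means; it is not RocketMQ's expression parser, the class and helper names are hypothetical, and treating a missing property as a non-match is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch: a plain-Java equivalent of the filter "a > 5 AND b = 'abc'".
class SqlFilterSketch {

    static final Predicate<Map<String, String>> A_GT_5_AND_B_EQ_ABC = props -> {
        String a = props.get("a");
        String b = props.get("b");
        if (a == null || b == null) {
            return false; // assumption: a message without the property never matches
        }
        return Double.parseDouble(a) > 5 && "abc".equals(b);
    };

    // User properties are stored as strings, so the sketch models them as a String map.
    static Map<String, String> message(String a, String b, String c) {
        Map<String, String> props = new HashMap<>();
        props.put("a", a);
        props.put("b", b);
        props.put("c", c);
        return props;
    }

    public static void main(String[] args) {
        System.out.println("a=10: " + A_GT_5_AND_B_EQ_ABC.test(message("10", "abc", "true")));
        System.out.println("a=1:  " + A_GT_5_AND_B_EQ_ABC.test(message("1", "abc", "true")));
    }
}
```

The first message corresponds to the "Gotten" case and the second to the "Missed" case.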

1 Grammars

RocketMQ only defines some basic grammars to support this feature. You could also extend it easily.

  • Numeric comparison, like >, >=, <, <=, BETWEEN, =;
  • Character comparison, like =, <>, IN;
  • IS NULL or IS NOT NULL;
  • Logical AND, OR, NOT;

Constant types are:

  • Numeric, like 123, 3.1415;
  • Character, like ‘abc’, must be made with single quotes;
  • NULL, special constant;
  • Boolean, TRUE or FALSE;

2 Usage constraints

Only push consumer could select messages by SQL92. The interface is:

public void subscribe(final String topic, final MessageSelector messageSelector)

3 Producer example

You can put properties in message through method putUserProperty when sending.

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
// Illustrative values: the tag and the loop bound are assumptions.
String tag = "TagA";
for (int i = 0; i < 10; i++) {
    Message msg = new Message("TopicTest",
        tag,
        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
    );
    // Set a user property that SQL92 expressions can reference.
    msg.putUserProperty("a", String.valueOf(i));
    SendResult sendResult = producer.send(msg);
}

producer.shutdown();

4 Consumer example

Use MessageSelector.bySql to select messages through SQL when consuming.

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// Only subscribe to messages that have property a, with a >= 0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();

Basic Sample


The basic sample provides the following two functions:

  • RocketMQ can send messages in three ways: reliable synchronous, reliable asynchronous, and one-way transmission. The first two are reliable because the sender receives a response indicating whether the send succeeded.
  • RocketMQ can consume messages.

1 Add Dependency

maven:

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>

gradle:

compile 'org.apache.rocketmq:rocketmq-client:4.3.0'

2 Send Messages

2.1 Use Producer to Send Synchronous Messages

Reliable synchronous transmission is used in a wide range of scenarios, such as important notification messages and SMS notifications.

public class SyncProducer {
public static void main(String[] args) throws Exception {
// Instantiate with a producer group name
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses
producer.setNamesrvAddr("localhost:9876");
// Launch the producer instance
producer.start();
for (int i = 0; i < 100; i++) {
// Create a message instance with specifying topic, tag and message body
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// Send message to one of brokers
SendResult sendResult = producer.send(msg);
// Check whether the message has been delivered by the callback of sendResult
System.out.printf("%s%n", sendResult);
}
// Shut down once the producer instance is no longer in use
producer.shutdown();
}
}

2.2 Send Asynchronous Messages

Asynchronous transmission is generally used in response-time-sensitive business scenarios, where the sender cannot afford to wait long for the Broker's response.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses
        producer.setNamesrvAddr("localhost:9876");
        // Launch the producer instance
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);
        int messageCount = 100;
        // Counted down once per callback so that shutdown waits for in-flight sends
        final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) {
            final int index = i;
            // Create a message instance, specifying topic, tag, key and message body
            Message msg = new Message("TopicTest",
                "TagA",
                "OrderID188",
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // SendCallback: receives the result of the asynchronous send
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    countDownLatch.countDown();
                    System.out.printf("%-10d OK %s %n", index,
                        sendResult.getMsgId());
                }
                @Override
                public void onException(Throwable e) {
                    countDownLatch.countDown();
                    System.out.printf("%-10d Exception %s %n", index, e);
                    e.printStackTrace();
                }
            });
        }
        // Wait up to 5 seconds for the callbacks; otherwise shutdown could cut off pending sends
        countDownLatch.await(5, TimeUnit.SECONDS);
        // Shut down once the producer instance is no longer in use
        producer.shutdown();
    }
}

2.3 Send Messages in One-way Mode

One-way transmission is used for cases requiring moderate reliability, such as log collection.

public class OnewayProducer {
public static void main(String[] args) throws Exception{
// Instantiate with a producer group name
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses
producer.setNamesrvAddr("localhost:9876");
// Launch the producer instance
producer.start();
for (int i = 0; i < 100; i++) {
// Create a message instance with specifying topic, tag and message body
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// Send in one-way mode, no return result
producer.sendOneway(msg);
}
// Shut down once the producer instance is no longer in use
producer.shutdown();
}
}

3 Consume Messages

public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// Instantiate with specified consumer group name
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

// Specify name server addresses
consumer.setNamesrvAddr("localhost:9876");

// Subscribe one or more topics and tags for finding those messages need to be consumed
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// Mark the message that have been consumed successfully
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// Launch the consumer instance
consumer.start();
System.out.printf("Consumer Started.%n");
}
}

Transaction Message Example

1 Transaction message status

There are three states for a transaction message (in the Java client they appear as LocalTransactionState.COMMIT_MESSAGE, ROLLBACK_MESSAGE and UNKNOW):

  • TransactionStatus.CommitTransaction: commits the transaction, which allows consumers to consume the message.
  • TransactionStatus.RollbackTransaction: rolls back the transaction, which means the message is deleted and cannot be consumed.
  • TransactionStatus.Unknown: intermediate state, which means MQ needs to check back with the producer to determine the status.

2 Send transactional message example

2.1 Create the transactional producer

Use the TransactionMQProducer class to create the producer client and specify a unique ProducerGroup. You can also set up a custom thread pool to process check requests. After executing the local transaction, reply to MQ according to the execution result; the reply statuses are described in the section above.

import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}

2.2 Implement the TransactionListener interface

The executeLocalTransaction method executes the local transaction once the half message has been sent successfully. It returns one of the three transaction statuses mentioned in the previous section.

The checkLocalTransaction method checks the local transaction status and responds to MQ check requests. It also returns one of the three transaction statuses mentioned in the previous section.

public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}

3 Usage Constraint

  1. Transactional messages do not support scheduling or batching.
  2. To prevent a single message from being checked too many times and causing half-queue messages to accumulate, the number of checks for a single message is limited to 15 by default. Users can change this limit through the transactionCheckMax parameter in the broker configuration. If a message has been checked more than transactionCheckMax times, the broker by default discards it and prints an error log. Users can change this behavior by overriding the AbstractTransactionalMessageCheckListener class.
  3. A transactional message is checked after a period determined by the transactionTimeout parameter in the broker configuration. Users can also change this limit for an individual message by setting the user property “CHECK_IMMUNITY_TIME_IN_SECONDS” when sending it; this property takes precedence over the transactionTimeout parameter.
  4. A transactional message may be checked or consumed more than once.
  5. Re-putting a committed message into the user's target topic may fail. Currently, this depends on the log record. High availability is ensured by the high availability mechanism of RocketMQ itself. If you want to ensure that transactional messages are not lost and that transaction integrity is guaranteed, it is recommended to use the synchronous double write mechanism.
  6. Producer IDs of transactional messages cannot be shared with producer IDs of other types of messages. Unlike other types of messages, transactional messages allow backward queries: the MQ server queries clients by their Producer IDs.
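
The check limit in constraint 2 can be pictured with a simplified model. This is a hypothetical sketch, not broker code: it collapses the periodic back-check into a loop, and the class, enum and method names are assumptions made for illustration.

```java
import java.util.function.Supplier;

// Hypothetical model of the back-check limit: stop after transactionCheckMax unresolved checks.
class BackCheckSketch {

    enum State { COMMIT, ROLLBACK, UNKNOWN }

    enum Outcome { COMMITTED, ROLLED_BACK, DISCARDED }

    static Outcome resolve(Supplier<State> checkLocalTransaction, int transactionCheckMax) {
        for (int attempt = 1; attempt <= transactionCheckMax; attempt++) {
            switch (checkLocalTransaction.get()) {
                case COMMIT:
                    return Outcome.COMMITTED;
                case ROLLBACK:
                    return Outcome.ROLLED_BACK;
                default:
                    break; // still unknown: the message stays pending until the next check
            }
        }
        // The limit was reached without a final answer, so the message is given up on.
        return Outcome.DISCARDED;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // The local transaction becomes known on the third check.
        Outcome late = resolve(() -> ++calls[0] < 3 ? State.UNKNOWN : State.COMMIT, 15);
        System.out.println(late + " after " + calls[0] + " checks");
        // The producer never gives a final answer.
        System.out.println(resolve(() -> State.UNKNOWN, 15));
    }
}
```

The sketch also shows why checkLocalTransaction should eventually return a final state: a listener that keeps answering with the unknown state causes the message to be dropped once the limit is reached.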