Talk is cheap, show me the code

开源,协作,共享,进步

0%

Example for Ordered Messages

RocketMQ provides ordered messages using FIFO order. All related messages need to be sent into the same message queue in an orderly manner.

The following example demonstrates ordered messages by preserving the order of the create, pay, send and finish steps of a sales order process.

1 Produce ordered messages

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package org.apache.rocketmq.example.order2

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/*
* ordered messages producer
*/
public class Producer {

public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagC", "TagD"};
// sales orders list
List<OrderStep> orderList = new Producer().buildOrders();

Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);

for (int i = 0; i < 10; i++) {
// generate message timestamp
String body = dateStr + " Hello RocketMQ " + orderList.get(i);
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg; //message queue is selected by #salesOrderID
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());

System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}

producer.shutdown();
}

/**
* each sales order step
*/
private static class OrderStep {
private long orderId;
private String desc;

public long getOrderId() {
return orderId;
}

public void setOrderId(long orderId) {
this.orderId = orderId;
}

public String getDesc() {
return desc;
}

public void setDesc(String desc) {
this.desc = desc;
}

@Override
public String toString() {
return "OrderStep{" +
"orderId=" + orderId +
", desc='" + desc + '\'' +
'}';
}
}

/**
* to generate ten OrderStep objects for three sales orders:
* #SalesOrder "15103111039L": create, pay, send, finish;
* #SalesOrder "15103111065L": create, pay, finish;
* #SalesOrder "15103117235L": create, pay, finish;
*/
private List<OrderStep> buildOrders() {

List<OrderStep> orderList = new ArrayList<OrderStep>();

//create sales order with orderid="15103111039L"
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("create");
orderList.add(orderDemo);

//create sales order with orderid="15103111065L"
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("create");
orderList.add(orderDemo);

//pay sales order #"15103111039L"
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("pay");
orderList.add(orderDemo);

//create sales order with orderid="15103117235L"
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("create");
orderList.add(orderDemo);

//pay sales order #"15103111065L"
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("pay");
orderList.add(orderDemo);

//pay sales order #"15103117235L"
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("pay");
orderList.add(orderDemo);

//mark sales order #"15103111065L" as "finish"
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("finish");
orderList.add(orderDemo);

//mark mark sales order #"15103111039L" as "send"
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("send");
orderList.add(orderDemo);

////mark sales order #"15103117235L" as "finish"
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("finish");
orderList.add(orderDemo);

//mark sales order #"15103111039L" as "finish"
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("finish");
orderList.add(orderDemo);

return orderList;
}
}

2 Consume ordered messages

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

package org.apache.rocketmq.example.order2;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * Consumes messages from {@code TopicTest} in order, using a {@link MessageListenerOrderly}.
 */
public class ConsumerInOrder {

    /**
     * Starts an orderly push consumer subscribed to TagA, TagC and TagD of {@code TopicTest}.
     *
     * @param args unused
     * @throws Exception if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        /*
         * On the first run of this consumer group, start from the earliest stored message.
         * When the consumer is restarted, it continues from the last consumed position instead.
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTest", "TagA || TagC || TagD");

        consumer.registerMessageListener(new MessageListenerOrderly() {

            private final Random random = new Random();

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    // Each message queue is consumed by one thread at a time, so the order of
                    // messages within a single queue is preserved.
                    System.out.println("consumeThread=" + Thread.currentThread().getName()
                            + ", queueId=" + msg.getQueueId()
                            + ", content:" + new String(msg.getBody(), StandardCharsets.UTF_8));
                }

                // Simulate business processing that takes 0-9 seconds.
                try {
                    TimeUnit.SECONDS.sleep(random.nextInt(10));
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the consumer's thread can observe it.
                    Thread.currentThread().interrupt();
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();

        System.out.println("Consumer Started.");
    }
}

Frequently Asked Questions

The following questions are frequently asked with regard to the RocketMQ project in general.

1 General

  1. Why did we create rocketmq project instead of selecting other products?

    Please refer to Why RocketMQ

  2. Do I have to install other software, such as ZooKeeper, to use RocketMQ?

    No. RocketMQ can run independently.

2 Usage

1. Where does the newly created Consumer ID start consuming messages?

1) If messages were sent to the topic within the last three days, the consumer starts consuming from the first message saved on the server.

2) If the messages on the topic were sent more than three days ago, the consumer starts consuming from the latest message on the server, in other words, from the tail of the message queue.

3) If the consumer is restarted, it resumes consuming from its last consumption position.

2. How are messages reconsumed when consumption fails?

1) Clustering consumption pattern: if the consumer's business logic returns Action.ReconsumeLater or NULL, or throws an exception, the message is considered failed and is retried up to 16 times; after that, the message is discarded.

2) Broadcasting consumption pattern: broadcasting consumption still ensures that a message is consumed at least once, but no retry option is provided.

3. How to query the failed message if there is a consumption failure?

1) Query by topic and time range to find the messages within a period of time.

2) Query by topic and message ID to find one exact message.

3) Query by topic and message key to find all messages that share the same message key.

4. Are messages delivered exactly once?

RocketMQ ensures that all messages are delivered at least once. In most cases, the messages are not repeated.

5. How to add a new broker?

1) Start up a new broker and register it with the same list of name servers.

2) By default, only internal system topics and consumer groups are created automatically. If you want your business topics and consumer groups on the new node, replicate them from the existing broker. Admin tools and command lines are provided for this.

The following answers are all default values and can be modified by configuration.

1. How long are the messages saved on the server?

Stored messages will be saved for up to 3 days, and messages that are not consumed for more than 3 days will be deleted.

2. What is the size limit for message Body?

Generally 256KB.

3. How to set the number of consumer threads?

Before starting the consumer, set its consumeThreadMin and consumeThreadMax properties, for example:

1
2
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(20);

4 Errors

1. Starting a producer or consumer fails, and the error message says the producer group or consumer group is duplicated.

Reason: Launching multiple Producer/Consumer instances with the same Producer/Consumer Group in the same JVM may cause the client to fail to start.

Solution: Make sure that, within one JVM, each Producer/Consumer Group is used by only one Producer/Consumer instance.

2. Consumer failed to start loading json file in broadcast mode.

Reason: The fastjson version is too old for the broadcasting consumer to load the local offsets.json, so the consumer fails to start. A damaged fastjson library file can cause the same problem.

Solution: Upgrade fastjson to the version the RocketMQ client depends on, so that the local offsets.json can be loaded. By default, the offsets.json file is in /home/{user}/.rocketmq_offsets. Alternatively, check the integrity of the fastjson library.

3. What is the impact of a broker crash?

1) Master crashes

Messages can no longer be sent to this broker set, but if you have another broker set available, messages can still be sent given the topic is present. Messages can still be consumed from slaves.

2) Some slaves crash

As long as there is another working slave, there is no impact on sending messages. There is also no impact on consuming messages, unless the consumer group is set to consume preferentially from this slave. By default, a consumer group consumes from the master.

3) All slaves crash

There is no impact on sending messages to the master, but if the master is a SYNC_MASTER, the producer will get a SLAVE_NOT_AVAILABLE status indicating that the message was not sent to any slave. There is also no impact on consuming messages, unless the consumer group is set to consume preferentially from slaves. By default, a consumer group consumes from the master.

4. Producer complains “No Topic Route Info”, how to diagnose?

This happens when you are trying to send messages to a topic whose routing info is not available to the producer.

1) Make sure that the producer can connect to a name server and can fetch routing meta info from it.

2) Make sure that the name servers do contain the routing meta info of the topic. You can query it from the name server via topicRoute using the admin tools or the web console.

3) Make sure that your brokers send heartbeats to the same list of name servers that your producer connects to.

4) Make sure that the topic’s permission is 6 (rw-), or at least 2 (-w-).

If you can’t find this topic, create it on a broker via admin tools command updateTopic or web console.

Consumer


1 Consumption process idempotent

RocketMQ cannot guarantee Exactly-Once delivery (it guarantees at-least-once delivery, so duplicates are possible). If the business is very sensitive to duplicate consumption, it is important to deduplicate at the business level. Deduplication can be done with a relational database. First, determine the unique key of the message, which can be either the msgId or a unique identifier field in the message content, such as the order ID. Before consuming, check whether the unique key already exists in the database. If it does not, insert it and consume the message; otherwise, skip the message. (In practice, the check and the insert must be atomic: attempt the insert directly, and if it fails because of a primary-key conflict, skip the message.)

2 Slow message processing

2.1 Increase consumption parallelism

Most message consumption is IO-intensive, for example operating on a database or calling an RPC. The consumption speed of such workloads depends on the throughput of the back-end database or external system. Increasing consumption parallelism can raise total consumption throughput, but only up to a point; beyond that, throughput falls instead. Therefore, the application must set a reasonable degree of parallelism. There are several ways to change the consumption parallelism:

  • Under the same ConsumerGroup, increase parallelism by adding Consumer instances (note that Consumer instances beyond the number of subscribed queues stay idle). This can be done by adding machines or by starting multiple processes on an existing machine.
  • Increase the number of consumption threads of a single Consumer by modifying the parameters consumeThreadMin and consumeThreadMax.

2.2 Batch mode consumption

Some business processes can greatly increase consumption throughput if they support batch consumption. For example, an order-deduction application may take 1s to process one order at a time, but only 2s to process 10 orders at a time, so batching greatly improves consumption throughput. Set the consumer's consumeMessageBatchMaxSize parameter (default 1, i.e. one message per call) to N, and each call will receive at most N messages.

2.3 Skip non-critical messages

When messages accumulate because consumption cannot keep up with production, and the business does not require every message to be processed, you can choose to discard unimportant messages. For example, when more than 100,000 messages are backlogged in a queue, try to discard some or all of them so that consumption can quickly catch up with production. The sample code is as follows:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
 * Skips backlogged messages when the queue has fallen too far behind, otherwise consumes normally.
 *
 * <p>The backlog is the difference between the queue's max offset (carried as a message
 * property) and the offset of the first message in this batch.
 */
public ConsumeConcurrentlyStatus consumeMessage(
        List<MessageExt> msgs,
        ConsumeConcurrentlyContext context) {
    long offset = msgs.get(0).getQueueOffset();
    // The max-offset property key is defined in MessageConst.
    String maxOffset =
            msgs.get(0).getProperty(MessageConst.PROPERTY_MAX_OFFSET);
    // If the property is absent, treat the queue as not backlogged instead of failing in parseLong.
    if (maxOffset != null) {
        long diff = Long.parseLong(maxOffset) - offset;
        if (diff > 100000) {
            // TODO Special handling of message accumulation
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }
    // TODO Normal consumption process
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

2.4 Optimize each message consumption process

For example, the consumption process of a message is as follows:

  • Query from DB according to the message [data 1]
  • Query from DB according to the message [data 2]
  • Complex business calculations
  • Insert [Data 3] into the DB
  • Insert [Data 4] into the DB

This message's consumption process interacts with the DB 4 times. At 5ms per interaction, that takes 20ms in total; adding 5ms of business calculation gives 25ms. If you can reduce the 4 DB interactions to 2, the total drops to 15ms, i.e. 40% less time. In addition, if the application is latency-sensitive, the DB can be deployed on SSDs, whose response time (RT) is much lower than that of SCSI disks.

3 Print Log

If the message volume is small, it is recommended to log the messages and the consumption time in the consumption entry method, to make later troubleshooting easier.

1
2
3
4
5
6
7
/**
 * Logs the received batch as soon as it arrives, then runs the normal consumption path.
 */
public ConsumeConcurrentlyStatus consumeMessage(
        List<MessageExt> msgs,
        ConsumeConcurrentlyContext context) {
    String received = msgs.toString();
    log.info("RECEIVE_MSG_BEGIN: " + received);
    // TODO Normal consumption process
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

If you can print the time spent on each message, it will be more convenient when troubleshooting online problems such as slow consumption.

4 Other consumption suggestions

4.1 Consumer Group and Subscriptions

The first thing you should be aware of is that different Consumer Groups can consume the same topic independently, and each of them has its own consuming offsets. Please make sure each Consumer within the same Group subscribes to the same topics.

4.2 Orderly

The Consumer will lock each MessageQueue to make sure it is consumed one by one in order. This will cause a performance loss, but it is useful when you care about the order of the messages. It is not recommended to throw exceptions, you can return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT instead.

4.3 Concurrently

As the name tells, the Consumer will consume the messages concurrently. It is recommended to use this for good performance. It is not recommended to throw exceptions, you can return ConsumeConcurrentlyStatus.RECONSUME_LATER instead.

4.4 Consume Status

For MessageListenerConcurrently, you can return RECONSUME_LATER to tell the consumer that you can not consume it right now and want to reconsume it later. Then you can continue to consume other messages. For MessageListenerOrderly, because you care about the order, you can not jump over the message, but you can return SUSPEND_CURRENT_QUEUE_A_MOMENT to tell the consumer to wait for a moment.

4.5 Blocking

It is not recommended to block the Listener, because it will block the thread pool and may eventually stop the consuming process.

4.6 Thread Number

The consumer uses a ThreadPoolExecutor internally to process consumption, so you can tune it with setConsumeThreadMin or setConsumeThreadMax.

4.7 ConsumeFromWhere

When a new Consumer Group is established, it needs to decide whether to consume the historical messages that already exist in the Broker. CONSUME_FROM_LAST_OFFSET ignores the historical messages and consumes anything produced after that. CONSUME_FROM_FIRST_OFFSET consumes every message that exists in the Broker. You can also use CONSUME_FROM_TIMESTAMP to consume messages produced after a specified timestamp.

Schedule example

1 Start consumer to wait for incoming subscribed messages

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

/**
 * Consumes scheduled (delayed) messages from {@code TestTopic} and prints how long after
 * sending each one arrived.
 */
public class ScheduledMessageConsumer {

    public static void main(String[] args) throws Exception {
        // Instantiate message consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
        // Subscribe topics
        consumer.subscribe("TestTopic", "*");
        // Register message listener
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    // Measure from the producer-side born time. A delayed message is re-stored
                    // when its delay expires, so its store timestamp does not reflect the delay.
                    System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
                            + (System.currentTimeMillis() - message.getBornTimestamp()) + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Launch consumer
        consumer.start();
    }
}

2 Send scheduled messages

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

import java.nio.charset.StandardCharsets;

/**
 * Sends 100 scheduled messages to {@code TestTopic}, each delayed by delay level 3.
 */
public class ScheduledMessageProducer {

    public static void main(String[] args) throws Exception {
        // Instantiate a producer to send scheduled messages
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        // Launch producer
        producer.start();
        try {
            int totalMessagesToSend = 100;
            for (int i = 0; i < totalMessagesToSend; i++) {
                // Encode explicitly as UTF-8 instead of the platform default charset.
                Message message = new Message("TestTopic",
                        ("Hello scheduled message " + i).getBytes(StandardCharsets.UTF_8));
                // Delay level 3 is the third entry of the broker's messageDelayLevel list
                // (10s by default), so the message is delivered about 10 seconds later.
                message.setDelayTimeLevel(3);
                // Send the message
                producer.send(message);
            }
        } finally {
            // Shut the producer down even if a send throws.
            producer.shutdown();
        }
    }

}

3 Verification

You should see that messages are consumed about 10 seconds after they were sent.

4 Use scenarios for scheduled messages

For example, in e-commerce, if an order is submitted, a delay message can be sent, and the status of the order can be checked after 1 hour. If the order is still unpaid, the order can be cancelled and the inventory released.

5 Restrictions on the use of scheduled messages

1
2
3
// org/apache/rocketmq/store/config/MessageStoreConfig.java

// Space-separated delay per level: level N (1-based) uses the N-th entry,
// i.e. level 1 = 1s, level 2 = 5s, level 3 = 10s, ..., level 18 = 2h.
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

Currently, RocketMQ does not support arbitrary delay times. Instead, it uses a fixed set of delay levels: levels 1 to 18 correspond to the delays listed above, from 1s to 2h. Messages that fail consumption enter the delay message queue, so their redelivery time depends on the delay level and the number of retries.

See SendMessageProcessor.java

Message Trace

1 Key Attributes of Message Trace Data

| Producer | Consumer | Broker |
| --- | --- | --- |
| Production instance information | Consumption instance information | Message topic |
| Send time | Delivery time, delivery round | Message storage location |
| Whether the message was sent successfully | Whether the message was consumed successfully | Message key |
| Time spent sending | Time spent consuming | Message tag |

2 Support for Message Trace Cluster Deployment

2.1 Broker Configuration File

Here is the content of a Broker properties file with the message trace feature enabled:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
storePathRootDir=/data/rocketmq/rootdir-a-m
storePathCommitLog=/data/rocketmq/commitlog-a-m
autoCreateSubscriptionGroup=true
## Set to true to enable message trace on this broker; the system-level trace topic
## (RMQ_SYS_TRACE_TOPIC) is then created automatically when the broker starts.
traceTopicEnable=true
listenPort=10911
## XX.XX.XX.XX values are placeholders: replace them with this broker's IP and the name server address.
brokerIP1=XX.XX.XX.XX1
namesrvAddr=XX.XX.XX.XX:9876

2.2 Normal Mode

Each Broker node in the RocketMQ cluster can store message trace data collected and sent from the Client. Therefore, there are no requirements or restrictions on the number of Broker nodes in the RocketMQ cluster.

2.3 Physical IO Isolation Mode

For scenarios with a large amount of trace data, one of the Broker nodes in the RocketMQ cluster can be dedicated to storing trace messages, so that the physical IO of the user's ordinary message data and that of the trace data are completely isolated from each other. In this mode, there are at least two Broker nodes in the RocketMQ cluster, one of which is designated as the server that stores message trace data.

2.4 Start the Broker with Message Trace Enabled

# Start the broker in the background with the broker-a properties file
# (it must contain traceTopicEnable=true for message trace to be enabled).
nohup sh mqbroker -c ../conf/2m-noslave/broker-a.properties &

3 Save the Topic Definition of Message Trace

RocketMQ’s message trace feature supports two ways to store trace data:

3.1 System-level TraceTopic

By default, message trace data is stored in the system-level TraceTopic (named RMQ_SYS_TRACE_TOPIC). This Topic is created automatically when the Broker node starts (as described above, the switch traceTopicEnable must be set to true in the Broker configuration file).

3.2 Custom TraceTopic

If you do not want to store message trace data in the system-level default TraceTopic, you can also define and create a user-level Topic to store it (that is, create a regular Topic for the message trace data). The following section shows how the Client interface supports a user-defined TraceTopic.

4 Client Practices that Support Message Trace

To minimize the changes needed to use the RocketMQ message trace feature in an existing service, a switch parameter (enableMsgTrace) was added to the existing client interface to turn message trace on or off.

4.1 Opening the Message Trace when Sending the Message

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// The second constructor argument (true) enables message trace for this producer.
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", true);
// The name server address needs the port too (e.g. XX.XX.XX.XX:9876, as in the broker config).
producer.setNamesrvAddr("XX.XX.XX.XX:9876");
producer.start();
try {
    Message msg = new Message("TopicTest",
            "TagA",
            "OrderID188",
            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
    SendResult sendResult = producer.send(msg);
    System.out.printf("%s%n", sendResult);
} catch (Exception e) {
    e.printStackTrace();
} finally {
    // Always release the producer, even if sending failed.
    producer.shutdown();
}

4.2 Opening Message Trace when Subscribing to a Message

1
2
3
4
5
6
7
8
9
10
11
12
13
// The second constructor argument (true) enables message trace for this consumer.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1", true);
consumer.setNamesrvAddr("XX.XX.XX.XX:9876");
consumer.subscribe("TopicTest", "*");
// A new consumer group starts from the earliest stored message. To start from a point in
// time instead, use CONSUME_FROM_TIMESTAMP together with setConsumeTimestamp("yyyyMMddHHmmss");
// the timestamp has no effect with CONSUME_FROM_FIRST_OFFSET.
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();
System.out.printf("Consumer Started.%n");

4.3 Support for Custom Storage Message Trace Topic

The initialization of the DefaultMQProducer and DefaultMQPushConsumer instances shown above can be changed as follows to store message trace data in a custom topic when sending and subscribing to messages.

1
2
3
4
5
6
##Where Topic_test11111 needs to be pre-created by the user to save the message trace;
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true,"Topic_test11111");
......

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true,"Topic_test11111");
......

Features

1 Subscribe and Publish

Message publishing means that a producer sends messages to a topic; message subscription means that a consumer subscribes to a topic with certain tags and then consumes data from that topic.

2 Message Ordering

Message ordering means that a group of messages is consumed in the same order in which it was published. For example, an order generates three messages: order creation, order payment, and order completion. These messages only make sense when consumed in the order they were generated, while different orders can still be consumed in parallel. RocketMQ can strictly guarantee the order of such messages.

Ordered messages are divided into globally ordered messages and partitioned ordered messages. Global ordering means that all messages under a topic must be in order; partitioned ordering only requires that the messages within each group are consumed in order.

  • Global message ordering:
    For a given Topic, all messages are published and consumed in strict first-in-first-out (FIFO) order.
    Applicable scenario: the performance requirement is not high, and all messages are published and consumed according to FIFO principle strictly.
  • Partitioned message ordering:
    For a given Topic, all messages are partitioned according to sharding key. Messages within the same partition are published and consumed in strict FIFO order. Sharding key is the key field to distinguish message’s partition, which is a completely different concept from the key of ordinary messages.
    Applicable scenario: high performance requirement, with sharding key as the partition field, messages within the same partition are published and consumed according to FIFO principle strictly.

3 Message Filter

RocketMQ consumers can filter messages by tags and also support user-defined property filtering. Message filtering is currently implemented on the Broker side. The advantage is that fewer useless messages are transmitted over the network to the Consumer. The disadvantages are a heavier load on the Broker and a relatively complex implementation.

4 Message Reliability

RocketMQ supports high reliability of messages in several situations:
1) Broker shut down normally
2) Broker crashed abnormally
3) OS crash
4) The machine loses power but can be recovered immediately
5) The machine cannot be started up (the CPU, motherboard, memory or other key hardware may be damaged)
6) Disk device damaged

In cases 1), 2), 3) and 4), where the hardware resources can be recovered immediately, RocketMQ guarantees that no messages are lost, or that only a small amount of data is lost (depending on whether the flush disk type is synchronous or asynchronous).

Cases 5) and 6) are single points of failure that cannot be recovered. Once either happens, all messages on that single point are lost. In both cases, RocketMQ uses asynchronous replication to ensure that 99% of the messages are not lost, but a very small number of messages may still be lost. Synchronous double-write mode avoids the single point of failure completely. However, it inevitably affects performance, so it suits scenarios with high demands on message reliability, such as money-related applications. Note: RocketMQ supports synchronous double writes since version 3.0.

5 At Least Once

At least once means that every message is delivered at least once. RocketMQ supports this because the Consumer pulls messages locally and does not send an ack back to the server until it has finished consuming them.

6 Backtracking Consumption

Backtracking consumption means that the Consumer has already consumed a message successfully, but the business needs to consume it again. To support this, the Broker must retain the message even after delivering it to the Consumer successfully. Re-consumption is normally based on time. For example, after the Consumer system recovers from a failure, the data from one hour ago may need to be re-consumed, so the Broker must provide a mechanism to roll back the consumption progress by time. RocketMQ supports backtracking consumption by time, with precision down to the millisecond.

7 Transactional Message

A RocketMQ transactional message allows an application's local transaction and the sending of a message to be defined in one global transaction, so that either both succeed or both fail. RocketMQ transactional messages provide distributed transaction functionality similar to X/Open XA and achieve eventual consistency of distributed transactions.

8 Scheduled Message

A scheduled message (delay queue) is not consumed immediately after it is sent to the broker. Instead, it waits for a specific amount of time before being delivered to the real topic.
The broker has a configuration item, messageDelayLevel, whose default value is “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h” (18 levels). Users can configure a custom messageDelayLevel. Note that messageDelayLevel is a property of the broker rather than of a topic. When sending a message, just set the delay level: msg.setDelayLevel(level). There are three cases:

  • level == 0, The message is not a delayed message
  • 1<=level<=maxLevel, Message delay specific time, such as level==1, delay for 1s
  • level > maxLevel, then level is treated as maxLevel; for example, level == 20 delays for 2h

Scheduled messages are temporarily saved in a topic named SCHEDULE_TOPIC_XXXX, and saved in a specific queue according to delayTimeLevel, queueId = delayTimeLevel - 1, that is, only messages with the same delay are saved in a queue, ensuring that messages with the same sending delay can be consumed orderly. The broker consumes SCHEDULE_TOPIC_XXXX on schedule and writes messages to the real topic.

Note that scheduled messages are counted twice: once when they are first written, and again when they are scheduled to be written to the real topic. As a result, both the send count and the TPS increase.

9 Message Retry

When the Consumer fails to consume the message, a retry mechanism is needed to make the message to be consumed again. Consumer’s consume failure can usually be classified as follows:

  • Failures caused by the message itself, such as a deserialization failure or message data that cannot be processed (for example, the phone number in the current message has been cancelled and cannot be charged). This kind of error usually requires skipping the message and consuming others, since an immediate retry would fail 99% of the time. It is better to provide a timed retry mechanism that retries after 10 seconds.
  • Failures caused by unavailable downstream services, such as an unusable db connection or an unreachable external network. When this kind of error occurs, consuming other messages will also fail even if the current failed message is skipped. In this case, it is recommended to sleep for 30s before consuming the next message, which reduces the pressure on the broker to retry messages.

RocketMQ sets up a retry queue named “%RETRY%+consumerGroup” for each consumer group (note that retry queues are per consumer group, not per topic). The queue temporarily stores messages that the consumer could not consume for various reasons. Because it takes some time for an exception to recover, the retry queue has multiple retry levels, each with its own redelivery delay: the more retries, the longer the delay. RocketMQ first saves retry messages to the delay queue whose topic is named “SCHEDULE_TOPIC_XXXX”. A background scheduled task then moves the messages to the “%RETRY%+consumerGroup” retry queue according to their delay.

10 Message Resend

When a producer sends a message, a synchronous message is resent if sending fails, an asynchronous message is retried, and a oneway message has no guarantee. Message resend makes sure, as far as possible, that messages are sent successfully and not lost. However, it can lead to duplicate messages, which is an unavoidable problem in RocketMQ. Under normal circumstances duplicates do not occur, but with a large volume of messages and network jitter, duplication becomes a high-probability event. Producer-initiated resends and changes in consumer load can also produce duplicate messages. The message retry policy can be set as follows:

  • retryTimesWhenSendFailed: Synchronous message retry times when send failed, default value is 2, so the producer will try to send retryTimesWhenSendFailed + 1 times at most. To ensure that the message is not lost, producer will try sending the message to another broker instead of selecting the broker that failed last time. An exception will be thrown if it reaches the retry limit, and the client should guarantee that the message will not be lost. Messages will resend when RemotingException, MQClientException, and partial MQBrokerException occur.
  • retryTimesWhenSendAsyncFailed: the number of retries for an asynchronous message when sending fails. Asynchronous retries send the message to the same broker instead of selecting another one, and do not guarantee that the message won't be lost.
  • retryAnotherBrokerWhenNotStoreOK: whether to try sending to another broker when the message flush to disk (master or slave) times out or the slave is not available (the return status is not SEND_OK). The default value is false. It can be set to true for very important messages.

11 Flow Control

Producer flow control happens when the broker's processing capacity reaches a bottleneck; consumer flow control happens when consumption capacity reaches a bottleneck.

Producer flow control:

  • When the commitLog file lock time exceeds osPageCacheBusyTimeOutMills (default value 1000 ms), flow control is returned.
  • If transientStorePoolEnable == true, and the broker is asynchronous flush disk type, and resources are insufficient in the transientStorePool, reject the current send request and return flow control.
  • The broker checks the head request wait time of the send request queue every 10ms. If the wait time exceeds waitTimeMillsInSendQueue, which default value is 200ms, the current send request is rejected and the flow control is returned.
  • The broker implements flow control by rejecting send requests.

Consumer flow control:

  • When consumer local cache messages number exceeds pullThresholdForQueue, default value is 1000.
  • When consumer local cache messages size exceeds pullThresholdSizeForQueue, default value is 100MB.
  • When consumer local cache messages span exceeds consumeConcurrentlyMaxSpan, default value is 2000.

The result of consumer flow control is to reduce the pull frequency.

12 Dead Letter Queue

The dead letter queue handles messages that cannot be consumed normally. When the first consumption of a message fails, the message queue automatically resends the message. If consumption still fails after the maximum number of retries, the consumer cannot properly consume the message under normal circumstances. At that point, the message queue does not discard the message immediately. Instead, it sends the message to a special queue corresponding to the consumer.

RocketMQ calls messages that cannot be consumed under normal circumstances Dead-Letter Messages, and calls the special queue that stores them a Dead-Letter Queue. In RocketMQ, messages in a Dead-Letter Queue can be resent through the console so that the consumer instance can consume them again.

Apache RocketMQ Developer Guide


This guide helps developers understand and use Apache RocketMQ quickly.

1. Concepts & Features

  • Concept:introduce basic concepts in RocketMQ.

  • Feature:introduce functional features of RocketMQ’s implementations.

2. Architecture Design

  • Architecture:introduce RocketMQ’s deployment and technical architecture.

  • Design:introduce design concept of RocketMQ’s key mechanisms, including message storage, communication mechanisms, message filter, loadbalance, transaction message, etc.

3. Example

  • Example :introduce RocketMQ’s common usage, including basic example, sequence message example, delay message example, batch message example, filter message example, transaction message example, etc.

4. Best Practice

  • Best Practice:introduce RocketMQ’s best practice, including producer, consumer, broker, NameServer, configuration of client, and the best parameter configuration of JVM, linux.

  • Message Trace:introduce how to use RocketMQ’s message tracing feature.

  • Auth Management: introduces how to quickly deploy and use a RocketMQ cluster with the auth management feature enabled.

  • Quick Start:introduce how to deploy Dledger quickly.

  • Cluster Deployment:introduce how to deploy Dledger in cluster.

5. Operation and maintenance management

  • Operation:introduce RocketMQ’s deployment modes that including single-master mode, multi-master mode, multi-master multi-slave mode and so on, as well as the usage of operation tool mqadmin.

6. API Reference(TODO)

3 Broker

3.1 Broker Role

Broker Role is ASYNC_MASTER, SYNC_MASTER or SLAVE. If you cannot tolerate message loss, we suggest deploying a SYNC_MASTER and attaching a SLAVE to it. If some message loss is acceptable but you want the Broker to always be available, you may deploy an ASYNC_MASTER with a SLAVE. If you just want to keep things simple, you may only need an ASYNC_MASTER without a SLAVE.

3.2 FlushDiskType

ASYNC_FLUSH is recommended, for SYNC_FLUSH is expensive and will cause too much performance loss. If you want reliability, we recommend you use SYNC_MASTER with SLAVE.

3.3 Broker Configuration

Parameter name Default Description
listenPort 10911 listen port for client
namesrvAddr null name server address
brokerIP1 InetAddress for network interface Should be configured if having multiple addresses
brokerIP2 InetAddress for network interface If configured for the Master broker in the Master/Slave cluster, the slave broker will connect to this address for data synchronization
brokerName null broker name
brokerClusterName DefaultCluster this broker belongs to which cluster
brokerId 0 broker id, 0 means master, positive integers mean slave
storePathCommitLog $HOME/store/commitlog/ file path for commit log
storePathConsumerQueue $HOME/store/consumequeue/ file path for consume queue
mappedFileSizeCommitLog 1024 * 1024 * 1024(1G) mapped file size for commit log
deleteWhen 04 When to delete the commitlog which is out of the reserve time
fileReservedTime 72 The number of hours to keep a commitlog before deleting it
brokerRole ASYNC_MASTER SYNC_MASTER/ASYNC_MASTER/SLAVE
flushDiskType ASYNC_FLUSH {SYNC_FLUSH/ASYNC_FLUSH}. Broker of SYNC_FLUSH mode flushes each message onto disk before acknowledging producer. Broker of ASYNC_FLUSH mode, on the other hand, takes advantage of group-committing, achieving better performance.

Producer


1 Message Sending Tips
1.1 The Use of Tags

One application instance should use one topic as much as possible and the subtype of messages can be marked by tags. Tag provides extra flexibility to users. In the consume subscribing process, the messages filtering can only be handled by using tags when the tags are specified in the message sending process: message.setTags("TagA").

1.2 The Use of Keys

A business key can be set on a message, which makes it easier to look up the message on a broker server when diagnosing issues during development. The server creates an index (a hash index) for each message. You can then query the content of the message, and who consumed it, by topic and key. Because a hash index is used, make sure keys are as unique as possible to avoid hash collisions.

1
2
3
// Use the business order id as the message key so the message can be looked up by it later.
final String orderId = "20034568923546";
message.setKeys(orderId);
1.3 The Log Print

When sending a message, always print a message log containing the SendResult and the Key, whether the send succeeds or fails. As long as the send method does not throw an exception, the message was sent successfully. A successful send can still end in one of several statuses defined in SendResult. Below is a description of each status:

  • SEND_OK

SEND_OK means the message was sent successfully. SEND_OK does not mean it is reliable. To make sure no message is lost, you should also enable SYNC_MASTER or SYNC_FLUSH.

  • FLUSH_DISK_TIMEOUT

FLUSH_DISK_TIMEOUT means the message was sent successfully, but the Broker timed out while flushing it to disk. In this case, the Broker has saved the message in memory, and the message will be lost only if the Broker goes down. The FlushDiskType and SyncFlushTimeout can be specified in MessageStoreConfig. You will get this status if the Broker sets MessageStoreConfig's FlushDiskType=SYNC_FLUSH (default is ASYNC_FLUSH) and does not finish flushing to disk within MessageStoreConfig's syncFlushTimeout (default is 5 secs).

  • FLUSH_SLAVE_TIMEOUT

FLUSH_SLAVE_TIMEOUT means the message was sent successfully, but the slave Broker did not finish synchronizing with the master. You will get this status if the Broker's role is SYNC_MASTER (default is ASYNC_MASTER) and the slave Broker does not finish synchronizing with the master within MessageStoreConfig's syncFlushTimeout (default is 5 secs).

  • SLAVE_NOT_AVAILABLE

SLAVE_NOT_AVAILABLE means the message was sent successfully, but no slave Broker is configured. You will get this status if the Broker's role is SYNC_MASTER (default is ASYNC_MASTER) but no slave Broker is configured.

2 Operations on Message Sending failed

The send method of Producer can be retried, the retry process is illustrated below:

  • The method will retry at most 2 times(2 times in synchronous mode, 0 times in asynchronous mode).
  • If sending fails, the next Broker is tried. This strategy is applied only while the total elapsed time is less than sendMsgTimeout (default is 10 seconds).
  • The retry method will be terminated if timeout exception was thrown when sending messages to Broker.

The strategy above makes message sending succeed to a certain degree. If the business requires high reliability, additional retry strategies can be added. For example, if a call to the synchronous send method fails, save the message to a database and retry it later from a background thread's timed task, which ensures the message eventually reaches the Broker.

A database-backed retry strategy has not been integrated into the RocketMQ client, for the following reasons. First, the RocketMQ client is designed to be stateless: it can be scaled horizontally at every level, and the only physical resources it consumes are CPU, memory and network. Second, if the client integrated its own key-value storage module, it would have to save asynchronously, because synchronous saving consumes too many resources. However, operations staff do not control how the client is shut down. Commands such as kill -9 may be used, which would lose any messages not yet saved. Furthermore, the physical machines running the Producer are not reliable enough to store important data. Therefore, the retry process should be controlled by the application itself.

3 Send Messages in One-way Mode

The message sending is usually a process like below:

  • Client sends request to server.
  • Server handles request.
  • Server returns response to client.

The total time to send one message is the sum of the time spent in the three steps above. Some scenarios, such as log collection, require very low total latency but do not require high reliability. Such applications can use one-way mode, in which the client sends the request but does not wait for a response. In this mode, sending a request costs only one system call, which writes the data into the client socket buffer. This typically takes on the order of microseconds.

Design

1 Message Store

1.1 The Architecture of Message Store

1.2 PageCache and Memory-Map(Mmap)

1.3 Message Flush

2 Communication Mechanism

2.1 The class diagram of Remoting module

2.2 The design of protocol and encode/decode

2.3 The three ways and process of message communication

2.4 The multi-thread design of Reactor

3 Message Filter

4 LoadBalancing

4.1 The loadBalance of Producer

4.2 The loadBalance of Consumer

5 Transactional Message

Apache RocketMQ supports distributed transactional message from version 4.3.0. RocketMQ implements transactional message by using the protocol of 2PC(two-phase commit), in addition adding a compensation logic to handle timeout-case or failure-case of commit-phase, as shown below.

5.1 The Process of RocketMQ Transactional Message

The picture above shows the overall architecture of transactional message, including the sending of message(commit-request phase), the sending of commit/rollback(commit phase) and the compensation process.

  1. The sending of message and Commit/Rollback.
    (1) Sending the message(named Half message in RocketMQ)
    (2) The server responds with the write result (success or failure) of the Half message.
    (3) Handle local transaction according to the result(local transaction won’t be executed when the result is failure).
    (4) Sending Commit/Rollback to broker according to the result of local transaction(Commit will generate message index and make the message visible to consumers).

  2. Compensation process
    (1) For a transactional message without a Commit/Rollback (means the message in the pending status), a “back-check” request is initiated from the broker.
    (2) The Producer receives the “back-check” request and checks the status of the local transaction corresponding to the “back-check” message.
    (3) Redo Commit or Rollback based on local transaction status.
    The compensation phase is used to resolve the timeout or failure case of the message Commit or Rollback.

5.2 The design of RocketMQ Transactional Message

  1. Transactional message is invisible to users in first phase(commit-request phase)

In the main flow of transactional messages, the first-phase message is invisible to users, which is the biggest difference from normal messages. So how is the message written while staying invisible to the user? RocketMQ's solution works as follows. If the message is a Half message, the topic and queueId of the original message are backed up, and the topic is changed to RMQ_SYS_TRANS_HALF_TOPIC. Since consumer groups do not subscribe to this topic, consumers cannot consume the Half message. RocketMQ then starts a timed task that pulls messages from RMQ_SYS_TRANS_HALF_TOPIC. The task obtains a channel according to the producer group and sends a back-check to query the local transaction status. Based on that status, it decides whether to commit or roll back the message.

In RocketMQ, the storage structure of the message in the broker is as follows. Each message has corresponding index information. The Consumer reads the content of the message through the secondary index of the ConsumeQueue. The flow is as follows:

The specific implementation strategy of RocketMQ is as follows. When a transactional message is written, its topic and queueId are replaced, and the original topic and queueId are stored in the message properties. Because the topic is replaced, the message is not forwarded to the Consume Queue of the original topic, so consumers cannot see the message and will not consume it. In fact, changing the topic is a common technique in RocketMQ (recall how delay messages are implemented).

  2. Commit/Rollback operation and introduction of Op message

After a message invisible to the user has been written in the first phase, there are two cases in the second phase. With a Commit operation, the message must become visible to the user. With a Rollback operation, the first-phase message (the Half message) must be revoked. For Rollback, since the first-phase message is already invisible to the user, there is no need to actually revoke it (in fact, RocketMQ cannot really delete a message because messages are stored in sequentially written files). Still, some operation is needed to mark the final status of the message and distinguish it from messages in pending status. For this purpose, the concept of an “Op message” is introduced: an Op message indicates that a message has a definite status (Commit or Rollback). If a transactional message has no corresponding Op message, its transaction status is still undetermined (probably because the second phase failed). RocketMQ records an Op message for every Half message, whether it is committed or rolled back. The only difference between Commit and Rollback is that on Commit, the index of the Half message is created before the Op message is written.

  3. How Op messages are stored and how Op messages correspond to Half messages

RocketMQ writes the Op message to a specific system topic(RMQ_SYS_TRANS_OP_HALF_TOPIC) which will be created via the method - TransactionalMessageUtil.buildOpTopic(); this topic is an internal Topic (like the topic of RMQ_SYS_TRANS_HALF_TOPIC) and will not be consumed by the user. The content of the Op message is the physical offset of the corresponding Half message. Through the Op message we can index to the Half message for subsequent check-back operation.

  4. Index construction of Half messages

When performing Commit operation of the second phase, the index of the Half message needs to be built. Since the Half message is written to a special topic(RMQ_SYS_TRANS_HALF_TOPIC) in the first phase of 2PC, so it needs to be read out from the special topic when building index, and replace the topic and queueId with the real target topic and queueId, and then write through a normal message that is visible to the user. Therefore, in conclusion, the second phase recovers a complete normal message using the content of the Half message stored in the first phase, and then goes through the message-writing process.

  5. How to handle messages that fail in the second phase?

The commit/rollback phase can fail, for example when a network problem causes a Commit to fail. A strategy is then needed to make sure the message is eventually committed. RocketMQ uses a compensation mechanism called “back-check”. The broker initiates a back-check request for each message in pending status and sends it to the corresponding producer side (the same producer group that sent the Half message). The producer checks the status of its local transaction and redoes the Commit or Rollback. The broker performs the back-check by comparing messages in RMQ_SYS_TRANS_HALF_TOPIC with messages in RMQ_SYS_TRANS_OP_HALF_TOPIC. It then advances the checkpoint, which records the transactional messages whose status is already determined.

RocketMQ does not back-check the status of transactional messages endlessly. By default, a message is checked at most 15 times. If the transaction status is still unknown after 15 checks, RocketMQ rolls back the message by default.

6 Message Query

6.1 Query messages by messageId

6.2 Query messages by message key