Two functions below are provided in the basic sample:
The RocketMQ can be utilized to send messages in three ways: reliable synchronous, reliable asynchronous, and one-way transmission. The first two message types are reliable because there is a response whether they were sent successfully.
publicclassSyncProducer { publicstaticvoidmain(String[] args)throws Exception { // Instantiate with a producer group name DefaultMQProducerproducer=newDefaultMQProducer("please_rename_unique_group_name"); // Specify name server addresses producer.setNamesrvAddr("localhost:9876"); // Launch the producer instance producer.start(); for (inti=0; i < 100; i++) { // Create a message instance with specifying topic, tag and message body Messagemsg=newMessage("TopicTest"/* Topic */, "TagA"/* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); // Send message to one of brokers SendResultsendResult= producer.send(msg); // Check whether the message has been delivered by the callback of sendResult System.out.printf("%s%n", sendResult); } // Shut down once the producer instance is not longer in use producer.shutdown(); } }
2.2 Send Asynchronous Messages
Asynchronous transmission is generally used in response time sensitive business scenarios. It means that it is unable for the sender to wait the response of the Broker too long.
publicclassAsyncProducer { publicstaticvoidmain(String[] args)throws Exception { // Instantiate with a producer group name DefaultMQProducerproducer=newDefaultMQProducer("please_rename_unique_group_name"); // Specify name server addresses producer.setNamesrvAddr("localhost:9876"); // Launch the producer instance producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); for (inti=0; i < 100; i++) { finalintindex= i; // Create a message instance with specifying topic, tag and message body Messagemsg=newMessage("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // SendCallback: receive the callback of the asynchronous return result. producer.send(msg, newSendCallback() { @Override publicvoidonSuccess(SendResult sendResult) { System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } @Override publicvoidonException(Throwable e) { System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); } // Shut down once the producer instance is not longer in use producer.shutdown(); } }
2.3 Send Messages in One-way Mode
One-way transmission is used for cases requiring moderate reliability, such as log collection.
publicclassOnewayProducer { publicstaticvoidmain(String[] args)throws Exception{ // Instantiate with a producer group name DefaultMQProducerproducer=newDefaultMQProducer("please_rename_unique_group_name"); // Specify name server addresses producer.setNamesrvAddr("localhost:9876"); // Launch the producer instance producer.start(); for (inti=0; i < 100; i++) { // Create a message instance with specifying topic, tag and message body Messagemsg=newMessage("TopicTest"/* Topic */, "TagA"/* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); // Send in one-way mode, no return result producer.sendOneway(msg); } // Shut down once the producer instance is not longer in use producer.shutdown(); } }
publicclassConsumer { publicstaticvoidmain(String[] args)throws InterruptedException, MQClientException { // Instantiate with specified consumer group name DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("please_rename_unique_group_name"); // Specify name server addresses consumer.setNamesrvAddr("localhost:9876");
// Subscribe one or more topics and tags for finding those messages need to be consumed consumer.subscribe("TopicTest", "*"); // Register callback to execute on arrival of messages fetched from brokers consumer.registerMessageListener(newMessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); // Mark the message that have been consumed successfully return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // Launch the consumer instance consumer.start(); System.out.printf("Consumer Started.%n"); } }