RocketMQ: Getting Started

Han Han turtle 2022-01-27 06:25:19


1. Documentation links

Source code: https://github.com/apache/rocketmq

Chinese documentation: https://github.com/apache/rocketmq/tree/master/docs/cn

Commercial edition (Alibaba Cloud): https://www.aliyun.com/product/rocketmq

Official documentation (translation): http://www.itmuch.com/books/rocketmq/

FAQ: http://rocketmq.apache.org/docs/faq/

Common RocketMQ admin commands: https://blog.csdn.net/gwd1154978352/article/details/80829534

RocketMQ default configuration: https://www.cnblogs.com/jice/p/11981107.html

2. Architecture design

[Figure: overall RocketMQ architecture]

The diagram above shows the overall architecture. Let's look at the role of each component.

2.1 Broker

The RocketMQ server process is called the Broker. The Broker stores and forwards messages. A single RocketMQ machine can handle roughly 100,000 QPS.

[Figure: a single Broker]

To improve Broker availability (avoiding a single point of failure) and server performance (load balancing), Brokers are usually deployed as a cluster.

Like Kafka, each Broker node in a RocketMQ cluster holds only part of the total data, so the cluster can scale horizontally.

[Figure: Broker cluster, each node holding part of the data]

To improve reliability (preventing data loss), each Broker has its own replica (slave).

[Figure: master Brokers with their slaves]

By default, both reads and writes go to the master. With slaveReadEnable=true, a slave can also share the read load. By default, however, only the slave with brokerId=1 takes part, and only when consumption from the master is slow; which Broker is read in that case is determined by the whichBrokerWhenConsumeSlowly parameter.

2.2 Topic

A Topic classifies messages by subject, for example order messages or logistics messages. Note that, unlike Kafka, a topic in RocketMQ is a purely logical concept: messages are not stored separately by Topic.

[Figure: messages grouped by Topic]

A Producer sends messages to a specified Topic, and a Consumer that subscribes to that Topic receives them.

As in Kafka, a topic that does not exist is created automatically. This is controlled by the following Broker setting, which defaults to true:

private boolean autoCreateTopicEnable = true;

Topics have a many-to-many relationship with producers and consumers: a producer can send messages to multiple topics, and a consumer can subscribe to multiple topics.

2.3 NameServer

When different messages are stored on different Brokers, how producers and consumers find the right Broker, that is, routing, becomes a key problem.

So, just like the registry in a distributed service framework, RocketMQ needs a component that manages Broker information.

Kafka uses ZooKeeper for this. RocketMQ does not; it implements its own service, the NameServer.

Think of the NameServer as RocketMQ's routing center. Every NameServer node stores the full routing information, and NameServers can be deployed as a cluster for high availability. Its role is similar to Eureka or Redis Sentinel.

In other words, Brokers register themselves with the NameServer, and Producers and Consumers use the NameServer to discover Brokers.

[Figure: Brokers register with the NameServer; Producers and Consumers discover Brokers through it]

2.3.1 How does the NameServer work as a routing center?

When each Broker node starts, it iterates over the NameServer list from its configuration:

rocket/conf/broker.conf

namesrvAddr=localhost:9876

It establishes a long-lived TCP connection with each NameServer, registers its information, and then sends a heartbeat every 30 seconds (active registration).

If a Broker goes down and stops sending heartbeats, how does the NameServer find out?

So besides active registration there is also periodic detection. Every 10 seconds, each NameServer checks the last heartbeat time of every Broker; any Broker that has not sent a heartbeat for more than 120 seconds is considered dead and is removed from the routing information.

[Figure: heartbeat registration and periodic detection]
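The detection rule above can be pictured as a small liveness table. The sketch below is a simplified model, not RocketMQ source: the class and method names are hypothetical, and only the 10-second scan interval and the 120-second timeout come from the text.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of a NameServer's broker-liveness table.
class BrokerLivenessTable {
    // A broker with no heartbeat for longer than this is treated as dead (120 s, as described above).
    static final long BROKER_TIMEOUT_MS = 120_000L;

    // broker address -> time (ms) of the last heartbeat received from it
    private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

    // Registration and every periodic heartbeat (about every 30 s) refresh the timestamp.
    void onHeartbeat(String brokerAddr, long nowMs) {
        lastHeartbeat.put(brokerAddr, nowMs);
    }

    // Graceful shutdown: the closed connection is noticed and the broker is removed at once.
    void onChannelClosed(String brokerAddr) {
        lastHeartbeat.remove(brokerAddr);
    }

    // Periodic scan: remove every broker whose last heartbeat is older than the timeout.
    int scanNotActiveBrokers(long nowMs) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMs - it.next().getValue() > BROKER_TIMEOUT_MS) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    boolean isAlive(String brokerAddr) {
        return lastHeartbeat.containsKey(brokerAddr);
    }

    // Run the scan every 10 s on the given scheduler (the 5 s initial delay is an arbitrary choice here).
    void startScanning(ScheduledExecutorService scheduler) {
        scheduler.scheduleAtFixedRate(
                () -> scanNotActiveBrokers(System.currentTimeMillis()), 5, 10, TimeUnit.SECONDS);
    }
}
```

A gracefully stopped Broker disappears immediately through onChannelClosed, while a crashed one lingers until the next scan finds its heartbeat stale.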

Since both serve as registries, why not use ZooKeeper?

In fact, early versions of RocketMQ did use ZooKeeper, but the dependency was later removed in favor of RocketMQ's own NameServer.

RocketMQ's architecture only needs a lightweight metadata store that is eventually consistent. It does not need a strongly consistent solution like ZooKeeper, and not depending on another middleware lowers maintenance costs.

In terms of the CAP theorem (Consistency, Availability, Partition tolerance), ZooKeeper chooses CP, while NameServer chooses AP and gives up real-time consistency.

2.3.2 Consistency issues

Since NameServers do not communicate with each other and have no master/slave relationship, how do they stay consistent?

Let's look at it from three angles:

2.3.2.1 Service registration

When a Broker is added, how does it get into every NameServer?

There is no master. Every Broker sends a heartbeat to all NameServers every 30 seconds, so they all end up with the same information.

2.3.2.2 Service removal

If a Broker goes down, how is its information removed from all NameServers?

  • If the Broker shuts down normally, Netty's channel-close listener detects the disconnect and removes the Broker's information.
  • If the Broker shuts down abnormally, a NameServer scheduled task scans the Broker list every 10 seconds and removes any Broker whose latest heartbeat is more than 120 seconds old.

With these two mechanisms, whether Brokers go down, recover, are added, or are removed, the NameServers keep their data consistent.

2.3.2.3 Route discovery

When Broker information changes (nodes added or removed), how does the client get the latest Broker list?

Start with producers: when sending the first message, a producer fetches the routing information for the Topic from the NameServer.

Consumers usually subscribe to fixed Topics and fetch the Broker information at startup.

What happens when Broker information changes after that?

The NameServer does not push updates to clients, and clients do not send heartbeats to the NameServer, so once connected, producers and consumers must refresh their routes periodically.

The source code shows how this works.

In the start method of the MQClientInstance class, a set of scheduled tasks is launched.

[Figure: scheduled tasks started in MQClientInstance (source screenshot)]

The second task, updateTopicRouteInfoFromNameServer, periodically refreshes routing information from the NameServer, every 30 seconds by default.

[Figure: the updateTopicRouteInfoFromNameServer task (source screenshot)]
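The periodic refresh, together with the client-side route cache that question 2 below relies on, can be sketched as follows. This is a hypothetical model, not the MQClientInstance code: TopicRouteCache and the lookup function are illustrative stand-ins for the real NameServer query, and a route is reduced to a list of broker names.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical, simplified client-side route cache.
class TopicRouteCache {
    // Stand-in for a NameServer query: topic -> broker names. Throws if no NameServer is reachable.
    private final Function<String, List<String>> nameServerLookup;
    private final Map<String, List<String>> routes = new ConcurrentHashMap<>();

    TopicRouteCache(Function<String, List<String>> nameServerLookup) {
        this.nameServerLookup = nameServerLookup;
    }

    // First use of a topic queries the NameServer; later calls are served from the cache.
    List<String> route(String topic) {
        return routes.computeIfAbsent(topic, nameServerLookup);
    }

    // Periodic task: refresh every known topic. On failure the old route stays in place,
    // so the client keeps working while all NameServers are down.
    void refreshAll() {
        for (String topic : routes.keySet()) {
            try {
                List<String> fresh = nameServerLookup.apply(topic);
                if (fresh != null && !fresh.isEmpty()) {
                    routes.put(topic, fresh);
                }
            } catch (RuntimeException e) {
                // NameServer unreachable: keep serving the cached route
            }
        }
    }

    // Schedule the refresh (the text gives 30 s as the default interval).
    ScheduledExecutorService startRefreshing(long periodMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(this::refreshAll, periodMs, periodMs, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```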

2.3.2.4 Summary

The NameServers keep their data consistent through active registration and periodic detection, and producers and consumers get the latest information by refreshing their routes periodically.

Question 1: If a Broker goes down, clients will not update their routing information for up to 30 seconds. Does that mean up to 30 seconds of message delay?

Answer: this is mitigated by:

  • retries;
  • isolating the unreachable Broker;
  • preferring nodes with low latency, which avoids Brokers that tend to fail.
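The first two mitigations can be sketched together: retry on another Broker and temporarily skip the one that just failed. This is a hypothetical model with made-up names and parameters, not RocketMQ's own fault strategy; in the 4.x Java client the related knobs on DefaultMQProducer are setRetryTimesWhenSendFailed and setSendLatencyFaultEnable (the latter enables latency-based isolation).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch of "retry + isolate the unreachable Broker".
class FaultTolerantSender {
    private final List<String> brokers;
    private final int maxAttempts;      // first try plus retries
    private final long isolationMs;     // how long a failed broker is skipped
    private final Map<String, Long> isolatedUntil = new HashMap<>();
    private int next = 0;

    FaultTolerantSender(List<String> brokers, int maxAttempts, long isolationMs) {
        this.brokers = brokers;
        this.maxAttempts = maxAttempts;
        this.isolationMs = isolationMs;
    }

    // trySend stands in for the network call; returns the broker that accepted the message, or null.
    String send(Predicate<String> trySend, long nowMs) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            String broker = pickBroker(nowMs);
            if (trySend.test(broker)) {
                return broker;
            }
            isolatedUntil.put(broker, nowMs + isolationMs); // skip it for a while
        }
        return null;
    }

    // Round-robin over brokers, skipping isolated ones; if all are isolated, fall back to plain round-robin.
    private String pickBroker(long nowMs) {
        for (int i = 0; i < brokers.size(); i++) {
            String candidate = brokers.get(Math.floorMod(next++, brokers.size()));
            Long until = isolatedUntil.get(candidate);
            if (until == null || until <= nowMs) {
                return candidate;
            }
        }
        return brokers.get(Math.floorMod(next++, brokers.size()));
    }
}
```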

Question 2: What if all NameServers, the routing center, go down and have not yet recovered?

Answer: That's fine. Clients cache Broker information, so they do not depend entirely on the NameServer.

2.4 Producer

The producer creates messages. It first pulls routing information from the NameServer (so the Broker addresses do not need to be configured), then establishes long-lived TCP connections to the Brokers chosen from that routing information and sends messages to them. Producers with the same sending logic can form a Group.

RocketMQ producers also support batch sending, but you have to assemble the List yourself.

[Figure: Producer]
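When assembling the List, a batch should stay under the Broker's size limit. The helper below is a hypothetical sketch that splits message bodies into batches under a caller-chosen limit. It counts only body bytes, whereas real messages also carry the topic, properties, and some fixed overhead, so leave headroom. Each resulting batch could then be turned into Message objects for the same topic and passed to DefaultMQProducer.send(Collection<Message>).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: greedily packs message bodies into batches whose total size stays under sizeLimit.
class BatchSplitter {
    static List<List<byte[]>> split(List<byte[]> bodies, int sizeLimit) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int currentSize = 0;
        for (byte[] body : bodies) {
            if (body.length > sizeLimit) {
                throw new IllegalArgumentException("a single message exceeds the batch limit");
            }
            // Close the current batch if this body would push it over the limit.
            if (currentSize + body.length > sizeLimit && !current.isEmpty()) {
                batches.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(body);
            currentSize += body.length;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```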

Producers can only write to master nodes.

2.5 Consumer

The consumer obtains Topic routing information from the NameServer cluster and connects to the corresponding Brokers to consume messages. Consumers with the same consumption logic can form a Group, and messages are then load-balanced among the Consumers in that Group.

Because both Master and Slave can serve reads, a Consumer connects to both the Master and the Slave.

Note: consumers in the same consumer group should subscribe to the same topics. Conversely, consumers of different topics should not share a consumer group name. Otherwise, a later consumer's subscription overwrites the earlier one.

There are two consumption modes: clustering, where each message goes to only one consumer in the group so the group shares the load, and broadcasting, where every consumer receives its own copy.

[Figure: clustering consumption]

[Figure: broadcasting consumption]
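The two modes differ in how the Topic's queues are shared. The sketch below follows the averaging idea behind AllocateMessageQueueAveragely, the 4.x push consumer's default strategy in clustering mode: each consumer takes a contiguous block of queues. It is simplified to plain queue ids and consumer ids, the class name is hypothetical, and it assumes every consumer sees the same sorted lists.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of queue sharing in the two consumption modes.
class QueueAllocation {
    // CLUSTERING: split the sorted queue list into contiguous blocks, one block per consumer.
    // The first (queues % consumers) consumers get one extra queue; surplus consumers get none.
    static List<Integer> clustering(List<Integer> queueIds, List<String> consumerIds, String me) {
        List<Integer> result = new ArrayList<>();
        int index = consumerIds.indexOf(me);
        if (index < 0 || queueIds.isEmpty()) {
            return result;
        }
        int q = queueIds.size();
        int c = consumerIds.size();
        int mod = q % c;
        int averageSize = q <= c ? 1 : (mod > 0 && index < mod ? q / c + 1 : q / c);
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, q - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(queueIds.get((startIndex + i) % q));
        }
        return result;
    }

    // BROADCASTING: every consumer reads every queue (and tracks its own progress).
    static List<Integer> broadcasting(List<Integer> queueIds) {
        return new ArrayList<>(queueIds);
    }
}
```

With 8 queues and 3 consumers, the blocks come out as 3, 3, and 2 queues; with more consumers than queues, the extra consumers receive nothing.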

2.5.1 pull

In terms of consumption model, RocketMQ supports both pull and push.

In pull mode, the consumer polls the broker for messages. Implementation class: DefaultMQPullConsumer (deprecated); its replacement is DefaultLitePullConsumer.

DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("my_test_consumer_group");

Pull can be implemented in two ways. The first is ordinary polling: whether or not the server's data has changed, the client requests data at fixed intervals and may or may not get new data back.

The drawback of ordinary polling is that most requests return nothing, so these useless requests waste server resources; and if the interval is too long, messages are delayed.

RocketMQ implements pull with long polling.

The client issues a long-polling request. If the server has no relevant data at that moment, it holds the request until data arrives or a timeout expires, and only then responds. As soon as the response comes back, the client immediately issues the next long-polling request. (Holding a request means the server does not reply right away: it keeps the request and leaves the connection open, and writes the result back to the client once the data is ready.)

Long polling solves the problems of ordinary polling; its only drawback is that held requests consume extra server memory.
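The hold-until-data-or-timeout behavior of long polling can be illustrated with a blocking queue. This is a minimal sketch with hypothetical names, not the Broker's actual pull-holding code; the hold time is simply a parameter chosen by the caller.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a server-side long poll.
class LongPollingQueue {
    private final BlockingQueue<String> messages = new LinkedBlockingQueue<>();

    void publish(String msg) {
        messages.add(msg);
    }

    // Holds the request: returns as soon as a message is available,
    // or null once holdMs has passed with nothing new.
    String pull(long holdMs) {
        try {
            return messages.poll(holdMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

A client would call pull in a loop, issuing the next request immediately after each response, which is exactly the pattern described above.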

2.5.2 push

In push mode, the Broker pushes messages to the consumer. Implementation class: DefaultMQPushConsumer.

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_test_consumer_group");

RocketMQ's push mode is actually built on top of pull: it is a layer wrapped around pull mode, so RocketMQ's push mode is not a true "push".

In RocketMQ, the PushConsumer registers a MessageListener. After messages are pulled, it invokes the MessageListener's consumeMessage() method, so from the user's point of view the messages appear to be pushed.
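The idea of push built on pull can be sketched as a loop that pulls and then invokes the registered listener. The Puller interface and the class name are hypothetical stand-ins; the real client's pull loop, flow control, and offset handling are omitted.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: "push" implemented on top of "pull".
class PushOverPull {
    // Stand-in for a (long-polling) pull request; returns up to maxBatch messages, possibly none.
    interface Puller {
        List<String> pull(int maxBatch);
    }

    private final Puller puller;
    private final Consumer<List<String>> listener; // plays the role of MessageListener.consumeMessage

    PushOverPull(Puller puller, Consumer<List<String>> listener) {
        this.puller = puller;
        this.listener = listener;
    }

    // One turn of the loop: pull, and if anything came back, hand it to the listener.
    // Returns the number of messages delivered.
    int pollOnce(int maxBatch) {
        List<String> batch = puller.pull(maxBatch);
        if (batch.isEmpty()) {
            return 0;
        }
        listener.accept(batch);
        return batch.size();
    }
}
```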

2.6 Message Queue

RocketMQ supports multi-master architectures.

Consider this: with multiple masters, the messages sent to one Topic are stored on multiple master Brokers.

So, for a given Topic, does every Broker store exactly the same messages?

Certainly not. If every master stored the same content, and every slave also mirrored its master: first, storage space would be wasted; second, adding machines could not increase Broker performance linearly. You could only scale vertically by upgrading hardware, not horizontally, so RocketMQ's performance in a distributed environment would be severely limited.

In short, it would not fit the idea of sharding.

So the key question is: how are the messages sent to one Topic distributed across different masters?

Kafka has partitions: a Topic can be split into multiple partitions spread across different Brokers. This shards the data and is what lets Kafka scale horizontally.

Does RocketMQ have a similar design?

On a Broker, RocketMQ keeps a single storage log (the commitlog) shared by all Topics, rather than storing each Topic separately as Kafka does. Data directory:

[Figure: Broker data directory]

In other words, with 3 Brokers there are just 3 commitlogs, each storing different data. So if data is not distributed by partition, what is it distributed by?

RocketMQ has a logical concept called Message Queue, which plays a role similar to a partition.

First, when creating a Topic you specify two queue counts: writeQueueNums (the number of write queues) and readQueueNums (the number of read queues).

[Figure: creating a Topic with writeQueueNums and readQueueNums]

The number of write queues determines how many Message Queues there are; the number of read queues determines how many of them consumers read from (it is only used for consumer-side load balancing).

What if the number of queues is not specified? How many are there by default?

A Topic created on the server side gets 8 queues by default (BrokerConfig):

private int defaultTopicQueueNums = 8;

If the topic does not exist, the producer creates it with 4 queues by default when sending a message (DefaultMQProducer):

private volatile int defaultTopicQueueNums = 4;

MessageQueues are visible on disk, but their number depends only on the write queue count.

For example, after I send a message with a producer, four directories appear under the consumequeue directory.

[Figure: four queue directories under consumequeue]

The client represents a queue as a MessageQueue object, which has three fields:

private String topic;
private String brokerName;
private int queueId;

topic is the topic the queue belongs to. brokerName is the Broker it lives on; for example, with two masters, one may be named broker-a and the other broker-b. queueId is the queue's number on that Broker.

Example: a Topic has 3 Message Queues numbered 1, 2, and 3, and there happen to be three Brokers. The first MQ is on Broker1, the second on Broker2, and the third on Broker3.

When sending a message, the producer picks a MessageQueue according to some rule. The MessageQueue tells it which Broker to send to (brokerName) and which queue to use (queueId), and the Broker then writes the message to its commitlog.
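The MessageQueue list and the per-send choice can be sketched as follows, assuming round-robin selection. The 4.x client's default also rotates through the queue list, but its counter starts at a random value per thread; this sketch starts at 0 so the result is predictable. MessageQueueRef and QueueSelector are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: build a topic's write queues and pick one per send, round-robin.
class QueueSelector {
    // Mirrors the three fields of the client's MessageQueue object.
    static final class MessageQueueRef {
        final String topic;
        final String brokerName;
        final int queueId;

        MessageQueueRef(String topic, String brokerName, int queueId) {
            this.topic = topic;
            this.brokerName = brokerName;
            this.queueId = queueId;
        }

        @Override
        public String toString() {
            return brokerName + "#" + queueId;
        }
    }

    // Assumption: the counter starts at 0 (the real client starts at a random value).
    private final AtomicInteger counter = new AtomicInteger();

    // writeQueueNums queues on each master, so the list has writeQueueNums * masters.size() entries.
    static List<MessageQueueRef> writeQueues(String topic, List<String> masters, int writeQueueNums) {
        List<MessageQueueRef> queues = new ArrayList<>();
        for (String broker : masters) {
            for (int id = 0; id < writeQueueNums; id++) {
                queues.add(new MessageQueueRef(topic, broker, id));
            }
        }
        return queues;
    }

    MessageQueueRef select(List<MessageQueueRef> queues) {
        return queues.get(Math.floorMod(counter.getAndIncrement(), queues.size()));
    }
}
```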

The number of queues you see on disk is determined by the write queue count and is the same on every master (although the stored data differs).

Example: a cluster has two masters. Create a topic with 2 write queues and 1 read queue (topic name: q-2-1).

Then the consumequeue directory on each machine contains 2 queues, 4 in total, e.g. /opt/rocketmq/store/broker-a/consumequeue/q-2-1. That is, total number of queues = write queues × number of master nodes.

[Figure: consumequeue directory of q-2-1]

If we send 6 messages numbered in sequence, which queues will they go to?

Let's test it with code.

Start the consumer first:

import java.io.UnsupportedEncodingException;
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

public class SimpleConsumer {

    public static void main(String[] args) throws MQClientException {
        // Instantiate with the specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_test_consumer_group");
        // Specify the NameServer addresses (separated by semicolons).
        consumer.setNamesrvAddr("192.168.44.163:9876;192.168.44.164:9876");
        consumer.setMessageModel(MessageModel.BROADCASTING);
        // Subscribe to one or more topics; "*" matches all tags.
        consumer.subscribe("q-2-1", "*");
        // Register a callback that runs when messages fetched from the brokers arrive.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                for (MessageExt msg : msgs) {
                    String topic = msg.getTopic();
                    String messageBody = "";
                    try {
                        messageBody = new String(msg.getBody(), "utf-8");
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                        // Ask the Broker to redeliver later
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    String tags = msg.getTags();
                    System.out.println("topic:" + topic + ",tags:" + tags + ",msg:" + messageBody);
                }
                // Consumption succeeded
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Launch the consumer instance.
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

Then start the producer:

import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {

    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("my_test_producer_group");
        producer.setNamesrvAddr("192.168.44.163:9876;192.168.44.164:9876");
        producer.start();
        for (int i = 0; i < 6; i++) {
            try {
                // tags filter messages; keys are index keys (separate several with spaces)
                // that RocketMQ uses to look messages up quickly
                Message msg = new Message("q-2-1",
                        "TagA",
                        "2673",
                        ("RocketMQ " + String.format("%05d", i)).getBytes(StandardCharsets.UTF_8));
                SendResult sendResult = producer.send(msg);
                System.out.println(String.format("%05d", i) + " : " + sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}

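Because there is only 1 read queue, the consumer can only consume queue 0 on each Broker; messages that land in queue 1 are never consumed.

Conversely, with 1 write queue and 2 read queues:

the Broker's consumequeue directory contains only 1 queue.

[Figure: one queue directory under consumequeue]

Because there are 2 read queues, which cover the single write queue, all messages are received.

Summary

Keep the read and write queue counts equal. With fewer read queues than write queues, messages in the extra write queues are never consumed; with more read queues than write queues, the extra read queues simply sit idle.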
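The rule behind both experiments can be written down as two small checks, assuming producers write to queue ids 0..writeQueueNums-1 and consumers subscribe to queue ids 0..readQueueNums-1 on each Broker. The class and method names are hypothetical.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical checks for mismatched read/write queue counts on one Broker.
class ReadWriteQueueCheck {
    // Queue ids that receive messages but are never consumed (write queue with no matching read queue).
    static Set<Integer> unconsumedQueueIds(int writeQueueNums, int readQueueNums) {
        Set<Integer> ids = new TreeSet<>();
        for (int id = readQueueNums; id < writeQueueNums; id++) {
            ids.add(id);
        }
        return ids;
    }

    // Queue ids that consumers subscribe to but that never receive messages (idle read queues).
    static Set<Integer> idleQueueIds(int writeQueueNums, int readQueueNums) {
        Set<Integer> ids = new TreeSet<>();
        for (int id = writeQueueNums; id < readQueueNums; id++) {
            ids.add(id);
        }
        return ids;
    }

    // Total queues on disk across the cluster: write queues times master nodes.
    static int totalQueues(int writeQueueNums, int masterCount) {
        return writeQueueNums * masterCount;
    }
}
```

For q-2-1 (2 write, 1 read) queue 1 is orphaned on each master; for the 1-write/2-read case nothing is lost and queue 1 is merely idle.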

3. Java Development

3.1 Java API

The official Java client API only requires adding the dependency:

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>

The official project also provides detailed sample code, organized by package:

Package          Purpose
batch            Batch messages, sent as a List
benchmark        Performance testing
broadcast        Broadcast messages
delay            Delayed messages (msg.setDelayTimeLevel)
filter           Filtering by tag or SQL expression
operation        Command line
ordermessage     Ordered messages
quickstart       Getting started
rpc              RPC-style calls
simple           ACL, asynchronous sending, assign, subscribe
tracemessage     Message tracing
transaction      Transaction messages

3.1.1 producer

Here is a simple producer:

import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {

    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("my_test_producer_group");
        producer.setNamesrvAddr("192.168.44.163:9876;192.168.44.164:9876");
        producer.start();
        for (int i = 0; i < 6; i++) {
            try {
                // tags filter messages; keys are index keys (separate several with spaces)
                // that RocketMQ uses to look messages up quickly
                Message msg = new Message("q-2-1",
                        "TagA",
                        "2673",
                        ("RocketMQ " + String.format("%05d", i)).getBytes(StandardCharsets.UTF_8));
                SendResult sendResult = producer.send(msg);
                System.out.println(String.format("%05d", i) + " : " + sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}

Code walkthrough:

  1. Producers that send the same kind of message form a producer group.
  2. The producer obtains routing information for all Brokers from the NameServer; multiple NameServer addresses are separated by semicolons. As with Redis Sentinel, the server addresses are discovered through it.
  3. A Message represents one message and must specify a Topic, the logical category of the message, for example order messages or payment messages.
  4. Tags: optional, used to filter messages on the consumer side. A tag refines the purpose of a message; for example, if the Topic carries order messages, the tags could be create, update, and delete.
  5. Keys: message keywords, separated by spaces if there are several. RocketMQ uses them to look up messages quickly, like an index on the message; a key can be set to the message's unique business ID (primary key).
  6. SendResult wraps the send result, including the send status, message ID, and selected queue. If no exception is thrown, the send went through; sendStatus tells whether it fully succeeded (SEND_OK) or, for example, timed out flushing to disk.
SendResult [sendStatus=SEND_OK, msgId=C0A8006F5B6418B4AAC299CF37140009, offsetMsgId=null, messageQueue=MessageQueue [topic=ransaction-test-topic, brokerName=broker-b, queueId=3], queueOffset=5]

msgId: a unique ID generated by the producer, globally unique, also called uniqId.

offsetMsgId: the message offset ID. It records the message's physical location in the cluster: the address (IP and port) of the Broker that stores it and the physical offset within the commitlog file.
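To make the offsetMsgId layout concrete, the sketch below decodes one, assuming the IPv4 layout used by the 4.x client: 4 bytes of IP address, 4 bytes of port, and 8 bytes of commitlog offset, written as 32 hex characters. The class name is hypothetical; the client library ships its own decoder (MessageDecoder.decodeMessageId in 4.x), which is the one to use in real code.

```java
import java.nio.ByteBuffer;

// Hypothetical decoder for an IPv4 offsetMsgId: 4-byte IP + 4-byte port + 8-byte commitlog offset.
class OffsetMsgIdDecoder {
    static String describe(String offsetMsgId) {
        if (offsetMsgId == null || offsetMsgId.length() != 32) {
            throw new IllegalArgumentException("expected 32 hex characters (IPv4 layout)");
        }
        // Hex string -> 16 raw bytes
        ByteBuffer buf = ByteBuffer.allocate(16);
        for (int i = 0; i < 32; i += 2) {
            buf.put((byte) Integer.parseInt(offsetMsgId.substring(i, i + 2), 16));
        }
        buf.flip();
        // First 4 bytes: the storing Broker's IP address
        StringBuilder ip = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            if (i > 0) {
                ip.append('.');
            }
            ip.append(buf.get() & 0xFF);
        }
        int port = buf.getInt();              // next 4 bytes, big-endian
        long commitLogOffset = buf.getLong(); // last 8 bytes, big-endian
        return ip + ":" + port + " @ commitlog offset " + commitLogOffset;
    }
}
```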

3.1.2 consumer

import java.io.UnsupportedEncodingException;
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

public class SimpleConsumer {

    public static void main(String[] args) throws MQClientException {
        // Instantiate with the specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_test_consumer_group");
        // Specify the NameServer addresses (separated by semicolons).
        consumer.setNamesrvAddr("192.168.44.163:9876;192.168.44.164:9876");
        consumer.setMessageModel(MessageModel.BROADCASTING);
        // Subscribe to one or more topics; "*" matches all tags.
        consumer.subscribe("q-2-1", "*");
        // Register a callback that runs when messages fetched from the brokers arrive.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                for (MessageExt msg : msgs) {
                    String topic = msg.getTopic();
                    String messageBody = "";
                    try {
                        messageBody = new String(msg.getBody(), "utf-8");
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                        // Ask the Broker to redeliver later
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    String tags = msg.getTags();
                    System.out.println("topic:" + topic + ",tags:" + tags + ",msg:" + messageBody);
                }
                // Consumption succeeded
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Launch the consumer instance.
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

Consumer code walkthrough:

1、A consumer group is a set of consumers that consume the same topic.

2、The consumer gets from the NameServer the addresses of the Brokers hosting the topic's queues; multiple NameServer addresses are separated by semicolons.

3、A Consumer can run in two modes, broadcasting (Broadcast) and clustering (Cluster). In broadcasting mode each message is delivered to every Consumer; in clustering mode it is delivered to only one Consumer in the group.

4、Subscriptions support wildcards. The parameter after the topic name corresponds to the tag of the producer's messages: * matches all messages, and || separates multiple tags.

consumer.subscribe("q-2-1", "*");

5、return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; tells the Broker that consumption succeeded and the offset can be updated, i.e. it sends the ACK.
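The subscription expression from item 4 can be modeled as a set of tags. This is a simplified sketch with a hypothetical class name: it covers only "*" and "||"-separated tags, and ignores that the Broker first filters by tag hash code before the client compares the tag strings.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical model of a tag subscription expression such as "*" or "TagA || TagB".
class TagExpression {
    private final Set<String> tags; // empty set means "match everything"

    TagExpression(String expr) {
        if (expr == null || expr.trim().isEmpty() || expr.trim().equals("*")) {
            tags = Set.of();
        } else {
            tags = Arrays.stream(expr.split("\\|\\|"))
                    .map(String::trim)
                    .filter(s -> !s.isEmpty())
                    .collect(Collectors.toSet());
        }
    }

    boolean matches(String messageTag) {
        return tags.isEmpty() || (messageTag != null && tags.contains(messageTag));
    }
}
```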

3.2 Spring boot

3.2.1 Dependency

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>

Using RocketMQ in Spring Boot simplifies configuration, manages the client objects for you, and provides template methods.

3.2.2 Configuration

Client settings can be written directly in the configuration file:

server.port=9096
spring.application.name=demo
rocketmq.name-server=localhost:9876
rocketmq.producer.group=test-group
rocketmq.producer.send-message-timeout=3000

3.2.3 consumer

Create a consumer class and annotate it with @RocketMQMessageListener to listen for messages.

import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * MessageModel: CLUSTERING or BROADCASTING
 * ConsumeMode: ORDERLY (sequential) or CONCURRENTLY (unordered)
 */
@Component
@RocketMQMessageListener(topic = "springboot-topic", consumerGroup = "consumer-group",
        //selectorExpression = "tag1", selectorType = SelectorType.TAG,
        messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY)
public class MessageConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        try {
            System.out.println("---------- Received RocketMQ message: " + message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

1、Use the selectorExpression attribute of the annotation to filter messages.

2、The messageModel attribute is the message model, with two values:

CLUSTERING: consumers in the group share the messages; each message goes to one of them (default).

BROADCASTING: every consumer receives every message.

3、consumeMode is the consumption mode, with two values:

  • CONCURRENTLY: concurrent (default). Messages are consumed concurrently on the consumer side, and order is not guaranteed. The degree of concurrency depends on the size of the thread pool.
  • ORDERLY: ordered. Messages are consumed in order on the consumer side, i.e. within each queue, consumers see messages in the same order producers sent them.

The difference: orderly consumption locks the queue being processed so that only one consumer thread handles a given queue at any moment. Concurrent consumption is therefore more efficient.
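The queue-locking idea behind ORDERLY can be sketched with one lock and one FIFO buffer per queue. This is a hypothetical model, not the client's orderly consumption service: any number of worker threads may call drain, but the lock lets only one of them consume a given queue at a time, so messages from that queue are handled in the order they were enqueued, while different queues can still be drained in parallel.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

// Hypothetical sketch of per-queue locking for ordered consumption.
class OrderlyDispatcher {
    private final Map<Integer, Queue<String>> buffers = new ConcurrentHashMap<>();
    private final Map<Integer, ReentrantLock> locks = new ConcurrentHashMap<>();

    // Pulled messages are appended to their queue's buffer in queue-offset order.
    void enqueue(int queueId, String msg) {
        buffers.computeIfAbsent(queueId, id -> new ConcurrentLinkedQueue<>()).add(msg);
    }

    // Only the thread holding this queue's lock consumes it, so the handler sees FIFO order.
    void drain(int queueId, Consumer<String> handler) {
        ReentrantLock lock = locks.computeIfAbsent(queueId, id -> new ReentrantLock());
        lock.lock();
        try {
            Queue<String> buffer = buffers.get(queueId);
            if (buffer == null) {
                return;
            }
            String msg;
            while ((msg = buffer.poll()) != null) {
                handler.accept(msg);
            }
        } finally {
            lock.unlock();
        }
    }
}
```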

3.2.4 producer

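import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MessageSender {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void syncSend() {
        // Destination format is "topic:tag"; the last argument is the send timeout in milliseconds
        SendResult result = rocketMQTemplate.syncSend("springboot-topic:tag", "This is a synchronous message", 10000);
        System.out.println(result);
    }
}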

The producer code is even simpler: just inject RocketMQTemplate to send messages.

There are several ways to send messages:

1、Synchronous (syncSend method): after sending data, the sender waits for the receiver's response before sending the next packet.

2、Asynchronous (asyncSend method): after sending data, the sender sends the next packet without waiting for the receiver's response. To send asynchronously, the application implements the asynchronous send callback interface (SendCallback); the call returns immediately without waiting for the server's response, which is later delivered to the callback for processing.

3、One-way (sendOneWay method): only sends the message, without waiting for a server response and without any callback; it sends the request and never waits for a reply. Sending this way takes very little time, typically on the order of microseconds. Use it where latency must be very low and reliability requirements are modest, such as log collection.
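The three styles can be compared with a plain-JDK analogy. Transport and SendCallback here are hypothetical stand-ins (RocketMQ's own SendCallback has the same onSuccess/onException shape but receives a SendResult); with RocketMQTemplate the corresponding methods are syncSend, asyncSend, and sendOneWay.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Plain-JDK analogy of sync, async, and one-way sending (hypothetical types, not the RocketMQ API).
class SendModes {
    interface Transport {
        String deliver(String msg); // returns the broker's acknowledgement
    }

    interface SendCallback {
        void onSuccess(String ack);
        void onException(Throwable e);
    }

    // Sync: the caller blocks until the acknowledgement comes back.
    static String syncSend(Transport transport, String msg) {
        return transport.deliver(msg);
    }

    // Async: returns at once; the callback runs when the acknowledgement (or a failure) arrives.
    static CompletableFuture<Void> asyncSend(Transport transport, String msg, SendCallback callback, Executor executor) {
        return CompletableFuture.supplyAsync(() -> transport.deliver(msg), executor)
                .<Void>handle((ack, error) -> {
                    if (error != null) {
                        callback.onException(error); // may be a CompletionException wrapping the cause
                    } else {
                        callback.onSuccess(ack);
                    }
                    return null;
                });
    }

    // One-way: fire and forget; no acknowledgement, no callback, failures are not reported.
    static void sendOneWay(Transport transport, String msg, Executor executor) {
        executor.execute(() -> {
            try {
                transport.deliver(msg);
            } catch (RuntimeException ignored) {
                // one-way sends do not report failures back to the caller
            }
        });
    }
}
```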

How to choose a sending method?

  • If the message is not important, use one-way to maximize throughput;
  • If the message is important and you are not sensitive to response time, use sync;
  • If the message is important and you are very sensitive to response time, use async.

4. Project address

RocketMQ-Demo

copyright:author[Han Han turtle],Please bring the original link to reprint, thank you. https://en.javamana.com/2022/01/202201270625030411.html