RocketMQ improves performance by 30%

ImportNew 2022-02-13 09:13:30 · 773 views


According to the official release notes, RocketMQ 4.9.1 contains a number of optimizations for message sending, and the resulting performance improvement is very significant. Let's walk through these contributions together.

From the RocketMQ 4.9.1 change log, we can extract the items related to message-sending performance (Issues: 2883). The specific optimization points are shown in the screenshot below:


First, a brief introduction to each of the optimization points above:

  • Optimized the lock in WaitNotifyObject (item 2)
  • Removed the lock in HAService (item 3)
  • Removed the lock in GroupCommitService (item 4)
  • Eliminated an unnecessary array copy in HA (item 5)
  • Adjusted the default values of several message-sending parameters (item 7)
    • sendMessageThreadPoolNums
    • useReentrantLockWhenPutMessage
    • flushCommitLogTimed
    • endTransactionThreadPoolNums
  • Reduced the scope of locks (items 8-12)

Reading through these changes, the optimizations boil down to three main techniques:

  • Removing unnecessary locks
  • Reducing lock granularity (scope)
  • Tuning the parameters related to message sending

Next, we'll pick representative functions and analyze them in detail against the source code, to appreciate together the craft of high-concurrency Java programming.

1. Remove unnecessary locks

This performance optimization mainly targets RocketMQ's synchronous replication path.

Let's first briefly review the programming techniques behind RocketMQ's master-slave synchronization.

After the master node has written a message to memory, under synchronous replication it must wait for the slave node to write the message successfully before returning success to the producer.

The coding here is also quite skillful; the sequence diagram is shown in the figure below:


Reminder: message sending has been optimized since RocketMQ 4.7. The synchronous send model was reworked around the JDK's CompletableFuture to implement asynchronous sending.

A walkthrough of the core steps:

  1. The message-sending thread calls CommitLog's asyncPutMessage method to write the message;
  2. CommitLog calls submitReplicaRequest to submit a task to GroupTransferService and obtains a Future for asynchronous programming. Note that the result is not returned here; it is delivered only once the data has been written successfully to the slave node (internally, CompletableFuture completes on its default ForkJoin thread pool);
  3. GroupTransferService examines the submitted tasks in turn, checking whether each request's data has been synchronized to the slave node;
  4. Once a request has been replicated to the slave node, its Future is completed, waking the sender and returning the result to it.
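The hand-off described in these steps can be sketched with CompletableFuture. This is a minimal illustration of the pattern, not RocketMQ's actual code; PutStatus and the method bodies are simplified stand-ins.

```java
import java.util.concurrent.CompletableFuture;

// Simplified sketch of the async send flow described above.
public class SyncReplicationSketch {
    public enum PutStatus { PUT_OK, FLUSH_SLAVE_TIMEOUT }

    // Steps 1-2: write the message locally (omitted here), then submit a
    // replica request and return a future instead of blocking the sender.
    public static CompletableFuture<PutStatus> asyncPutMessage(String msg) {
        return submitReplicaRequest(msg);
    }

    // The future completes once the service thread sees the slave caught up;
    // in this sketch it completes immediately on CompletableFuture's default pool.
    public static CompletableFuture<PutStatus> submitReplicaRequest(String msg) {
        return CompletableFuture.supplyAsync(() -> PutStatus.PUT_OK);
    }

    public static void main(String[] args) {
        // Step 4: the sender resumes only when replication has finished.
        System.out.println(asyncPutMessage("hello").join());
    }
}
```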

The GroupTransferService code is shown in the figure below:


To make the following optimization points easier to understand, let's first summarize the design of GroupTransferService:

  • It maintains two List collections, a read list and a write list;
  • External callers invoke GroupTransferService's putRequest, which stores the task in the write list (requestsWrite);
  • GroupTransferService's run method takes tasks from the read list (requestsRead) and checks whether the data for each task has been successfully written to the slave node;
  • Whenever requestsRead has no readable data, the two lists are swapped, achieving read/write separation and reducing lock contention.
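The read/write list separation can be sketched as follows. This is a simplified model of the idea, not the real GroupTransferService; the field and method names are only loosely based on it, and synchronized stands in for whatever lock protects the write side.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the read/write list separation used by GroupTransferService.
// Producers append to requestsWrite under a lock; the single service thread
// drains requestsRead lock-free, swapping the lists when the read side is empty.
public class ReadWriteListSketch {
    private volatile List<String> requestsWrite = new ArrayList<>();
    private volatile List<String> requestsRead = new ArrayList<>();

    // Called by many sender threads, so it must be protected.
    public synchronized void putRequest(String req) {
        requestsWrite.add(req);
    }

    // Brief lock only for the swap itself.
    private synchronized void swapRequests() {
        List<String> tmp = requestsWrite;
        requestsWrite = requestsRead;
        requestsRead = tmp;
    }

    // Runs only on the service thread, so draining needs no lock.
    public List<String> drain() {
        if (requestsRead.isEmpty()) {
            swapRequests();
        }
        List<String> batch = new ArrayList<>(requestsRead);
        requestsRead.clear();
        return batch;
    }
}
```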

The main optimization points in the new version are:

  • Change the lock type in putRequest, replacing synchronized with a spin lock;
  • Remove the redundant lock in the doWaitTransfer method.

1.1 Replace synchronized with a spin lock

As shown in the figure below, GroupTransferService exposes the putRequest interface to accept synchronization tasks from outside. The ArrayList must be protected by a lock, but appending to an ArrayList is a pure in-memory operation that takes very little time.


So there is no need for a heavyweight synchronized block here; a spin lock can be used instead.

The spin lock implementation is very lightweight, as shown in the figure below:


The entire lock needs only a single AtomicBoolean; acquiring and releasing the lock are both CAS operations, which is very cheap, and a spin lock never causes a thread context switch.
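A minimal CAS-based spin lock in the spirit of the one described here (the tryLock helper is an addition for illustration):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal spin lock: a single AtomicBoolean flipped with CAS operations.
public class SpinLock {
    private final AtomicBoolean locked = new AtomicBoolean(false);

    public void lock() {
        // Busy-wait until the CAS from false -> true succeeds; the thread
        // never parks, so there is no context switch.
        while (!locked.compareAndSet(false, true)) {
            // spin
        }
    }

    // Non-blocking attempt, added here for illustration/testing.
    public boolean tryLock() {
        return locked.compareAndSet(false, true);
    }

    public void unlock() {
        locked.set(false);
    }
}
```

Because the critical section being protected (an ArrayList append) lasts only nanoseconds, spinning is usually cheaper than the parking and waking that a contended synchronized can incur.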

1.2 Remove excess locks

Lock abuse is a very common phenomenon. Programming in a multithreaded environment is a complex, interactive process, and while writing code we often cannot predict whether a given path will be executed concurrently by multiple threads. To be on the safe side, we simply slap a lock on it, and the natural result is lost performance. To remove a lock here, we must examine the class's call chain and determine whether locking is actually required.

The whole GroupTransferService runs in a multithreaded environment; what needs protection is mainly the requestsRead and requestsWrite collections. The locks were introduced to guarantee safe access to these two collections across threads, so we should first map out the operating flow of GroupTransferService's core methods.


The doWaitTransfer method operates mainly on the requestsRead list, and it is only ever called by the GroupTransferService thread; moreover, requestsRead is only modified in swapRequests. These two methods execute serially on the same thread, so no lock needs to be introduced there and the lock can be removed.

But with that lock removed, swapRequests must itself take a lock, because the requestsWrite list is accessed by multiple threads.

The optimized code is as follows:


Seen this way, the change is essentially to swap the lock type from synchronized to a lighter spin lock.

2. Reduce the scope of the lock

A code block wrapped in a lock executes serially, i.e. it cannot run concurrently. When a lock is unavoidable, shrinking the code block it protects can effectively improve concurrency, as illustrated below:


Suppose multiple threads access lock1 and lock2. Under lock1, the methods doSomeThing1 and doSomeThing2 must both execute serially. When multiple threads access lock2 at the same time, doSomeThing1 can run on multiple threads concurrently, and only doSomeThing2 is serialized. The overall concurrency of lock2 is therefore certain to be better.

From this we get a best practice for using locks: the less code a lock wraps, the better.
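The difference between the two lock scopes can be sketched like this; the class and the doExpensivePrep helper are hypothetical, purely to contrast the two styles:

```java
// Contrast of wide vs narrow lock scope: doExpensivePrep does not touch
// shared state, so only the shared-state update actually needs serializing.
public class LockScope {
    private final Object lock = new Object();
    private long total = 0;

    // Wide scope: the expensive, thread-local prep work is serialized too.
    public void addWide(int n) {
        synchronized (lock) {
            int v = doExpensivePrep(n);
            total += v;
        }
    }

    // Narrow scope: prep runs concurrently; only the mutation is locked.
    public void addNarrow(int n) {
        int v = doExpensivePrep(n);   // concurrent, thread-local work
        synchronized (lock) {
            total += v;               // short serial section
        }
    }

    private int doExpensivePrep(int n) {
        return n * 2;                 // stand-in for real per-message work
    }

    public long getTotal() {
        synchronized (lock) { return total; }
    }
}
```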

In the old version, the locked code block in the message-write path was relatively large; some actions that could have been executed concurrently, such as generating the offsetMsgId, were also placed under the lock.


The new version adopts a functional-programming idea: it only defines how the msgId is obtained and does not execute it while the message is being written. This reduces lock granularity and allows offsetMsgId generation to run in parallel; the cleverness of this technique is well worth learning from.
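The deferred-computation trick can be sketched with a java.util.function.Supplier: the locked section only captures a lambda, and the id string is materialized later, after the lock has been released. Names and bodies here are illustrative, not RocketMQ's actual signatures.

```java
import java.util.function.Supplier;

public class LazyMsgId {
    // Eager version: the id string is built while the caller may still
    // be holding the commit-log write lock.
    public static String eagerMsgId(long offset) {
        return buildOffsetMsgId(offset);
    }

    // Lazy version: inside the lock we only capture the offset; the string
    // is computed on first get(), outside the critical section.
    public static Supplier<String> lazyMsgId(long offset) {
        return () -> buildOffsetMsgId(offset);
    }

    private static String buildOffsetMsgId(long offset) {
        // Stand-in for encoding broker address + physical offset into an id.
        return Long.toHexString(offset);
    }
}
```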

3. Adjust the parameters related to message sending

3.1 sendMessageThreadPoolNums

This is the size of the broker's message-sending thread pool. Before 4.9.0 the default value was 1; the new version adjusts it to the CPU core count of the operating system, and not less than 4.

Adjusting this parameter has both advantages and disadvantages: it improves message-sending concurrency, but messages may be written out of order. As the example below shows, there is no ordering problem under synchronous sending, so in that case it can be modified with confidence:


In ordered-consumption scenarios, modifying this parameter is not recommended. In practice, RocketMQ clusters should be governed so that ordered-consumption workloads use dedicated clusters.

3.2 useReentrantLockWhenPutMessage

This controls the type of lock MQ uses to protect memory when writing messages. In lower versions the default was false, meaning a spin lock was used; the new version uses ReentrantLock instead.

The main advantage of spinning is that there is no thread-switch cost, but spinning easily wastes CPU. Memory writes are fast in most cases, but RocketMQ relies heavily on the page cache; if the page cache jitters, the CPU wasted by spinning is not worth it. Once sendMessageThreadPoolNums is set above 1, ReentrantLock is the more stable lock type.
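How a broker might select its lock implementation from such a flag can be sketched as follows; PutMessageLock and the two implementations are simplified stand-ins for RocketMQ's internals, not its actual classes:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of picking the commit-log lock from a configuration flag,
// mirroring the idea behind useReentrantLockWhenPutMessage.
public class LockChoice {
    public interface PutMessageLock {
        void lock();
        void unlock();
    }

    // ReentrantLock variant: parks waiters, stable under long critical sections.
    public static class ReentrantPutMessageLock implements PutMessageLock {
        private final ReentrantLock inner = new ReentrantLock();
        public void lock() { inner.lock(); }
        public void unlock() { inner.unlock(); }
    }

    // Spin variant: CAS busy-wait, cheap when the critical section is tiny.
    public static class SpinPutMessageLock implements PutMessageLock {
        private final AtomicBoolean b = new AtomicBoolean(false);
        public void lock() { while (!b.compareAndSet(false, true)) { } }
        public void unlock() { b.set(false); }
    }

    public static PutMessageLock create(boolean useReentrantLock) {
        return useReentrantLock ? new ReentrantPutMessageLock() : new SpinPutMessageLock();
    }
}
```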

3.3 flushCommitLogTimed

First, let's understand the meaning of this parameter by looking at the source code:


Its main function is to control how the flush thread blocks and waits. In lower versions flushCommitLogTimed defaulted to false, so a CountDownLatch was used; higher versions use Thread.sleep instead.

My guess at the reason: the flush thread is relatively independent and has no direct interaction with other threads, so there is no need to bring in CountDownLatch, a tool built specifically for thread coordination.
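The two waiting styles can be contrasted in a small sketch, assuming a wakeup() hook for the latch variant; this is illustrative, not RocketMQ's actual flush service code:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Two ways a flush thread can pause between disk flushes. The latch style
// can be cut short by another thread calling wakeup(); the timed-sleep style
// (flushCommitLogTimed=true) always sleeps the full interval.
public class FlushWaitSketch {
    private volatile CountDownLatch waitPoint = new CountDownLatch(1);

    // Wakeable wait: returns early if wakeup() fires, else after intervalMs.
    public void waitWithLatch(long intervalMs) {
        try {
            waitPoint.await(intervalMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        waitPoint = new CountDownLatch(1); // reset for the next round
    }

    public void wakeup() {
        waitPoint.countDown();
    }

    // Fixed-interval wait: nothing can shorten it.
    public static void waitWithSleep(long intervalMs) {
        try {
            Thread.sleep(intervalMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```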

3.4 endTransactionThreadPoolNums

This parameter mainly sets the size of the transactional-message thread pool.


In the new version, the transactional-message thread pool can be resized dynamically along with the send thread pool, so it can be tuned according to load-test results.

- EOF -



Copyright: ImportNew. Please include a link to the original when reprinting.