ImportNew, 2022-02-13
RocketMQ 4.9.1 ships a number of significant optimizations to message sending. Let's walk through these changes and appreciate the work the maintainers put in.

According to the RocketMQ 4.9.1 release notes, the message-sending performance work is tracked in Issue 2883, which collects the individual optimization points.
First, a brief introduction to those optimization points.

Reading through the changes, the optimizations boil down to three main techniques. Below, we pick representative functions, study them against the source code, and get a feel for what high-concurrency Java programming looks like in practice.
This round of optimization mainly targets RocketMQ's synchronous replication path.

Let's first look briefly at the programming techniques used in RocketMQ's master-slave synchronization.

When the master node writes a message to memory with synchronous replication enabled, it must wait for the slave node to acknowledge the write before returning success to the producer.

The implementation here is quite skillful; the sequence diagram is shown in the figure below:
Note: message sending has already been optimized since RocketMQ 4.7, where the synchronous send model was reworked around the JDK's CompletableFuture so that the replication wait becomes asynchronous.
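As a rough sketch of that idea (simplified, and not RocketMQ's actual classes; the names `putMessage` and `PutMessageStatus` are illustrative), the replication wait can be modeled as a CompletableFuture that a replication checker completes later, so the send thread never blocks:

```java
import java.util.concurrent.CompletableFuture;

// Simplified model of asynchronous master-slave replication waiting.
public class ReplicationExample {
    enum PutMessageStatus { PUT_OK, FLUSH_SLAVE_TIMEOUT }

    // The send path appends to memory, then returns a future immediately
    // instead of blocking until the slave acknowledges.
    public static CompletableFuture<PutMessageStatus> putMessage(byte[] body) {
        // ... append to the in-memory commit log here ...
        CompletableFuture<PutMessageStatus> ackFuture = new CompletableFuture<>();
        // In RocketMQ, GroupTransferService completes the request once the
        // slave's replicated offset catches up; we simulate that with a thread.
        new Thread(() -> ackFuture.complete(PutMessageStatus.PUT_OK)).start();
        return ackFuture;
    }
}
```

The caller then chains a continuation, e.g. `putMessage(body).thenAccept(status -> { /* reply to producer */ });`, and the network thread is free to serve other requests in the meantime.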
Interpretation of the core steps:

The GroupTransferService code is shown in the figure below:

To make the upcoming optimization points easier to follow, let's first summarize the design of GroupTransferService:

The new version's optimizations mainly include:
As shown in the figure below, GroupTransferService exposes a putRequest interface to accept synchronous-replication tasks from outside. The ArrayList it appends to must be protected by a lock, but adding an element to an ArrayList is a pure in-memory operation that takes very little time.

So there is no need for a heavyweight synchronized block; a spin lock is enough.

The spin lock implementation is very lightweight, as shown in the figure below:

The whole lock needs only a single AtomicBoolean; acquiring and releasing it are CAS operations, which are very cheap, and spinning never triggers a thread context switch.
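A minimal spin lock in this style looks like the following (a sketch in the spirit of RocketMQ's implementation, not a verbatim copy of the source):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Spin lock built on a single AtomicBoolean:
// true = unlocked, false = locked.
public class PutMessageSpinLock {
    private final AtomicBoolean spinLock = new AtomicBoolean(true);

    public void lock() {
        // CAS from "unlocked" to "locked"; spin until it succeeds.
        // No park/unpark, so no thread context switch ever happens.
        while (!this.spinLock.compareAndSet(true, false)) {
        }
    }

    public void unlock() {
        // CAS back from "locked" to "unlocked".
        this.spinLock.compareAndSet(false, true);
    }
}
```

This is only appropriate when the critical section is very short, such as appending to an in-memory list; a long critical section would burn CPU in the spin loop.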
Overuse of locks is a very common phenomenon. Multithreaded code involves complex interactions, and while writing it we often cannot predict whether a given piece of code will actually be executed concurrently. To be safe, the instinct is to lock it, simply and crudely, and the natural result is lost performance. To remove a lock here, we must examine the call chain of the class and determine whether locking is really required.

GroupTransferService as a whole runs in a multithreaded environment, and what needs protection are chiefly the requestRead and requestWrite collections. The lock exists to keep those two collections safe under concurrent access, so we should first trace how the core methods of GroupTransferService operate on them.

The doWaitTransfer method operates mainly on the requestRead list. This method is only ever called by the GroupTransferService thread itself, and requestRead is only ever modified in swapRequests. The two methods execute serially, on the same thread, so no lock is needed in doWaitTransfer and it can be removed.

Having removed that lock, however, swapRequests itself must still be locked, because the requestWrite list is accessed by multiple threads.
The optimized code is as follows ：
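The read/write double-buffer design described above can be sketched as follows (a simplified model of GroupTransferService's structure; the real code adds wait/notify and completes CompletableFuture requests):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Double-buffer queue: producers append to requestsWrite under a spin lock;
// the single service thread swaps the buffers, then drains requestsRead
// without any lock, because after the swap it is private to that thread.
public class DoubleBufferQueue<T> {
    private volatile List<T> requestsWrite = new ArrayList<>();
    private volatile List<T> requestsRead = new ArrayList<>();
    private final AtomicBoolean spinLock = new AtomicBoolean(true);

    // Called by many producer threads: must be locked.
    public void putRequest(T request) {
        lock();
        try {
            this.requestsWrite.add(request);
        } finally {
            unlock();
        }
    }

    // Called only by the service thread, but locked because it touches
    // requestsWrite, which producers also touch.
    public void swapRequests() {
        lock();
        try {
            List<T> tmp = this.requestsWrite;
            this.requestsWrite = this.requestsRead;
            this.requestsRead = tmp;
        } finally {
            unlock();
        }
    }

    // Called only by the service thread after swapRequests: no lock needed.
    public List<T> doWaitTransfer() {
        List<T> batch = new ArrayList<>(this.requestsRead);
        this.requestsRead.clear(); // ready to serve as the write buffer again
        return batch;
    }

    private void lock() {
        while (!spinLock.compareAndSet(true, false)) {
        }
    }

    private void unlock() {
        spinLock.compareAndSet(false, true);
    }
}
```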
The code block wrapped by a lock executes serially, i.e. it cannot run concurrently. When a lock is unavoidable, shrinking the locked code block effectively improves concurrency, as illustrated below:

Suppose multiple threads go through lock1 or lock2. Under lock1, both doSomeThing1 and doSomeThing2 must execute serially. Under lock2, doSomeThing1 can be executed by multiple threads at once and only doSomeThing2 is serialized, so the overall concurrency of lock2 is clearly better.

This leads to a best practice of lock usage: the less code a lock wraps, the better.

In the old version, the locked code block for message writing was relatively large; some work that could run concurrently, such as generating the offsetMsgId, was also inside the lock.
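The pattern of moving independent work out of the critical section can be sketched like this (the class and method names are illustrative, not the actual RocketMQ code):

```java
import java.util.concurrent.locks.ReentrantLock;

public class AppendService {
    private final ReentrantLock putMessageLock = new ReentrantLock();
    private long wrotePosition = 0; // the shared mutable state

    // Before: the message id is generated while holding the lock,
    // serializing work that is independent per message.
    public long appendCoarse(byte[] body) {
        putMessageLock.lock();
        try {
            String msgId = createMsgId(body); // independent work, yet locked
            return writeToLog(msgId, body);   // shared state, must be locked
        } finally {
            putMessageLock.unlock();
        }
    }

    // After: only the shared-state mutation stays inside the lock,
    // so id generation runs concurrently across threads.
    public long appendFine(byte[] body) {
        String msgId = createMsgId(body);
        putMessageLock.lock();
        try {
            return writeToLog(msgId, body);
        } finally {
            putMessageLock.unlock();
        }
    }

    private String createMsgId(byte[] body) {
        return Integer.toHexString(java.util.Arrays.hashCode(body));
    }

    private long writeToLog(String msgId, byte[] body) {
        wrotePosition += body.length;
        return wrotePosition;
    }
}
```

Both versions return the same results; the fine-grained version simply holds the lock for a shorter time per call.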
sendMessageThreadPoolNums: the size of the broker-side thread pool that handles message sends. It defaulted to 1 before 4.9.0; the new version sets it to the number of CPU cores of the operating system, with a minimum of 4.

Adjusting this parameter is a trade-off. It raises send concurrency, but messages may then be processed out of order. With synchronous sending there is no ordering problem, so in that case the change is safe.

In sequential-consumption scenarios, changing this parameter is not recommended. In practice, RocketMQ clusters should be governed so that ordered-consumption workloads run on dedicated clusters.

useReentrantLockWhenPutMessage: the type of lock used when writing messages to memory. In lower versions it defaults to false, meaning a spin lock is used; the new version switches to ReentrantLock.

The main advantage of spinning is that there is no thread-switch cost, but spinning easily wastes CPU. Memory writes are fast in most cases, but RocketMQ writes through the page cache, and if the cache jitters, the CPU burned while spinning is not worth it. Once sendMessageThreadPoolNums is set above 1, ReentrantLock is the more stable choice of lock.
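Putting these tuning knobs together, a broker configuration might look like the following (values are illustrative, not recommendations; the property names correspond to RocketMQ's broker/message-store configuration):

```properties
# broker.conf — illustrative tuning for a high-throughput broker
# Send thread pool: new default is max(CPU cores, 4); set explicitly here.
sendMessageThreadPoolNums=8
# With more than one send thread, prefer ReentrantLock over the spin lock.
useReentrantLockWhenPutMessage=true
# Let the flush thread sleep for the interval instead of waiting on a latch.
flushCommitLogTimed=true
```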
First, let's understand what this parameter means by looking at the source code:

flushCommitLogTimed controls how the flush (disk-write) thread blocks while waiting. In lower versions it defaults to false, meaning the thread waits on a CountDownLatch; the higher version uses Thread.sleep instead.

My guess at the reasoning: the flush thread is relatively independent and does not interact directly with other threads, so there is no need to bring in CountDownLatch, an "outside expert" designed specifically for thread coordination.
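The two waiting styles can be contrasted in a small sketch (simplified; this is not the actual CommitLog flush loop):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Two ways for a flush thread to pause between flush rounds.
public class FlushWait {
    private final CountDownLatch waitPoint = new CountDownLatch(1);

    // flushCommitLogTimed = false: wait on a latch, so another thread
    // can wake us early via waitPoint.countDown().
    public void waitLatch(long intervalMs) throws InterruptedException {
        waitPoint.await(intervalMs, TimeUnit.MILLISECONDS);
    }

    // flushCommitLogTimed = true: just sleep for the interval; nobody
    // needs to wake us, because the flush thread works independently.
    public void waitTimed(long intervalMs) throws InterruptedException {
        Thread.sleep(intervalMs);
    }
}
```

If no one ever calls countDown, the two behave the same; the sleep version simply drops the unused coordination machinery.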
This one mainly sets the size of the transaction-message thread pool.

The new version can resize the transaction-message thread pool dynamically along with the send thread pool, so it can be tuned on the fly based on load-test results.
Copyright: ImportNew. Please include the original link when reprinting: https://en.javamana.com/2022/02/202202130913282987.html