RocketMQ delayed messages (scheduled messages): performance optimization and asynchronous delivery support in version 4.9.3

Scarb 2022-06-24 06:55:34



1. Summary

In RocketMQ 4.9.3, @Git-Yang made substantial optimizations to delayed messages that greatly improve their performance.

First, PR#3287 replaces the Timer that was used to launch the periodic tasks with a ScheduledExecutorService. This improves performance by more than 3x when delayed messages are sent to multiple delay levels at the same time.

This article focuses on the other change, PR#3458, which adds support for asynchronous delivery of delayed messages. In earlier versions, a delayed message that came due was delivered to the CommitLog synchronously, which performs poorly in DLedger mode. With the new change, the delivery mode for due messages is configurable through the broker configuration property enableScheduleAsyncDeliver. Switching to asynchronous delivery improves performance in DLedger mode by about 3x.

Only the asynchronous delivery logic for scheduled messages is covered here. For the delayed message flow in earlier versions and its source code analysis, see the article: RocketMQ delayed messages (scheduled messages)

2. Analysis of the changes

2.1 Scanning and delivery of delayed messages across multiple delay levels changed from single-threaded to multi-threaded execution

This change replaces the task scheduler for delayed messages, Timer, with a ScheduledExecutorService.

In earlier versions, the scan-and-deliver tasks for all 18 delay levels were started as scheduled tasks on a single Timer. A Timer runs all of its tasks on one worker thread, so if one task is slow, any newly due task has to wait until the previous task finishes.

After the switch to a ScheduledExecutorService thread pool, the tasks are processed by multiple threads. This greatly speeds up delayed message processing and avoids the blocking that occurred when messages were sent to several delay levels at the same time.
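The difference between the two schedulers can be reproduced outside RocketMQ. The following is a minimal sketch, not broker code: the class and method names are hypothetical, and a latch stands in for a slow delay level. It checks whether a second task can run while the first one is still blocked.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative only: class and method names are hypothetical, not RocketMQ code.
class SchedulerComparison {

    // Waits up to the given time for the latch; returns false on timeout or interrupt.
    private static boolean awaitLatch(CountDownLatch latch, long millis) {
        try {
            return latch.await(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Timer has a single worker thread, so the fast task queues behind the slow one.
    static boolean fastTaskRunsWhileSlowTaskBlocksOnTimer() {
        CountDownLatch slowStarted = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch fastDone = new CountDownLatch(1);
        Timer timer = new Timer("demo-timer", true);
        try {
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    slowStarted.countDown();
                    awaitLatch(release, 10_000); // stands in for a slow delay level
                }
            }, 0);
            awaitLatch(slowStarted, 10_000);
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    fastDone.countDown();
                }
            }, 0);
            // The fast task cannot start while the only worker thread is busy.
            return awaitLatch(fastDone, 300);
        } finally {
            release.countDown();
            timer.cancel();
        }
    }

    // A pool with two threads runs the fast task on the second thread.
    static boolean fastTaskRunsWhileSlowTaskBlocksOnPool() {
        CountDownLatch slowStarted = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch fastDone = new CountDownLatch(1);
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        try {
            pool.schedule(() -> {
                slowStarted.countDown();
                awaitLatch(release, 10_000);
            }, 0, TimeUnit.MILLISECONDS);
            awaitLatch(slowStarted, 10_000);
            pool.schedule(fastDone::countDown, 0, TimeUnit.MILLISECONDS);
            return awaitLatch(fastDone, 5_000);
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
    }
}
```

With the Timer the second task stays queued until the first one returns, so the first method returns false; with the two-thread pool the second method returns true.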


Performance before and after the change (source: https://github.com/apache/rocketmq/issues/3286):

  • Before the change, sending delayed messages to 4 delay levels at the same time: TPS 657

  • After the change, sending delayed messages to 4 delay levels at the same time: TPS 2453

2.2 Support for asynchronous delivery of delayed messages, improving delivery performance in DLedger mode

Originally, scheduled messages that came due were delivered synchronously by a single thread, which is a performance bottleneck in DLedger mode.

This is because in DLedger mode the master node takes the SYNC_MASTER role and replicates synchronously: a write is reported to the master as successful only after enough slave nodes have stored the message.

With this change, the write of a delayed message can be configured as synchronous or asynchronous. Asynchronous writes improve performance in DLedger mode by about 3x.

2.2.1 Caveats of asynchronous delivery

Asynchronous delivery has two main drawbacks:

  1. The order of message delivery cannot be guaranteed
  2. Messages may be delivered more than once

Points to watch when using asynchronous delivery:

  • Flow control is required. When the write TPS is too high, the page cache can become busy, and the node may even run out of memory.

  • Messages could be lost, for example when a delivery fails because the page cache is busy or for some other reason. The failed message is therefore redelivered; after 3 failed retries, the thread for that delay level is blocked until the redelivery succeeds.
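The two safeguards above (flow control and blocking after repeated retries) amount to an admission check before each new asynchronous delivery. Below is a minimal sketch of such a check. AsyncDeliverGate, PendingTask and the parameter names are hypothetical stand-ins, and the thresholds are constructor arguments rather than RocketMQ configuration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative only: a simplified admission gate, not the RocketMQ implementation.
class AsyncDeliverGate {

    // Hypothetical stand-in for a pending asynchronous delivery.
    static final class PendingTask {
        final int resendCount;

        PendingTask(int resendCount) {
            this.resendCount = resendCount;
        }
    }

    private final Queue<PendingTask> pending = new ArrayDeque<>();
    private final int maxPendingLimit;      // assumed flow-control threshold
    private final int maxResendBeforeBlock; // assumed retry threshold (3 in the text above)

    AsyncDeliverGate(int maxPendingLimit, int maxResendBeforeBlock) {
        this.maxPendingLimit = maxPendingLimit;
        this.maxResendBeforeBlock = maxResendBeforeBlock;
    }

    // Returns true if the task was accepted, false if the caller should wait and rescan.
    boolean tryAccept(PendingTask task) {
        // Flow control: too many deliveries are already in flight.
        if (pending.size() > maxPendingLimit) {
            return false;
        }
        // Blocking: the oldest delivery has been retried too often and must finish first.
        PendingTask head = pending.peek();
        if (head != null && head.resendCount > maxResendBeforeBlock) {
            return false;
        }
        pending.add(task);
        return true;
    }

    // Removes the oldest task once its delivery has been resolved.
    void removeHead() {
        pending.poll();
    }
}
```

A rejected call does not lose the message: the caller simply keeps its current offset and tries the same message again on the next scan.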

2.2.2 Asynchronous delivery logic

First, a review of the synchronous delivery logic: each delay level is assigned one thread, which repeatedly runs a task that scans the ConsumeQueue of that level for messages that have come due. Due messages are delivered synchronously one by one. After a successful delivery the offset of that level is updated, and the next task starts scanning for new messages from that offset.

Asynchronous delivery works differently:

Asynchronous delivery uses the producer-consumer pattern, and the objects produced and consumed are asynchronous delivery tasks. The producer thread creates a delivery task for each due message. The consumer consumes these tasks and, depending on each task's execution status, updates the offset or retries. A blocking queue holds the asynchronous delivery tasks. Its size is configurable and represents the number of messages that can be in delivery at the same time. When the queue is full, flow control is triggered.

When a message of a delay level is delivered asynchronously, its delivery task must be put into the processing queue. That can fail, for example because of flow control; in that case the thread waits for a while and runs the scan-and-deliver logic again.

Because the delivery does not complete immediately, the consumer thread takes tasks from the queue and checks their status. If a task has completed, the offset is updated. If a delivery failed, the thread waits for a while and redelivers synchronously: on success the offset is updated, on failure the retry continues.
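The producer-consumer arrangement described above can be sketched with a bounded queue of in-flight deliveries. This is a simplified illustration under stated assumptions: PendingDeliveryQueue and Delivery are hypothetical names, a CompletableFuture<Boolean> stands in for the result of an asynchronous CommitLog write, and retries are left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative only: a bounded queue of in-flight deliveries with in-order offset commits.
class PendingDeliveryQueue {

    // Hypothetical handle for one asynchronous delivery.
    static final class Delivery {
        final long nextOffset;                  // offset to commit once this delivery succeeds
        final CompletableFuture<Boolean> result;

        Delivery(long nextOffset, CompletableFuture<Boolean> result) {
            this.nextOffset = nextOffset;
            this.result = result;
        }
    }

    private final LinkedBlockingQueue<Delivery> pending;
    private long committedOffset;

    PendingDeliveryQueue(int capacity, long startOffset) {
        this.pending = new LinkedBlockingQueue<>(capacity);
        this.committedOffset = startOffset;
    }

    // Producer side: returns false when the queue is full (flow control).
    boolean submit(long nextOffset, CompletableFuture<Boolean> result) {
        return pending.offer(new Delivery(nextOffset, result));
    }

    // Consumer side: commits offsets for finished deliveries, strictly from the head.
    int drainCompleted() {
        int committed = 0;
        Delivery head;
        while ((head = pending.peek()) != null) {
            if (!head.result.isDone()) {
                break; // still in flight: later deliveries must wait their turn
            }
            boolean ok = !head.result.isCompletedExceptionally()
                && Boolean.TRUE.equals(head.result.getNow(Boolean.FALSE));
            if (!ok) {
                break; // failed: stays at the head so that it can be retried
            }
            committedOffset = head.nextOffset;
            pending.remove();
            committed++;
        }
        return committed;
    }

    long committedOffset() {
        return committedOffset;
    }
}
```

Committing only from the head keeps the persisted offset from moving past a delivery that has not finished, which is why a restart can redeliver messages but should not skip them.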

3. Asynchronous delivery details

All delivery logic for delayed messages lives in the ScheduleMessageService class.

The following uses a single delay level as an example to show how the message delivery thread and the status update thread work.

In the flow diagram, the left side is the delivery thread for due scheduled messages, and the right side is the thread that updates the status of delivery processes.

3.1 Scheduled message delivery thread

The delayed message delivery service maintains an offset table, offsetTable, which records for each delay level the logical offset in the ConsumeQueue of the message delivered so far. It marks where scanning should resume after a restart, so the table is persisted to disk periodically, and slave nodes periodically pull its latest value from the master node.

When the delayed message service starts, it submits one DeliverDelayedMessageTimerTask per delay level to the deliverExecutorService thread pool. The task is not periodic; instead, the next task is scheduled when the current one ends. The task's executeOnTimeup() method contains the message delivery logic, which the flow diagram illustrates:

  1. Get the ConsumeQueue of this level and scan its messages in order to see whether they are due
  2. If a message is due, look up the complete message in the CommitLog, restore its real Topic and QueueId from its properties, and deliver it. (Delivery is synchronous or asynchronous depending on the configuration; the asynchronous case is described here.)
  3. After the asynchronous delivery is issued, put the delivery process into the blocking queue for this level in deliverPendingTable
  4. If it cannot be put into the queue, flow control or blocking is in effect; wait for a while and deliver again
  5. If all deliveries succeed, update the offset to the offset of the last delivered message + 1, so that the next scan starts from the next offset
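The scan in steps 1 and 5 can be illustrated on a raw buffer. The sketch below assumes the standard ConsumeQueue entry layout of 20 bytes (8-byte CommitLog offset, 4-byte size, 8-byte tagsCode), with tagsCode holding the delivery timestamp for delayed messages. ConsumeQueueScan and its helper are hypothetical, and the actual delivery is reduced to a comment.

```java
import java.nio.ByteBuffer;

// Illustrative only: walks ConsumeQueue-style entries and stops at the first one not yet due.
class ConsumeQueueScan {

    // One entry: 8-byte CommitLog offset + 4-byte message size + 8-byte tagsCode.
    static final int CQ_STORE_UNIT_SIZE = 20;

    // Returns the logical offset at which the next scan should start.
    static long scanDueEntries(ByteBuffer entries, long startOffset, long now) {
        ByteBuffer view = entries.duplicate(); // leave the caller's position untouched
        long nextOffset = startOffset;
        while (view.remaining() >= CQ_STORE_UNIT_SIZE) {
            long commitLogOffset = view.getLong();
            int size = view.getInt();
            long deliverTimestamp = view.getLong(); // tagsCode of a delayed message
            if (deliverTimestamp > now) {
                break; // not due yet: resume from this entry on the next scan
            }
            // A broker would load the message at (commitLogOffset, size) and deliver it here.
            nextOffset++;
        }
        return nextOffset;
    }

    // Test helper: encodes {commitLogOffset, size, deliverTimestamp} triples.
    static ByteBuffer encode(long[]... entries) {
        ByteBuffer buffer = ByteBuffer.allocate(entries.length * CQ_STORE_UNIT_SIZE);
        for (long[] e : entries) {
            buffer.putLong(e[0]).putInt((int) e[1]).putLong(e[2]);
        }
        buffer.flip();
        return buffer;
    }
}
```

For three entries due at 100, 200 and 300, a scan that starts at logical offset 5 with now = 250 returns 7: two entries were due, and the third is where the next scan resumes.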

3.2 Delivery process status update thread

For each delay level, a status update thread is started in the handleExecutorService thread pool, and each thread runs a HandlePutResultTask. Like the delivery task, it is not periodic; a new task is started when the current one ends.
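The "not periodic, reschedule at the end" pattern used by both tasks can be shown in isolation. In this minimal sketch, SelfReschedulingTask is a hypothetical class, a counter stands in for the real work, and the chain stops after a fixed number of runs so that the example terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: a one-shot task that schedules its own next run.
class SelfReschedulingTask implements Runnable {
    private final ScheduledExecutorService executor;
    private final AtomicInteger runs;
    private final int maxRuns;
    private final long delayMillis;
    private final CountDownLatch finished;

    SelfReschedulingTask(ScheduledExecutorService executor, AtomicInteger runs,
                         int maxRuns, long delayMillis, CountDownLatch finished) {
        this.executor = executor;
        this.runs = runs;
        this.maxRuns = maxRuns;
        this.delayMillis = delayMillis;
        this.finished = finished;
    }

    @Override
    public void run() {
        int completed = runs.incrementAndGet(); // placeholder for one scan pass
        if (completed < maxRuns) {
            // No fixed-rate schedule: the next run is queued only after this one ends.
            executor.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
        } else {
            finished.countDown();
        }
    }

    // Runs the chain to completion and returns how many passes were executed.
    static int runChain(int maxRuns, long delayMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch finished = new CountDownLatch(1);
        try {
            executor.schedule(
                new SelfReschedulingTask(executor, runs, maxRuns, delayMillis, finished),
                0, TimeUnit.MILLISECONDS);
            finished.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return runs.get();
    }
}
```

Because each run decides its own successor, the delay before the next pass can differ from run to run, and two passes of the same chain never overlap.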

HandlePutResultTask repeatedly takes the asynchronous delivery process object at the head of the blocking queue and checks its status:

  • If the delivery succeeded, update the offset and statistics, and remove the delivery task from the queue
  • If the delivery is still in progress, do nothing
  • If the delivery failed, retry or skip it, depending on whether automatic retry is configured
  • Retries are delivered synchronously. If the retry succeeds, update the offset and statistics and remove the task; otherwise keep redelivering

After all tasks have been scanned, wait for a while and then run a new HandlePutResultTask.
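The status handling above is a small state machine over the head of the queue. The sketch below is a simplified, single-threaded illustration. PutResultHandler, Task and the resend callback are hypothetical. It returns when the head is still running instead of polling, and it skips a task after a fixed number of failed resends so that the loop always terminates.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Predicate;

// Illustrative only: simplified head-of-queue status handling.
class PutResultHandler {

    enum Status { RUNNING, SUCCESS, EXCEPTION, SKIP }

    // Hypothetical stand-in for one asynchronous delivery process.
    static final class Task {
        final long nextOffset;
        Status status;
        int resendCount;

        Task(Status status, long nextOffset) {
            this.status = status;
            this.nextOffset = nextOffset;
        }
    }

    private final Queue<Task> pending = new ArrayDeque<>();
    private long offset;

    PutResultHandler(long startOffset) {
        this.offset = startOffset;
    }

    void add(Status status, long nextOffset) {
        pending.add(new Task(status, nextOffset));
    }

    // resend performs one synchronous redelivery attempt and reports whether it succeeded.
    void handlePending(Predicate<Task> resend, int maxResend) {
        Task head;
        while ((head = pending.peek()) != null) {
            switch (head.status) {
                case SUCCESS:
                    offset = head.nextOffset; // commit, then drop the finished task
                    pending.remove();
                    break;
                case RUNNING:
                    return; // simplification: come back on the next pass
                case EXCEPTION:
                    head.resendCount++;
                    if (resend.test(head)) {
                        head.status = Status.SUCCESS;
                    } else if (head.resendCount >= maxResend) {
                        head.status = Status.SKIP;
                    }
                    break;
                case SKIP:
                    pending.remove(); // given up: removed without committing its offset
                    break;
            }
        }
    }

    long offset() {
        return offset;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

A failed task holds back everything queued behind it until it either succeeds or is skipped, which is the blocking behaviour described in section 2.2.1.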

4. Source code analysis

4.1 Scheduled message delivery task

    public void executeOnTimeup() {
        // Find the ConsumeQueue that holds the delayed messages of this delayLevel
        ConsumeQueue cq =
            ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(
                TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));

        if (cq == null) {
            this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
            return;
        }

        // Starting from the current logical offset, get all valid entries of the ConsumeQueue
        SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
        if (bufferCQ == null) {
            long resetOffset;
            if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset) {
                log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}",
                    this.offset, resetOffset, cq.getQueueId());
            } else if ((resetOffset = cq.getMaxOffsetInQueue()) < this.offset) {
                log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}",
                    this.offset, resetOffset, cq.getQueueId());
            } else {
                resetOffset = this.offset;
            }

            this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
            return;
        }

        long nextOffset = this.offset;
        try {
            int i = 0;
            ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
            // Iterate over all valid entries in the ConsumeQueue buffer
            for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                // Read the three key fields of a ConsumeQueue index entry
                long offsetPy = bufferCQ.getByteBuffer().getLong();
                int sizePy = bufferCQ.getByteBuffer().getInt();
                long tagsCode = bufferCQ.getByteBuffer().getLong();

                if (cq.isExtAddr(tagsCode)) {
                    if (cq.getExt(tagsCode, cqExtUnit)) {
                        tagsCode = cqExtUnit.getTagsCode();
                    } else {
                        //can't find ext content.So re compute tags code.
                        log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                            tagsCode, offsetPy, sizePy);
                        long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                        tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                    }
                }

                // For delayed messages, tagsCode in the ConsumeQueue is a point in time (the delivery time)
                long now = System.currentTimeMillis();
                long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
                nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                // If the message is not yet due, schedule the next scan from nextOffset and return;
                // otherwise fall through and deliver it
                long countdown = deliverTimestamp - now;
                if (countdown > 0) {
                    this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
                    return;
                }

                MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
                if (msgExt == null) {
                    continue;
                }

                MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
                if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                    log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                        msgInner.getTopic(), msgInner);
                    continue;
                }

                // Write the message back to the CommitLog
                boolean deliverSuc;
                if (ScheduleMessageService.this.enableAsyncDeliver) {
                    // Asynchronous delivery
                    deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);
                } else {
                    // Synchronous delivery
                    deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);
                }

                // Delivery failed (flow control, blocking, delivery exception, and so on):
                // wait 0.1s and run the delivery task again
                if (!deliverSuc) {
                    this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
                    return;
                }
            }

            nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
        } catch (Exception e) {
            log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
        } finally {
            bufferCQ.release();
        }

        // The end of the current buffer was reached: schedule the next scan starting from nextOffset
        this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
    }

    private boolean asyncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,
        int sizePy) {
        Queue<PutResultProcess> processesQueue = ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);

        // Flow control: triggered when the number of pending tasks in the queue exceeds the limit
        int currentPendingNum = processesQueue.size();
        int maxPendingLimit = ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig()
            .getScheduleAsyncDeliverMaxPendingLimit();
        if (currentPendingNum > maxPendingLimit) {
            log.warn("Asynchronous deliver triggers flow control, " +
                "currentPendingNum={}, maxPendingLimit={}", currentPendingNum, maxPendingLimit);
            return false;
        }

        // Blocking: if the task at the head of the queue has been retried more than 3 times,
        // block delivery for this delay level until that task is delivered successfully
        PutResultProcess firstProcess = processesQueue.peek();
        if (firstProcess != null && firstProcess.need2Blocked()) {
            log.warn("Asynchronous deliver block. info={}", firstProcess.toString());
            return false;
        }

        PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, true);
        processesQueue.add(resultProcess);
        return true;
    }

4.2 Asynchronous delivery process status update task

    public void run() {
        LinkedBlockingQueue<PutResultProcess> pendingQueue =
            ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);

        PutResultProcess putResultProcess;
        // Repeatedly look at the first delivery task in the queue, check its status and act on it
        while ((putResultProcess = pendingQueue.peek()) != null) {
            try {
                switch (putResultProcess.getStatus()) {
                    case SUCCESS:
                        // Delivered successfully: update the offset and remove the task from the queue
                        ScheduleMessageService.this.updateOffset(this.delayLevel, putResultProcess.getNextOffset());
                        pendingQueue.remove();
                        break;
                    case RUNNING:
                        // Delivery still in progress: do nothing
                        break;
                    case EXCEPTION:
                        // Delivery failed
                        if (!isStarted()) {
                            log.warn("HandlePutResultTask shutdown, info={}", putResultProcess.toString());
                            return;
                        }
                        log.warn("putResultProcess error, info={}", putResultProcess.toString());
                        // onException performs the retry
                        putResultProcess.onException();
                        break;
                    case SKIP:
                        // Skipped: remove directly from the queue
                        log.warn("putResultProcess skip, info={}", putResultProcess.toString());
                        pendingQueue.remove();
                        break;
                }
            } catch (Exception e) {
                log.error("HandlePutResultTask exception. info={}", putResultProcess.toString(), e);
                putResultProcess.onException();
            }
        }

        // Wait 0.01s, then continue with the next scan
        if (isStarted()) {
            ScheduleMessageService.this.handleExecutorService
                .schedule(new HandlePutResultTask(this.delayLevel), DELAY_FOR_A_SLEEP, TimeUnit.MILLISECONDS);
        }
    }

    private void resend() {
        log.info("Resend message, info: {}", this.toString());

        // Gradually increase the resend interval.
        try {
            Thread.sleep(Math.min(this.resendCount++ * 100, 60 * 1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            // Look up the complete message in the CommitLog
            MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(this.physicOffset, this.physicSize);
            // If the lookup fails, check the retry count; once it reaches 6, log and skip the message
            if (msgExt == null) {
                log.warn("ScheduleMessageService resend not found message. info: {}", this.toString());
                this.status = need2Skip() ? ProcessStatus.SKIP : ProcessStatus.EXCEPTION;
                return;
            }

            MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
            // Synchronous delivery
            PutMessageResult result = ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);
            // Update the status based on the result
            this.handleResult(result);
            if (result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                log.info("Resend message success, info: {}", this.toString());
            }
        } catch (Exception e) {
            this.status = ProcessStatus.EXCEPTION;
            log.error("Resend message error, info: {}", this.toString(), e);
        }
    }
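The sleep at the top of resend() is a linear backoff with a cap. The following sketch isolates that formula so the resulting delays are easy to see. ResendBackoff is a hypothetical helper, and the argument is the value of resendCount before the post-increment in the source.

```java
// Illustrative only: the delay formula from resend(), extracted into a pure function.
class ResendBackoff {

    static final long STEP_MILLIS = 100L;   // each earlier resend adds 100 ms
    static final long MAX_MILLIS = 60_000L; // the delay is capped at 60 s

    // Delay before a resend, given how many resends have already been attempted.
    static long delayMillis(int previousResends) {
        return Math.min(previousResends * STEP_MILLIS, MAX_MILLIS);
    }
}
```

The first resend therefore happens without any delay, the second after 100 ms, and from the 600th previous resend onward the delay stays at the 60 s cap.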


Copyright: author [Scarb]. Please include the original link when reprinting, thank you. https://en.javamana.com/2022/175/202206240638202418.html