Elegantly Implementing Delayed Tasks: The ZooKeeper Edition

TimeFriends 2022-08-15 17:59:37


Preface

In "Elegantly Implementing Delayed Tasks: The Redis Edition" I pointed out that the key to implementing delayed tasks is to store each task's description together with its execution time, and to be able to sort tasks by that time. Can ZooKeeper do the same? It can: a ZooKeeper znode can also hold data, and we can take advantage of that to store delayed tasks. In fact, Curator, the well-known ZooKeeper client, already provides a ZooKeeper-based delayed-task API, and today we will read its source code to see how Curator implements delayed tasks on top of ZooKeeper. I should say up front, though, that ZooKeeper is not a good choice for delayed tasks, and certainly not an elegant one; "elegant" in the title is only there to match the earlier articles in this series. I explain the drawbacks of using ZooKeeper for delayed tasks in detail later.

Getting started with Curator

I won't cover installing and using ZooKeeper here; I have published articles on that before, so if ZooKeeper is new to you, have a look through the earlier posts. Let's get straight to the point and try out Curator's delayed-task API.

First, the task consumer:

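import java.text.MessageFormat;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.state.ConnectionState;

public class DelayTaskConsumer implements QueueConsumer<String>{
@Override
public void consumeMessage(String message) throws Exception {
// {1,number,#} prints the timestamp without grouping separators
System.out.println(MessageFormat.format("Publishing news item. id - {0} , timeStamp - {1,number,#} , " +
"threadName - {2}",message,System.currentTimeMillis(),Thread.currentThread().getName()));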
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
System.out.println(MessageFormat.format("State change . New State is - {0}",newState));
}
}

A Curator consumer implements the QueueConsumer interface. All we do here is take the task description (for simplicity, the description is just the news item's id) and publish the corresponding news item.

Next, the task producer:

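import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.queue.DistributedDelayQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class DelayTaskProducer {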
private static final String CONNECT_ADDRESS="study-machine:32783";
private static final int SESSION_OUTTIME = 5000;
private static final String NAMESPACE = "delayTask";
private static final String QUEUE_PATH = "/queue";
private static final String LOCK_PATH = "/lock";
private CuratorFramework curatorFramework;
private DistributedDelayQueue<String> delayQueue;
{
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
curatorFramework= CuratorFrameworkFactory.builder().connectString(CONNECT_ADDRESS)
.sessionTimeoutMs(SESSION_OUTTIME).retryPolicy(retryPolicy)
.namespace(NAMESPACE).build();
curatorFramework.start();
delayQueue= QueueBuilder.builder(curatorFramework, new DelayTaskConsumer(),
new DelayTaskSerializer(), QUEUE_PATH).lockPath(LOCK_PATH).buildDelayQueue();
try {
delayQueue.start();
}catch (Exception e){
e.printStackTrace();
}
}
public void produce(String id,long timeStamp){
try {
delayQueue.put(id,timeStamp);
}catch (Exception e){
e.printStackTrace();
}
}
}

The producer does two things: an instance initializer block sets up the Curator client and the delayed-task queue, and the produce method lets callers put delayed tasks into the queue.

When building the delayed queue we must also supply a serializer that converts between byte arrays and task-description objects. Here the task description is simply a string:

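import java.nio.charset.StandardCharsets;

import org.apache.curator.framework.recipes.queue.QueueSerializer;

public class DelayTaskSerializer implements QueueSerializer<String> {
@Override
public byte[] serialize(String item) {
// an explicit charset keeps producers and consumers on different JVMs in agreement
return item.getBytes(StandardCharsets.UTF_8);
}
@Override
public String deserialize(byte[] bytes) {
return new String(bytes, StandardCharsets.UTF_8);
}
}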

Finally, a client to test it:

import java.text.MessageFormat;
import java.util.concurrent.TimeUnit;

public class DelayTaskTest {
public static void main(String[] args) throws Exception{
DelayTaskProducer producer=new DelayTaskProducer();
long now=System.currentTimeMillis();
System.out.println(MessageFormat.format("start time - {0,number,#}",now));
producer.produce("1",now+TimeUnit.SECONDS.toMillis(5));
producer.produce("2",now+TimeUnit.SECONDS.toMillis(10));
producer.produce("3",now+TimeUnit.SECONDS.toMillis(15));
producer.produce("4",now+TimeUnit.SECONDS.toMillis(20));
// due much later, so it is still stored in ZooKeeper when we inspect the nodes
producer.produce("5",now+TimeUnit.SECONDS.toMillis(2000));
// keep the JVM alive so the background consumer thread can run
TimeUnit.HOURS.sleep(1);
}
}

The client is simple: it puts 5 tasks into the delayed queue, and the last one is scheduled much later, mainly so we can observe what Curator stores in ZooKeeper. Run the program and watch the consumer's console output.

Next, let's see what ZooKeeper has stored:

[zk: localhost(CONNECTED) 2] ls /
[delayTask, zookeeper]

The zookeeper node is built into ZooKeeper. Besides it there is a delayTask node, which is the namespace NAMESPACE we set in the producer. Because the same ZooKeeper cluster may serve several delayed queues, NAMESPACE keeps them apart. Let's see what is inside NAMESPACE:

[zk: localhost(CONNECTED) 3] ls /delayTask
[lock, queue]

There are two child nodes, lock and queue: the distributed-lock path LOCK_PATH and the queue path QUEUE_PATH that we set in the producer. Because the same delayed queue may be watched by several threads, a consumer must acquire a distributed lock before executing a task, which guarantees that each task is executed by only one thread. Now let's focus on what is under the queue node:

[zk: localhost(CONNECTED) 7] ls /delayTask/queue
[queue-|165B92FCD69|0000000014]

There is only one child node, presumably the task we put into the queue that has not executed yet. Let's check whether this child node has children of its own:

[zk: localhost(CONNECTED) 8] ls /delayTask/queue/queue-|165B92FCD69|0000000014
[]

It has none.

So let's look at the data stored in the queue-|165B92FCD69|0000000014 node:

[zk: localhost(CONNECTED) 9] get /delayTask/queue/queue-|165B92FCD69|0000000014
5
cZxid = 0x3d
ctime = Sat Sep 08 12:20:41 GMT 2018
mZxid = 0x3d
mtime = Sat Sep 08 12:20:41 GMT 2018
pZxid = 0x3d
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 11
numChildren = 0

This is the task description, i.e. news id 5. (dataLength is 11 rather than 1 because Curator's ItemSerializer wraps the payload in a small binary envelope whose non-printable bytes the CLI does not show.) So ZooKeeper stores the task description as the data of the task's node. Where, then, is the execution time? Since queue-|165B92FCD69|0000000014 has no child nodes, we can guess that the execution time is encoded in the node name. Looking at the name, queue is just a prefix and carries little information. 0000000014 is probably a sequence number (which also suggests the task nodes are created as sequential nodes). That leaves 165B92FCD69. It doesn't look like a timestamp or a date, but it contains letters, so it is probably a timestamp written in hexadecimal. Let's convert it to decimal and see:

@Test
public void test(){
long number = Long.parseLong("165B92FCD69", 16);
System.out.println(number);
System.out.println(new Date(number));
}

The string converts to a decimal number, and turning that number into a date gives exactly the execution time we set at the beginning. So now we roughly understand how Curator stores delayed tasks in ZooKeeper: the execution time is stored in the node name, and the task description is stored as the node's data.
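To make the naming scheme concrete, here is a small standalone sketch of how such a name can be built and read back without ZooKeeper. It assumes, from the node we just inspected, that the execution time is written in upper-case hex between two `|` separators after the `queue-` prefix, and that the trailing 10-digit counter is the suffix ZooKeeper appends to sequential nodes (the sketch only simulates that suffix). `DelayNodeName` and its methods are hypothetical names, not Curator's API.

```java
import java.util.Date;

// Hypothetical helper mirroring the node-name layout observed above:
//   queue-|<execution time in upper-case hex>|<10-digit sequence number>
class DelayNodeName {
    static final String PREFIX = "queue-";
    static final String SEPARATOR = "|";

    // Producer side: the part of the name the client chooses.
    static String namePrefix(long executeAtMillis) {
        return PREFIX + SEPARATOR + String.format("%08X", executeAtMillis) + SEPARATOR;
    }

    // Simulates the zero-padded sequence suffix ZooKeeper appends to sequential nodes.
    static String withSequence(String prefix, int sequence) {
        return prefix + String.format("%010d", sequence);
    }

    // Consumer side: recover the execution time from the segment between the separators.
    static long executeAt(String nodeName) {
        int start = nodeName.indexOf(SEPARATOR);
        int end = nodeName.lastIndexOf(SEPARATOR);
        return Long.parseLong(nodeName.substring(start + 1, end), 16);
    }

    public static void main(String[] args) {
        String name = withSequence(namePrefix(1536410242409L), 14);
        System.out.println(name);                      // queue-|165B92FCD69|0000000014
        System.out.println(executeAt(name));           // 1536410242409
        System.out.println(new Date(executeAt(name))); // printed in the JVM's default time zone
    }
}
```

The decoding step is the same Long.parseLong(..., 16) call as in the test method above; the encoding step is simply its inverse.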

How does the code actually do this? Curator's source will tell us.

Curator source code analysis

1. The DistributedDelayQueue class

The entry point of Curator's delayed queue is the start method of the DistributedDelayQueue class. Before looking at start, let's see what fields DistributedDelayQueue has:

private final DistributedQueue<T> queue;
DistributedDelayQueue
(
CuratorFramework client,
QueueConsumer<T> consumer,
QueueSerializer<T> serializer,
String queuePath,
ThreadFactory threadFactory,
Executor executor,
int minItemsBeforeRefresh,
String lockPath,
int maxItems,
boolean putInBackground,
int finalFlushMs
)
{
Preconditions.checkArgument(minItemsBeforeRefresh >= 0, "minItemsBeforeRefresh cannot be negative");
queue = new DistributedQueue<T>
(
client,
consumer,
serializer,
queuePath,
threadFactory,
executor,
minItemsBeforeRefresh,
true,
lockPath,
maxItems,
putInBackground,
finalFlushMs
)
{
@Override
protected long getDelay(String itemNode)
{
return getDelay(itemNode, System.currentTimeMillis());
}
private long getDelay(String itemNode, long sortTime)
{
long epoch = getEpoch(itemNode);
return epoch - sortTime;
}
@Override
protected void sortChildren(List<String> children)
{
final long sortTime = System.currentTimeMillis();
Collections.sort
(
children,
new Comparator<String>()
{
@Override
public int compare(String o1, String o2)
{
long diff = getDelay(o1, sortTime) - getDelay(o2, sortTime);
return (diff < 0) ? -1 : ((diff > 0) ? 1 : 0);
}
}
);
}
};
}

Only part of the code is excerpted here. DistributedDelayQueue actually has a single field, queue, an instance of DistributedQueue, which as the name suggests is a distributed queue. The queue in DistributedDelayQueue is special, though: it is an instance of an anonymous subclass of DistributedQueue, and that subclass overrides some of DistributedQueue's methods, such as getDelay and sortChildren. This matters, because the code we read later relies on both of these methods.
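Customizing a base class through an anonymous subclass that overrides protected hooks is easier to see in isolation. In this sketch `HookedQueue` is a hypothetical stand-in for DistributedQueue: its default hooks (every item due immediately, names sorted alphabetically) are assumptions for illustration, and item names use a made-up `name@executeAtMillis` format to keep parsing trivial. Only the overriding technique mirrors the excerpt above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Hypothetical base queue: its processing logic only talks to two protected hooks.
class HookedQueue {
    // Default hook: every item is due immediately.
    protected long getDelay(String itemNode) {
        return 0;
    }

    // Default hook: plain alphabetical order of item names.
    protected void sortChildren(List<String> children) {
        Collections.sort(children);
    }

    // Fixed base-class logic: sort with the hook, then keep the items whose delay is <= 0.
    List<String> dueItems(List<String> children) {
        List<String> sorted = new ArrayList<>(children);
        sortChildren(sorted);
        List<String> due = new ArrayList<>();
        for (String child : sorted) {
            if (getDelay(child) <= 0) {
                due.add(child);
            }
        }
        return due;
    }

    // Delay-aware variant: an anonymous subclass that overrides only the two hooks.
    static HookedQueue delayQueue(long now) {
        return new HookedQueue() {
            @Override
            protected long getDelay(String itemNode) {
                // made-up naming "<name>@<executeAtMillis>"
                long executeAt = Long.parseLong(itemNode.substring(itemNode.indexOf('@') + 1));
                return executeAt - now;
            }

            @Override
            protected void sortChildren(List<String> children) {
                children.sort(Comparator.comparingLong(this::getDelay));
            }
        };
    }
}
```

The base-class loop never changes; swapping in the anonymous subclass is enough to turn a plain queue into a time-ordered one, which is why the later code paths depend on these two overrides.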

2. The entry point: DistributedDelayQueue's start method

Now let's look at DistributedDelayQueue's start method:

/**
* Start the queue. No other methods work until this is called
*
* @throws Exception startup errors
*/
@Override
public void start() throws Exception
{
queue.start();
}

It simply calls queue's start method. Let's step into it:

@Override
public void start() throws Exception
{
if ( !state.compareAndSet(State.LATENT, State.STARTED) )
{
throw new IllegalStateException();
}
try
{
client.create().creatingParentContainersIfNeeded().forPath(queuePath);
}
catch ( KeeperException.NodeExistsException ignore )
{
// this is OK
}
if ( lockPath != null )
{
try
{
client.create().creatingParentContainersIfNeeded().forPath(lockPath);
}
catch ( KeeperException.NodeExistsException ignore )
{
// this is OK
}
}
if ( !isProducerOnly || (maxItems != QueueBuilder.NOT_SET) )
{
childrenCache.start();
}
if ( !isProducerOnly )
{
service.submit
(
new Callable<Object>()
{
@Override
public Object call()
{
runLoop();
return null;
}
}
);
}
}

The method first checks the state, then creates the nodes it needs; the queue and lock nodes we saw earlier are created here.

Because we passed a consumer when building the queue, isProducerOnly is false, so both of the following branches execute:

if ( !isProducerOnly || (maxItems != QueueBuilder.NOT_SET) )
{
childrenCache.start();
}
if ( !isProducerOnly )
{
service.submit
(
new Callable<Object>()
{
@Override
public Object call()
{
runLoop();
return null;
}
}
);
}
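The state guard and the two branch conditions can be replayed without ZooKeeper. This sketch rests on assumptions: `isProducerOnly` is modelled as "no consumer was supplied", `NOT_SET` is a local stand-in for QueueBuilder.NOT_SET, and the returned strings merely name the actions start() would take (lock-node creation is left out for brevity). The compareAndSet on an AtomicReference is what makes a second start() call fail, even when two threads race.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical replay of the start() control flow.
class StartSketch {
    enum State { LATENT, STARTED }

    static final int NOT_SET = -1; // local stand-in for "maxItems not configured"

    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
    private final boolean isProducerOnly;
    private final int maxItems;

    StartSketch(Object consumer, int maxItems) {
        this.isProducerOnly = (consumer == null); // assumption: no consumer => producer only
        this.maxItems = maxItems;
    }

    // Returns, in order, the actions start() would take.
    List<String> start() {
        // Only one caller can move LATENT -> STARTED; every later call fails.
        if (!state.compareAndSet(State.LATENT, State.STARTED)) {
            throw new IllegalStateException("already started");
        }
        List<String> actions = new ArrayList<>();
        actions.add("create queue node");
        if (!isProducerOnly || maxItems != NOT_SET) {
            actions.add("start children cache");
        }
        if (!isProducerOnly) {
            actions.add("submit runLoop");
        }
        return actions;
    }
}
```

With a consumer, both branches fire; a producer-only instance without a maxItems limit skips both, because it never needs to watch or consume the children.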

2.1. childrenCache.start()

First, the first branch:

childrenCache.start();

Judging by its name, childrenCache is a cache of child nodes. Let's step into its start method:

void start() throws Exception
{
sync(true);
}

It calls sync. Let's follow it:

private synchronized void sync(boolean watched) throws Exception
{
if ( watched )
{
client.getChildren().usingWatcher(watcher).inBackground(callback).forPath(path);
}
else
{
client.getChildren().inBackground(callback).forPath(path);
}
}

Here watched is true, so the first branch runs. It fetches the children of path in the background and registers a watcher on it; path here is the QUEUE_PATH we configured. Once the children have been fetched, the method in callback is invoked. Let's see what callback does:

private final BackgroundCallback callback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
{
setNewChildren(event.getChildren());
}
}
};

When the children are fetched successfully, it calls setNewChildren. Let's keep following:

private synchronized void setNewChildren(List<String> newChildren)
{
if ( newChildren != null )
{
Data currentData = children.get();
children.set(new Data(newChildren, currentData.version + 1));
notifyFromCallback();
}
}

This stores the fetched child nodes in the cache as a new snapshot with an incremented version, then calls notifyFromCallback:

private synchronized void notifyFromCallback()
{
notifyAll();
}

This wakes up all waiting threads. If something wakes threads up, something must be waiting. Looking through the rest of the ChildrenCache class, we find the wait call in blockingNextGetData:

synchronized Data blockingNextGetData(long startVersion, long maxWait, TimeUnit unit) throws InterruptedException
{
long startMs = System.currentTimeMillis();
boolean hasMaxWait = (unit != null);
long maxWaitMs = hasMaxWait ? unit.toMillis(maxWait) : -1;
while ( startVersion == children.get().version )
{
if ( hasMaxWait )
{
long elapsedMs = System.currentTimeMillis() - startMs;
long thisWaitMs = maxWaitMs - elapsedMs;
if ( thisWaitMs <= 0 )
{
break;
}
wait(thisWaitMs);
}
else
{
wait();
}
}
return children.get();
}

When blockingNextGetData is called, the caller blocks until the children's version changes or, if a maximum wait was given, until it expires. When new child nodes arrive, the waiting thread is woken up and the current children are returned. We will meet blockingNextGetData again shortly.
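The handshake between setNewChildren and blockingNextGetData is the classic versioned wait/notify pattern. Below is a simplified in-memory sketch of it, not Curator's ChildrenCache itself (`VersionedChildren` is a hypothetical name): each update publishes an immutable snapshot with a bumped version and calls notifyAll, and readers wait until the version differs from the one they last saw or an optional timeout expires (a null unit means no timeout).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical versioned snapshot holder using the object's own monitor.
class VersionedChildren {
    static final class Data {
        final List<String> children;
        final long version;

        Data(List<String> children, long version) {
            this.children = Collections.unmodifiableList(new ArrayList<>(children));
            this.version = version;
        }
    }

    private Data current = new Data(Collections.emptyList(), 0);

    // Writer: publish a new snapshot and wake every waiting reader.
    synchronized void setNewChildren(List<String> newChildren) {
        current = new Data(newChildren, current.version + 1);
        notifyAll();
    }

    // Reader: block until the version differs from startVersion, or until maxWait elapses.
    synchronized Data blockingNextGetData(long startVersion, long maxWait, TimeUnit unit)
            throws InterruptedException {
        long startMs = System.currentTimeMillis();
        long maxWaitMs = (unit != null) ? unit.toMillis(maxWait) : -1;
        while (startVersion == current.version) {
            if (unit != null) {
                long remaining = maxWaitMs - (System.currentTimeMillis() - startMs);
                if (remaining <= 0) {
                    break; // timed out: return the unchanged snapshot
                }
                wait(remaining);
            } else {
                wait();
            }
        }
        return current;
    }
}
```

The condition sits in a while loop rather than an if because wait can return spuriously, or after a timeout, without the version having changed; re-checking the version each time covers both cases.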

2.2. The runLoop method

Next, the last piece of code in start:

service.submit
(
new Callable<Object>()
{
@Override
public Object call()
{
runLoop();
return null;
}
}
);

This submits a Callable to the thread pool, and its main logic lives in runLoop. Let's step into runLoop:

private void runLoop()
{
long currentVersion = -1;
long maxWaitMs = -1;
try
{
while ( state.get() == State.STARTED )
{
try
{
ChildrenCache.Data data = (maxWaitMs > 0) ? childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) : childrenCache.blockingNextGetData(currentVersion);
currentVersion = data.version;
List<String> children = Lists.newArrayList(data.children);
sortChildren(children); // makes sure items are processed in the correct order
if ( children.size() > 0 )
{
maxWaitMs = getDelay(children.get(0));
if ( maxWaitMs > 0 )
{
continue;
}
}
else
{
continue;
}
processChildren(children, currentVersion);
}
catch ( InterruptedException e )
{
// swallow the interrupt as it's only possible from either a background
// operation and, thus, doesn't apply to this loop or the instance
// is being closed in which case the while test will get it
}
}
}
catch ( Exception e )
{
log.error("Exception caught in background handler", e);
}
}

runLoop is an endless loop: it keeps running as long as the queue's state remains STARTED.

Let's start with the first statement:

ChildrenCache.Data data = (maxWaitMs > 0) ?
childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) :
childrenCache.blockingNextGetData(currentVersion);

This statement is long, so I've split it over several lines. It fetches the child nodes. As explained above, blockingNextGetData waits first, and only returns when new child nodes arrive (notifyAll wakes the waiting thread) or when maxWaitMs, if positive, runs out.

After fetching the children, it sorts the list:

sortChildren(children); // makes sure items are processed in the correct order

sortChildren is a method of DistributedQueue. As mentioned when we analyzed the DistributedDelayQueue class, the queue in DistributedDelayQueue is an instance of an anonymous subclass that overrides getDelay, sortChildren and other methods, so we need to look at the overridden versions. Since sortChildren depends on getDelay, let's start with getDelay:

@Override
protected long getDelay(String itemNode)
{
return getDelay(itemNode, System.currentTimeMillis());
}

It calls the private getDelay method, passing in the current timestamp:

private long getDelay(String itemNode, long sortTime)
{
long epoch = getEpoch(itemNode);
return epoch - sortTime;
}

The private getDelay in turn calls getEpoch:

private static long getEpoch(String itemNode)
{
int index2 = itemNode.lastIndexOf(SEPARATOR);
int index1 = (index2 > 0) ? itemNode.lastIndexOf(SEPARATOR, index2 - 1) : -1;
if ( (index1 > 0) && (index2 > (index1 + 1)) )
{
try
{
String epochStr = itemNode.substring(index1 + 1, index2);
return Long.parseLong(epochStr, 16);
}
catch ( NumberFormatException ignore )
{
// ignore
}
}
return 0;
}

getEpoch parses the child node name. As we saw in ZooKeeper earlier, the children of the queue node have names like queue-|165B92FCD69|0000000014. The method extracts the task's execution timestamp, i.e. the string between the two | separators, and then converts it from hexadecimal:

Long.parseLong(epochStr, 16);

This confirms our earlier guess: Curator encodes the task's execution time in hexadecimal in the node name. As for why hexadecimal, my personal guess is that it shortens the string.
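One consequence of the getEpoch logic quoted above is easy to miss: a name without two `|` separators, or with a non-hex middle segment, parses to 0, and getDelay then returns 0 minus the current time, a large negative number, so such an item counts as already due. The standalone copy below (the same steps as the excerpt, packaged in a hypothetical `EpochParsing` class so it can run on its own) shows that, along with the length difference between the hex and decimal forms of the timestamp from our node name.

```java
// Standalone copy of the getEpoch steps, for experimenting with edge cases.
class EpochParsing {
    static final String SEPARATOR = "|";

    // Take the text between the last two separators and parse it as hex; 0 if that fails.
    static long getEpoch(String itemNode) {
        int index2 = itemNode.lastIndexOf(SEPARATOR);
        int index1 = (index2 > 0) ? itemNode.lastIndexOf(SEPARATOR, index2 - 1) : -1;
        if (index1 > 0 && index2 > index1 + 1) {
            try {
                return Long.parseLong(itemNode.substring(index1 + 1, index2), 16);
            } catch (NumberFormatException ignore) {
                // fall through to 0
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        System.out.println(getEpoch("queue-|165B92FCD69|0000000014")); // 1536410242409
        System.out.println(getEpoch("queue-0000000015"));              // 0: no separators
        System.out.println(getEpoch("queue-|xyz|0000000016"));         // 0: not hex
        // Same timestamp: 11 hex characters versus 13 decimal digits.
        System.out.println(Long.toHexString(1536410242409L).length()); // 11
        System.out.println(Long.toString(1536410242409L).length());    // 13
    }
}
```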

Back to the private getDelay method:

private long getDelay(String itemNode, long sortTime)
{
long epoch = getEpoch(itemNode);
return epoch - sortTime;
}

After getting the task's execution timestamp, it subtracts the current timestamp. The difference decides whether the task should run now: if it is less than or equal to 0, the execution time has arrived and the task is executed. The difference has a second use, sorting, in the sortChildren method:

@Override
protected void sortChildren(List<String> children)
{
final long sortTime = System.currentTimeMillis();
Collections.sort
(
children,
new Comparator<String>()
{
@Override
public int compare(String o1, String o2)
{
long diff = getDelay(o1, sortTime) - getDelay(o2, sortTime);
return (diff < 0) ? -1 : ((diff > 0) ? 1 : 0);
}
}
);
}

sortChildren is the version overridden in the anonymous subclass. It sorts the children by the difference between their execution time and the current timestamp, earliest task first, which guarantees that delayed tasks are ordered by execution time from earliest to latest.
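Since every comparison inside sortChildren uses the same sortTime, that term cancels: getDelay(o1, t) - getDelay(o2, t) equals getEpoch(o1) - getEpoch(o2), so the result is simply ascending execution time. The sketch below expresses that ordering directly with Comparator.comparingLong, which also sidesteps the subtraction; its `epoch` helper is a simplified stand-in for getEpoch that assumes well-formed `queue-|<hex>|<seq>` names, and `SortByEpoch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class SortByEpoch {
    // Simplified stand-in for getEpoch: assumes the name is "queue-|<hex>|<seq>".
    static long epoch(String node) {
        int end = node.lastIndexOf('|');
        int start = node.lastIndexOf('|', end - 1);
        return Long.parseLong(node.substring(start + 1, end), 16);
    }

    // Earliest execution time first; the current clock plays no part in the order.
    static List<String> sorted(List<String> children) {
        List<String> copy = new ArrayList<>(children);
        copy.sort(Comparator.comparingLong(SortByEpoch::epoch));
        return copy;
    }
}
```

Parsing is what makes this correct in general: compared as raw strings, "queue-|1F4|..." would sort before "queue-|64|..." even though 0x1F4 (500) is later than 0x64 (100).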

With getDelay and sortChildren covered, back to runLoop:

ChildrenCache.Data data = (maxWaitMs > 0) ? childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) : childrenCache.blockingNextGetData(currentVersion);
currentVersion = data.version;
List<String> children = Lists.newArrayList(data.children);
sortChildren(children); // makes sure items are processed in the correct order
if ( children.size() > 0 )
{
maxWaitMs = getDelay(children.get(0));
if ( maxWaitMs > 0 )
{
continue;
}
}
else
{
continue;
}
processChildren(children, currentVersion);

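After sorting the children in ascending order of execution time, it takes the first child and computes the difference between its execution time and the current timestamp. If the difference is positive, the head task is not due yet: the loop stores the difference in maxWaitMs and continues, so the next blockingNextGetData call waits at most that long (less, if the children change in the meantime). If the difference is less than or equal to 0, the execution time has arrived, and the loop runs the following line: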

processChildren(children, currentVersion);
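One pass of that loop boils down to a small decision, restated here as a pure function. `LoopStep`, `nextWaitMs`, and `NO_TIMEOUT` are hypothetical names, the children are assumed to be sorted already, and the execution time of a name is supplied by the caller. A positive result is what the next blockingNextGetData call would use as its timeout; 0 means "process now". One simplification: for an empty list the sketch asks for an untimed wait, whereas the real loop simply continues with whatever maxWaitMs it already had.

```java
import java.util.List;
import java.util.function.ToLongFunction;

class LoopStep {
    // Block until the child list changes, with no timeout.
    static final long NO_TIMEOUT = -1;

    // How long to wait before looking again, or 0 if the head item is due now.
    static long nextWaitMs(List<String> sortedChildren, ToLongFunction<String> executeAt, long now) {
        if (sortedChildren.isEmpty()) {
            return NO_TIMEOUT; // nothing queued: wait for a change notification
        }
        long delay = executeAt.applyAsLong(sortedChildren.get(0)) - now;
        return delay > 0 ? delay : 0; // positive: sleep at most that long; otherwise process
    }
}
```

Because only the head of the sorted list is checked, the thread sleeps exactly until the earliest task is due, unless a newly added task wakes it earlier.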

Let's step into it:

private void processChildren(List<String> children, long currentVersion) throws Exception
{
final Semaphore processedLatch = new Semaphore(0);
final boolean isUsingLockSafety = (lockPath != null);
int min = minItemsBeforeRefresh;
for ( final String itemNode : children )
{
if ( Thread.currentThread().isInterrupted() )
{
processedLatch.release(children.size());
break;
}
if ( !itemNode.startsWith(QUEUE_ITEM_NAME) )
{
log.warn("Foreign node in queue path: " + itemNode);
processedLatch.release();
continue;
}
if ( min-- <= 0 )
{
if ( refreshOnWatch && (currentVersion != childrenCache.getData().version) )
{
processedLatch.release(children.size());
break;
}
}
if ( getDelay(itemNode) > 0 )
{
processedLatch.release();
continue;
}
executor.execute
(
new Runnable()
{
@Override
public void run()
{
try
{
if ( isUsingLockSafety )
{
processWithLockSafety(itemNode, ProcessType.NORMAL);
}
else
{
processNormally(itemNode, ProcessType.NORMAL);
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
log.error("Error processing message at " + itemNode, e);
}
finally
{
processedLatch.release();
}
}
}
);
}
processedLatch.acquire(children.size());
}

A Semaphore is used here so that the method returns only after every child node has been iterated over and processed, or the thread has been interrupted. In a single-threaded program no semaphore would be needed for this. But as the code shows, due tasks are executed on a thread pool, so the semaphore is what ensures the method returns only once all tasks have been handled: every item releases one permit (skipped items immediately, submitted items in the finally block), and acquire(children.size()) blocks until all of them are in.
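The fan-out/fan-in idea works the same way outside Curator: each item releases exactly one permit, whether it is skipped on the spot or finishes on the pool, and the caller blocks until it can acquire one permit per item. In this standalone sketch (`FanOutFanIn` is a hypothetical name) the "skip odd numbers" rule and the counter are placeholders for the real skip checks (foreign node, not yet due) and the real work.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class FanOutFanIn {
    // Handles even items on the pool, skips odd ones, and returns only when all are accounted for.
    static int processAll(List<Integer> items, ExecutorService executor) throws InterruptedException {
        Semaphore processed = new Semaphore(0);
        AtomicInteger handled = new AtomicInteger();
        for (Integer item : items) {
            if (item % 2 != 0) {
                processed.release(); // a skipped item still counts as done
                continue;
            }
            executor.execute(() -> {
                try {
                    handled.incrementAndGet(); // placeholder for the real work
                } finally {
                    processed.release(); // always release, even if the work throws
                }
            });
        }
        processed.acquire(items.size()); // one permit per item, however it finished
        return handled.get();
    }
}
```

A CountDownLatch would also work for a fixed count; a Semaphore lets the early-exit paths release all remaining permits at once with release(children.size()), as the excerpt does.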

Let's focus on the part that executes the delayed task:

executor.execute
(
new Runnable()
{
@Override
public void run()
{
try
{
if ( isUsingLockSafety )
{
processWithLockSafety(itemNode, ProcessType.NORMAL);
}
else
{
processNormally(itemNode, ProcessType.NORMAL);
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
log.error("Error processing message at " + itemNode, e);
}
finally
{
processedLatch.release();
}
}
}
);

Because we passed lockPath when building the delayed queue, this is the branch that actually runs:

processWithLockSafety(itemNode, ProcessType.NORMAL);

As the name says, this method processes the delayed task under a lock. In passing: good code is self-explanatory. The method name alone tells you roughly what this method does, so keep that in mind when you write code; in my company's legacy systems I have seen plenty of methods named method1, method2 and so on. (Ten thousand words of ranting omitted here…)

Let's step into processWithLockSafety:

@VisibleForTesting
protected boolean processWithLockSafety(String itemNode, ProcessType type) throws Exception
{
String lockNodePath = ZKPaths.makePath(lockPath, itemNode);
boolean lockCreated = false;
try
{
client.create().withMode(CreateMode.EPHEMERAL).forPath(lockNodePath);
lockCreated = true;
String itemPath = ZKPaths.makePath(queuePath, itemNode);
boolean requeue = false;
byte[] bytes = null;
if ( type == ProcessType.NORMAL )
{
bytes = client.getData().forPath(itemPath);
requeue = (processMessageBytes(itemNode, bytes) == ProcessMessageBytesCode.REQUEUE);
}
if ( requeue )
{
client.inTransaction()
.delete().forPath(itemPath)
.and()
.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(makeRequeueItemPath(itemPath), bytes)
.and()
.commit();
}
else
{
client.delete().forPath(itemPath);
}
return true;
}
catch ( KeeperException.NodeExistsException ignore )
{
// another process got it
}
catch ( KeeperException.NoNodeException ignore )
{
// another process got it
}
catch ( KeeperException.BadVersionException ignore )
{
// another process got it
}
finally
{
if ( lockCreated )
{
client.delete().guaranteed().forPath(lockNodePath);
}
}
return false;
}

The method first acquires a distributed lock:

client.create().withMode(CreateMode.EPHEMERAL).forPath(lockNodePath);

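The lock is acquired by creating an ephemeral node. Each task corresponds to exactly one lock node, so only one ZooKeeper client can create it successfully; in other words, only one client gets the lock, and the others get a NodeExistsException, which is caught and ignored.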

Once it holds the lock, it processes the task, and the finally block releases the distributed lock at the end.
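The locking rule amounts to "whoever creates the node first wins; everyone else gets NodeExistsException". The in-memory analogy below uses ConcurrentHashMap.putIfAbsent in place of an ephemeral-node create and lets several threads race for the same item; `CreateNodeLock` and the path string are illustrative. What it does not model is the part that makes the ZooKeeper version robust: an ephemeral node disappears by itself when its owner's session ends, so a crashed consumer cannot hold the lock forever.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// In-memory analogy for "lock = successfully creating a node".
class CreateNodeLock {
    // Stand-in for the lock znodes: path -> owner.
    private final ConcurrentMap<String, String> nodes = new ConcurrentHashMap<>();

    // Like create(EPHEMERAL): succeeds only if the path does not exist yet.
    boolean tryCreate(String path, String owner) {
        return nodes.putIfAbsent(path, owner) == null;
    }

    // Starts `workers` threads that try to lock the same item at once; returns how many won.
    static int race(int workers) throws InterruptedException {
        CreateNodeLock lock = new CreateNodeLock();
        CountDownLatch go = new CountDownLatch(1);
        AtomicInteger winners = new AtomicInteger();
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            String owner = "worker-" + i;
            threads[i] = new Thread(() -> {
                try {
                    go.await(); // line everyone up, then release them together
                    if (lock.tryCreate("/lock/queue-|165B92FCD69|0000000014", owner)) {
                        winners.incrementAndGet(); // only the winner would process the item
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        go.countDown();
        for (Thread t : threads) {
            t.join();
        }
        return winners.get();
    }
}
```

In this race the winner keeps its node, so exactly one thread wins. In the real method the item node is deleted before the lock node is released, so a client that grabs the lock afterwards finds the item gone and gets a NoNodeException, which is also caught and ignored.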

Let's focus on the part that processes the task:

requeue = (processMessageBytes(itemNode, bytes) == ProcessMessageBytesCode.REQUEUE);

Let's step into processMessageBytes:

private ProcessMessageBytesCode processMessageBytes(String itemNode, byte[] bytes) throws Exception
{
ProcessMessageBytesCode resultCode = ProcessMessageBytesCode.NORMAL;
MultiItem<T> items;
try
{
items = ItemSerializer.deserialize(bytes, serializer);
}
catch ( Throwable e )
{
ThreadUtils.checkInterrupted(e);
log.error("Corrupted queue item: " + itemNode, e);
return resultCode;
}
for(;;)
{
T item = items.nextItem();
if ( item == null )
{
break;
}
try
{
consumer.consumeMessage(item);
}
catch ( Throwable e )
{
ThreadUtils.checkInterrupted(e);
log.error("Exception processing queue item: " + itemNode, e);
if ( errorMode.get() == ErrorMode.REQUEUE )
{
resultCode = ProcessMessageBytesCode.REQUEUE;
break;
}
}
}
return resultCode;
}

At long last, we reach the code that calls the task consumer:

consumer.consumeMessage(item);

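consumer here is the task consumer we passed in when building the delayed queue. If consumeMessage throws and the queue's error mode is REQUEUE, processMessageBytes returns REQUEUE, and processWithLockSafety then deletes the item node and re-creates it in a single transaction instead of simply deleting it. That completes Curator's delayed-task processing logic; for the remaining details, read the source, as I won't go into them here.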
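The outcome logic of processMessageBytes, stripped of ZooKeeper and the ItemSerializer, fits in a few lines. This is a hypothetical sketch (`ConsumeOutcome` is not a Curator class): the enum names echo the excerpt, but it catches only RuntimeException because java.util.function.Consumer cannot throw checked exceptions, whereas the original catches Throwable.

```java
import java.util.List;
import java.util.function.Consumer;

class ConsumeOutcome {
    enum ErrorMode { DELETE, REQUEUE }
    enum Result { NORMAL, REQUEUE }

    // Feeds every item to the consumer; in REQUEUE mode the first failure stops processing.
    static <T> Result process(List<T> items, Consumer<T> consumer, ErrorMode mode) {
        Result result = Result.NORMAL;
        for (T item : items) {
            try {
                consumer.accept(item);
            } catch (RuntimeException e) {
                if (mode == ErrorMode.REQUEUE) {
                    result = Result.REQUEUE; // caller re-creates the item node
                    break;
                }
                // otherwise the failure is only logged and the item node is deleted as usual
            }
        }
        return result;
    }
}
```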

Summary

A quick recap of how Curator implements delayed tasks. When a task is produced, every task goes under the same node: the execution time is encoded in the child node's name, and the task description is stored in the child node's data. A background thread fetches all the children of that queue node, parses each child's execution time, sorts the children from earliest to latest, executes the tasks that are due, and deletes each child node once it has been processed. That is roughly how Curator processes delayed tasks.
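To tie the pieces together, here is a single-process model of the same design: child names carry the execution time in hex plus a sequence number, the description lives in the data, and each poll lists, sorts, consumes, and deletes the due items. `InMemoryDelayQueue` is a hypothetical sketch with no watches, locks, or persistence, meant only to show the data layout and the scan-sort-consume cycle.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class InMemoryDelayQueue {
    private final Map<String, byte[]> nodes = new TreeMap<>(); // child name -> data
    private int sequence = 0;

    // Producer: execution time goes into the name, the description into the data.
    synchronized String put(String description, long executeAtMillis) {
        String name = String.format("queue-|%08X|%010d", executeAtMillis, sequence++);
        nodes.put(name, description.getBytes(StandardCharsets.UTF_8));
        return name;
    }

    // Consumer: list every child, sort by execution time, consume and delete the due ones.
    synchronized List<String> pollDue(long now) {
        List<String> children = new ArrayList<>(nodes.keySet());
        children.sort(Comparator.comparingLong(InMemoryDelayQueue::executeAt));
        List<String> due = new ArrayList<>();
        for (String child : children) {
            if (executeAt(child) > now) {
                break; // sorted, so every later child is not due either
            }
            due.add(new String(nodes.remove(child), StandardCharsets.UTF_8));
        }
        return due;
    }

    private static long executeAt(String name) {
        int end = name.lastIndexOf('|');
        int start = name.lastIndexOf('|', end - 1);
        return Long.parseLong(name.substring(start + 1, end), 16);
    }
}
```

Note that every poll copies and sorts all names, which is exactly the client-side cost criticized next.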

As I said at the start, Curator's delayed-task implementation is not very elegant. Where exactly does it fall short? First, sorting by execution time happens not on the ZooKeeper server but on the client. If someone puts 100 million delayed tasks into ZooKeeper, Curator still fetches all of them to the client and sorts them there, which is bound to cause problems once there are many tasks. Second, ZooKeeper is not primarily meant for storage. Unlike MySQL or Redis, which are designed as storage systems, ZooKeeper is a distributed coordination service, and storage is not its strength. So if you need to store a large number of delayed tasks, ZooKeeper is not an appropriate choice.

I spent this much space on how Curator uses ZooKeeper for delayed tasks to make a point: having a ready-made wheel does not mean you should just use it blindly. If you never look at how the wheel works, you won't know where to start when something goes wrong one day.


So, have you got the hang of delayed tasks with ZooKeeper?



Copyright: author TimeFriends. Please include a link to the original when reprinting, thank you. https://en.javamana.com/2022/227/202208151746244720.html