L._ l 2022-01-26 15:43:27 阅读数:434
The following are notes I compiled while studying. If you find any mistakes, please point them out. Thank you.
ScheduledThreadPoolExecutor can be used to run tasks asynchronously after a given delay, or to run them periodically. Because a submitted task may not be eligible to run immediately, it is first placed in a queue; when a task is taken from the queue, the execution condition is checked, i.e. whether enough time has elapsed.
/**
 * An {@link ExecutorService} that can additionally run commands after a delay
 * or repeatedly on a schedule.
 */
public interface ScheduledExecutorService extends ExecutorService {

    /**
     * Runs {@code command} once, after {@code delay} has elapsed.
     *
     * @param command the task to run
     * @param delay   how long to wait before running, expressed in {@code unit}
     * @param unit    the time unit of {@code delay}
     * @return a future representing the pending task
     */
    ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

    /**
     * Runs {@code callable} once, after {@code delay} has elapsed.
     *
     * @param callable the value-returning task to run
     * @param delay    how long to wait before running, expressed in {@code unit}
     * @param unit     the time unit of {@code delay}
     * @param <V>      the result type of the callable
     * @return a future representing the pending task
     */
    <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

    /**
     * Runs {@code command} repeatedly; each run is scheduled a fixed
     * {@code period} after the START time of the previous run.
     *
     * @param command      the task to run
     * @param initialDelay the delay before the first run, expressed in {@code unit}
     * @param period       the interval between successive start times
     * @param unit         the time unit of {@code initialDelay} and {@code period}
     * @return a future representing the periodic task
     */
    ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

    /**
     * Runs {@code command} repeatedly; each run is scheduled a fixed
     * {@code delay} after the END of the previous run.
     *
     * @param command      the task to run
     * @param initialDelay the delay before the first run, expressed in {@code unit}
     * @param delay        the pause between one run finishing and the next starting
     * @param unit         the time unit of {@code initialDelay} and {@code delay}
     * @return a future representing the periodic task
     */
    ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}
/**
 * Excerpt of the scheduled thread pool: a {@link ThreadPoolExecutor} whose work
 * queue is a time-ordered {@code DelayedWorkQueue} and whose tasks are wrapped
 * in {@code ScheduledFutureTask}.
 *
 * <p>This listing is abridged: constructors and several helpers referenced
 * below (for example {@code triggerTime}, {@code isPeriodic}) are not shown.
 */
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
    // Shutdown policy for periodic tasks: when false (the default), periodic
    // tasks are not allowed to keep running once the pool has been shut down.
    private volatile boolean continueExistingPeriodicTasksAfterShutdown;
    // Shutdown policy for non-periodic (delayed) tasks: when false, they are
    // cancelled on shutdown; defaults to true, so queued delayed tasks still run.
    private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
    // When true, cancelling a ScheduledFutureTask also removes it from the queue.
    private volatile boolean removeOnCancel = false;
    // Source of sequence numbers, used to keep FIFO order among tasks.
    private static final AtomicLong sequencer = new AtomicLong();

    /**
     * Wraps a submitted Runnable/Callable together with its scheduling state
     * (trigger time, period, position in the delay queue).
     */
    private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
        // Sequence number of this task.
        private final long sequenceNumber;
        // Time, in nanoseconds, at which the task is next eligible to run.
        private long time;
        // Repeat period in nanoseconds. A positive value means fixed-rate,
        // a negative value means fixed-delay (see setNextRunTime).
        private final long period;
        // The task that reExecutePeriodic actually re-enqueues; defaults to
        // this task and is replaced with the decorated task by the schedule methods.
        RunnableScheduledFuture<V> outerTask = this;
        // Index of this task in the delay queue's heap array
        // (presumably maintained through setIndex — not shown in this excerpt).
        int heapIndex;

        /**
         * Cancels the task and, if the cancellation succeeded and
         * {@code removeOnCancel} is set, also removes it from the work queue.
         */
        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean cancelled = super.cancel(mayInterruptIfRunning);
            // Only tasks that still have a valid heap slot need removing.
            if (cancelled && removeOnCancel && heapIndex >= 0)
                remove(this);
            return cancelled;
        }

        /**
         * Runs the task once; a periodic task that completes normally is
         * rescheduled for its next run.
         */
        public void run() {
            boolean periodic = isPeriodic();
            // The pool's run state / shutdown policy may forbid running at all.
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                // One-shot task: plain FutureTask.run().
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                // Periodic task: runAndReset() leaves the future reusable, so
                // compute the next trigger time and put the task back in the queue.
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }

        /** Computes the next trigger time of a periodic task. */
        private void setNextRunTime() {
            long p = period;
            if (p > 0)
                // Fixed rate: previous scheduled start time + period.
                time += p;
            else
                // Fixed delay: measured from now, i.e. after the run just finished.
                time = triggerTime(-p);
        }
    }

    /**
     * Puts a periodic task back into the work queue, unless the current run
     * state no longer permits periodic tasks.
     *
     * <p>This method belongs to the executor itself rather than to
     * {@code ScheduledFutureTask}: {@code super.getQueue()} must resolve
     * against {@link ThreadPoolExecutor}, not against {@link FutureTask}.
     */
    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
            super.getQueue().add(task);
            // Re-check after enqueueing: the pool may have been shut down in between.
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
                // Normal case: make sure at least one worker exists to run the task.
                ensurePrestart();
        }
    }

    /**
     * The delay queue backing the executor. Tasks are kept in an array-based
     * min-heap ordered by trigger time, so the earliest task is at index 0.
     */
    static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
        // Initial capacity of the heap array.
        private static final int INITIAL_CAPACITY = 16;
        // Heap array holding the queued tasks.
        private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
        private final ReentrantLock lock = new ReentrantLock();
        private int size = 0;
        // The thread currently designated to wait for the task at the head of the queue.
        private Thread leader = null;
        // Condition on which worker threads wait for a task to become available.
        private final Condition available = lock.newCondition();
    }
}
scheduleAtFixedRate schedules a periodic task: the start time of each run is computed from the start time of the previous run.
/**
 * Schedules {@code command} to run periodically at a fixed rate.
 *
 * @param command      the task to run
 * @param initialDelay delay before the first run, expressed in {@code unit}
 * @param period       interval between successive runs, expressed in {@code unit}
 * @param unit         time unit of {@code initialDelay} and {@code period}
 * @return the (possibly decorated) task that was enqueued
 * @throws NullPointerException     if {@code command} or {@code unit} is null
 * @throws IllegalArgumentException if {@code period} is not positive
 */
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
    if (command == null) {
        throw new NullPointerException();
    }
    if (unit == null) {
        throw new NullPointerException();
    }
    if (period <= 0) {
        throw new IllegalArgumentException();
    }

    // triggerTime computes when the first run is due; the period is stored in
    // nanoseconds and kept positive, which marks the task as fixed-rate.
    final long firstTrigger = triggerTime(initialDelay, unit);
    final long periodNanos = unit.toNanos(period);
    ScheduledFutureTask<Void> periodicTask =
            new ScheduledFutureTask<Void>(command, null, firstTrigger, periodNanos);

    // Subclasses may wrap the task; the wrapper is what gets re-enqueued after each run.
    RunnableScheduledFuture<Void> decorated = decorateTask(command, periodicTask);
    periodicTask.outerTask = decorated;

    delayedExecute(decorated);
    return decorated;
}
// Scheduling flow: the command is wrapped in a ScheduledFutureTask, passed
// through decorateTask, and the result is handed to delayedExecute.
//
// decorateTask itself is a hook for modifying or replacing the task that will
// be enqueued. This implementation ignores callable and returns task unchanged.
// NOTE(review): the schedule* listings pass a Runnable as the first argument,
// so they presumably bind to a Runnable overload that is not shown here.
protected <V> RunnableScheduledFuture<V> decorateTask(
Callable<V> callable, RunnableScheduledFuture<V> task) {
return task;
}
/**
 * Main entry point for enqueueing a delayed or periodic task.
 *
 * <p>A task submitted after shutdown is rejected. Otherwise it is added to
 * the queue; if the pool turns out to have been shut down in the meantime and
 * the shutdown policy forbids this kind of task, the task is removed again and
 * cancelled. In every other case a worker thread is started if necessary.
 */
private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown()) {
        reject(task);
        return;
    }

    super.getQueue().add(task);

    // Re-check after enqueueing: a concurrent shutdown may have happened.
    // Cancel only when the task could actually be pulled back out of the queue.
    final boolean withdrawn = isShutdown()
            && !canRunInCurrentRunState(task.isPeriodic())
            && remove(task);
    if (withdrawn) {
        task.cancel(false);
    } else {
        // Make sure at least one worker thread exists to pick the task up.
        ensurePrestart();
    }
}
If the thread pool has not been shut down, the ensurePrestart method is called:
/**
 * Guarantees that at least one worker thread is started: a core worker while
 * the pool is below {@code corePoolSize}, otherwise a non-core worker if the
 * pool currently has no workers at all.
 */
void ensurePrestart() {
    final int workerCount = workerCountOf(ctl.get());
    if (workerCount < corePoolSize) {
        // Below the core size: add a core worker with no initial task.
        addWorker(null, true);
        return;
    }
    if (workerCount == 0) {
        // No workers at all: add one so queued tasks can be processed.
        addWorker(null, false);
    }
}
If the thread pool has not been shut down, the add method of DelayedWorkQueue is called directly; add delegates to offer, which puts the task into the queue:
// Inserts the given task into the queue by delegating to offer(e) and
// returns offer's result.
public boolean add(Runnable e) {
return offer(e);
}
/**
 * Inserts a task into the heap under the queue lock.
 *
 * @param x the task to insert; must be a {@code RunnableScheduledFuture}
 * @return always {@code true}
 * @throws NullPointerException if {@code x} is null
 */
public boolean offer(Runnable x) {
    if (x == null) {
        throw new NullPointerException();
    }
    RunnableScheduledFuture<?> task = (RunnableScheduledFuture<?>) x;

    final ReentrantLock guard = this.lock;
    guard.lock();
    try {
        final int insertAt = size;
        // Grow the backing array first if it is already full.
        if (insertAt >= queue.length) {
            grow();
        }
        size = insertAt + 1;

        if (insertAt == 0) {
            // Empty heap: the new task simply becomes the root.
            queue[0] = task;
            setIndex(task, 0);
        } else {
            // Otherwise start at the last slot and sift up to the right position.
            siftUp(insertAt, task);
        }

        // If the new task is now the head of the queue, the current leader is
        // waiting on the wrong deadline: clear it and wake one waiting thread.
        if (queue[0] == task) {
            leader = null;
            available.signal();
        }
    } finally {
        guard.unlock();
    }
    return true;
}
/**
 * Enlarges the heap array by 50%, clamping to {@code Integer.MAX_VALUE} if
 * the computed capacity overflows, and copies the existing tasks across.
 */
private void grow() {
    final int currentCapacity = queue.length;
    final int grownCapacity = currentCapacity + (currentCapacity >> 1);
    // A negative result means the int addition overflowed.
    final int targetCapacity = (grownCapacity < 0) ? Integer.MAX_VALUE : grownCapacity;
    queue = Arrays.copyOf(queue, targetCapacity);
}
/**
 * Restores heap order upward: starting from slot {@code k}, moves parents
 * that compare greater than {@code key} down one level, then stores
 * {@code key} in the slot where the walk stopped.
 *
 * @param k   the slot to start from
 * @param key the task being inserted
 */
private void siftUp(int k, RunnableScheduledFuture<?> key) {
    int slot = k;
    // Slot 0 is the root; once it is reached there is no parent left to compare.
    while (slot > 0) {
        final int parentSlot = (slot - 1) >>> 1;
        final RunnableScheduledFuture<?> parentTask = queue[parentSlot];
        if (key.compareTo(parentTask) >= 0) {
            // key is not smaller than its parent: this is its final position.
            break;
        }
        // Pull the parent down into the current slot and continue one level up.
        queue[slot] = parentTask;
        setIndex(parentTask, slot);
        slot = parentSlot;
    }
    queue[slot] = key;
    setIndex(key, slot);
}
If the thread pool is shut down after the task has been added to the priority queue, canRunInCurrentRunState is used to determine whether the task should still be executed:
/**
 * Decides whether a task may run in the pool's current run state.
 *
 * <p>The applicable shutdown policy depends on the kind of task:
 * {@code continueExistingPeriodicTasksAfterShutdown} for periodic tasks and
 * {@code executeExistingDelayedTasksAfterShutdown} for delayed (one-shot) tasks.
 *
 * @param periodic whether the task in question is periodic
 * @return the result of {@code isRunningOrShutdown} for the selected policy flag
 */
boolean canRunInCurrentRunState(boolean periodic) {
    final boolean allowedAfterShutdown;
    if (periodic) {
        allowedAfterShutdown = continueExistingPeriodicTasksAfterShutdown;
    } else {
        allowedAfterShutdown = executeExistingDelayedTasksAfterShutdown;
    }
    return isRunningOrShutdown(allowedAfterShutdown);
}
// Takes the task at the head of the queue once its delay has expired.
// Blocks while the queue is empty (until offer signals) or while the head
// task is not yet due. Throws InterruptedException if interrupted while
// acquiring the lock or waiting.
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();// acquire the lock, but respond to interruption
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
// A null head means the queue is empty: wait until a task is offered.
if (first == null)
available.await();
else {
// Otherwise check the head task's remaining delay; if it is zero or
// negative the task is due and can be dequeued right away.
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // do not keep a reference to the task while waiting
// The head task is not due yet. If another thread is already the
// leader (it is timing the head task), this thread waits indefinitely.
if (leader != null)// some other thread is already waiting for the head task
available.await();
else {
/* No leader yet: this thread becomes the leader and waits only for
the head task's remaining delay. It can be woken earlier by a
signal, e.g. when offer puts a new task at the head of the queue. */
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
// Step down as leader, unless leadership was already reset elsewhere.
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
/* On the way out: if there is no leader and the queue is not empty,
wake one waiting thread so it can become the leader or take the
next task. */
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
/**
 * Completes the removal of the head task {@code f}: the last task in the
 * array is detached and sifted down from the root to restore heap order,
 * and {@code f} is marked as no longer queued (index -1).
 *
 * @param f the task being removed (the current head of the queue)
 * @return {@code f}
 */
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
    final int lastSlot = --size;
    final RunnableScheduledFuture<?> lastTask = queue[lastSlot];
    queue[lastSlot] = null;
    // When the removed task was the only one, there is nothing to re-heapify.
    if (lastSlot != 0) {
        siftDown(0, lastTask);
    }
    setIndex(f, -1);
    return f;
}
/**
 * Restores heap order downward: starting from slot {@code k}, repeatedly
 * promotes the smaller child while {@code key} compares greater than it,
 * then stores {@code key} in the slot where the walk stopped.
 *
 * @param k   the slot to start from
 * @param key the task to place
 */
private void siftDown(int k, RunnableScheduledFuture<?> key) {
    int slot = k;
    // Slots at or beyond size/2 have no children.
    final int firstLeaf = size >>> 1;
    while (slot < firstLeaf) {
        // Start with the left child, switch to the right one if it is smaller.
        int smallerSlot = (slot << 1) + 1;
        RunnableScheduledFuture<?> smallerTask = queue[smallerSlot];
        final int rightSlot = smallerSlot + 1;
        if (rightSlot < size && smallerTask.compareTo(queue[rightSlot]) > 0) {
            smallerSlot = rightSlot;
            smallerTask = queue[rightSlot];
        }
        if (key.compareTo(smallerTask) <= 0) {
            // key is not greater than its smaller child: this is its final position.
            break;
        }
        // Promote the smaller child and continue one level down.
        queue[slot] = smallerTask;
        setIndex(smallerTask, slot);
        slot = smallerSlot;
    }
    queue[slot] = key;
    setIndex(key, slot);
}
scheduleAtFixedRate versus scheduleWithFixedDelay: the former computes the next run time from the start time of the previous run, while the latter computes it from the completion time of the previous run.
/**
 * Schedules {@code command} to run periodically with a fixed delay between
 * the end of one run and the start of the next.
 *
 * @param command      the task to run
 * @param initialDelay delay before the first run, expressed in {@code unit}
 * @param delay        pause between the end of one run and the start of the next
 * @param unit         time unit of {@code initialDelay} and {@code delay}
 * @return the (possibly decorated) task that was enqueued
 * @throws NullPointerException     if {@code command} or {@code unit} is null
 * @throws IllegalArgumentException if {@code delay} is not positive
 */
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    // A negative period marks the task as fixed-delay. Convert first and
    // negate afterwards: toNanos saturates on overflow, so converting -delay
    // could yield Long.MIN_VALUE, whose later negation in setNextRunTime
    // overflows. Negating the converted value caps it at -Long.MAX_VALUE.
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      -unit.toNanos(delay));
    // Subclasses may wrap the task; the wrapper is what gets re-enqueued after each run.
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}
ThreadPoolExecutor's shutdown method calls the onShutdown hook:
/**
 * Shutdown hook: cancels and removes queued tasks that the shutdown policies
 * no longer allow to run, then attempts to advance the pool towards termination.
 */
@Override void onShutdown() {
    final BlockingQueue<Runnable> workQueue = super.getQueue();
    // Whether delayed (one-shot) tasks should still run after shutdown.
    final boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
    // Whether periodic tasks should keep running after shutdown.
    final boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();

    if (keepDelayed || keepPeriodic) {
        // At least one kind of task survives: inspect a snapshot of the queue
        // and drop only the tasks that are excluded by policy or already cancelled.
        for (Object entry : workQueue.toArray()) {
            if (!(entry instanceof RunnableScheduledFuture)) {
                continue;
            }
            RunnableScheduledFuture<?> task = (RunnableScheduledFuture<?>) entry;
            final boolean excludedByPolicy = task.isPeriodic() ? !keepPeriodic : !keepDelayed;
            // Cancel only if this call actually removed the task from the queue.
            if ((excludedByPolicy || task.isCancelled()) && workQueue.remove(task)) {
                task.cancel(false);
            }
        }
    } else {
        // Nothing survives shutdown: cancel every queued task and empty the queue.
        for (Object entry : workQueue.toArray()) {
            if (entry instanceof RunnableScheduledFuture<?>) {
                ((RunnableScheduledFuture<?>) entry).cancel(false);
            }
        }
        workQueue.clear();
    }

    // Try to move the pool further along its state transitions.
    tryTerminate();
}
copyright:author[L._ l],Please bring the original link to reprint, thank you. https://en.javamana.com/2022/01/202201261543220890.html