[JDK source code] - j.u.c-scheduledthreadpoolexecutor

L._ l 2022-01-26 15:43:27 阅读数:434

jdk source code j.u.c-scheduledthreadpoolexecutor scheduledthreadpoolexecutor

The following are some of the contents sorted out in the study , If there is any mistake , Thank you for pointing out .

ScheduledExecutorService

It can be used to perform asynchronous tasks or periodic tasks after a given delay , Because the task put in may not be able to be executed immediately , So it still needs to be put in the queue , And get , See if the execution conditions are met : Whether the time is enough .

ScheduledExecutorService Interface

public interface ScheduledExecutorService extends ExecutorService {

// In delay delay Execute after time command.unit In units of time . Schedule only once 
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
// perform callable.
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
// be based on Last start time To delay the next task after a fixed time 
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
// be based on Last end time 
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}

ScheduledThreadPoolExecutor Core variables

public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {

// Periodic tasks should be cancelled when closing 
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
// If you should cancel aperiodic tasks when closing 
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
// Whether it should be deleted from the queue 
private volatile boolean removeOnCancel = false;
// Sequence number , Guarantee FIFO
private static final AtomicLong sequencer = new AtomicLong();
// ScheduledFutureTask class For encapsulation Runnable Callable object 
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {

// Serial number 
private final long sequenceNumber;
// In nanoseconds , Indicates the time when the task can be scheduled next 
private long time;
// The cycle of repeating tasks 
private final long period;
// By reExecutePeriodic Method to rejoin the actual task in the queue , Default to current task 
RunnableScheduledFuture<V> outerTask = this;
// Index of the delay queue 
int heapIndex;
// Used to cancel task execution 
public boolean cancel(boolean mayInterruptIfRunning) {

boolean cancelled = super.cancel(mayInterruptIfRunning);
// If the cancellation is successful , Then remove the task from the task queue 
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}
public void run() {

boolean periodic = isPeriodic();// Is it periodic 
// Determine whether you can continue 
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)// Not a periodic scheduled task , Call directly run
ScheduledFutureTask.super.run(); //FutureTask Of run Method 
else if (ScheduledFutureTask.super.runAndReset()) {
// It's a periodic scheduling task runAndReset Method execution 
setNextRunTime(); // Set the next scheduling time 
reExecutePeriodic(outerTask);// Scheduling through this 
}
}
private void setNextRunTime() {

long p = period;
if (p > 0)//p>0 The time when the task began to execute + Periodic scheduling time 
time += p;
else
// If period Less than 0 , Calculate the next execution time based on the time after the task is completed 
time = triggerTime(-p);
}
// Put the task back into the task queue for execution 
void reExecutePeriodic(RunnableScheduledFuture<?> task) {

// Based on the current thread pool state , Judge whether the current task is allowed to be executed 
if (canRunInCurrentRunState(true)) {

super.getQueue().add(task); // Add tasks to the delay queue 
if (!canRunInCurrentRunState(true) && remove(task))// Judge again whether it should be executed 
task.cancel(false); // Cancel task execution 
else
// Under normal circumstances , Ensure that at least one worker thread in the thread pool is processing tasks 
ensurePrestart();
}
}
}
// Implementation principle of delay queue :DelayedWorkQueue : Because they are all periodic tasks With time Sort it out Using a small root pile 
static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {

// The initial capacity is 16
private static final int INITIAL_CAPACITY = 16;
// Task queue Array 
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
// Threads Leader
private Thread leader = null;
// Condition variables are used for worker threads waiting to execute tasks 
private final Condition available = lock.newCondition();
}
}

scheduleAtFixedRate Realization

Indicates periodic task scheduling , Each task determines the next start time based on the start time of the last task

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {

if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
//triggerTime Used to calculate the time when the task should be scheduled ,unit.toNanos(period) Used to change the execution cycle to nanoseconds 
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
// Encapsulation is ScheduledFutureTask object , adopt decorateTask, call delayedExecute Method execution 
protected <V> RunnableScheduledFuture<V> decorateTask(
Callable<V> callable, RunnableScheduledFuture<V> task) {

return task;
}

delayedExecute Method

// The main method of executing periodic scheduling or delaying tasks .
private void delayedExecute(RunnableScheduledFuture<?> task) {

if (isShutdown())//SHURDOWN 了 ?
reject(task);
else {

super.getQueue().add(task); // Add to the queue 
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&// Whether the current thread is shutdown In the state of 
remove(task))// by false Under the circumstances Remove from the current queue Then cancel the current task 
task.cancel(false);
else
// Make sure at least one thread is still executing the task 
ensurePrestart();
}
}

ensurePrestart Method

If the thread pool is not closed, then ensurePrestart Method

// The guarantee is to start at least one core thread 
void ensurePrestart() {

int wc = workerCountOf(ctl.get());// Gets the number of worker threads 
if (wc < corePoolSize)// If it is less than Number of core threads 
addWorker(null, true);// add to Number of core threads 
else if (wc == 0)// At least one worker thread is guaranteed 
addWorker(null, false);
}

add add to

If the thread is not closed , Then call directly DelayedWorkQueue Inside add -> offer Method , Task task Put it in the queue

public boolean add(Runnable e) {

return offer(e);
}
public boolean offer(Runnable x) {
// Add tasks to the array 
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
// Do you want to expand 
int i = size;
if (i >= queue.length)
grow();
size = i + 1;
// There was no task before , Just put it first 
if (i == 0) {

queue[0] = e;
setIndex(e, 0);
} else {

// Otherwise, put the task last , And then through siftUp Method adjustment 
siftUp(i, e);
}
// If the first task in the queue is the current e Then remove leader Threads , Then wake up a thread available in the waiting queue to perform the task 
if (queue[0] == e) {

leader = null;
available.signal();
}
} finally {

lock.unlock();
}
return true;
}

grow Expansion method

private void grow() {

int oldCapacity = queue.length;
// Every expansion 50%
int newCapacity = oldCapacity + (oldCapacity >> 1);
if (newCapacity < 0) // If the new capacity is less than 0, Then it means that there is an overflow 
newCapacity = Integer.MAX_VALUE;
// take old Array task Copy to the trust array 
queue = Arrays.copyOf(queue, newCapacity);
}

siftUp Adjustment method

// Adjust the heap 
private void siftUp(int k, RunnableScheduledFuture<?> key) {

// When K>0 when , Keep adjusting .k be equal to 0 Indicates that the root node has been adjusted , That is, the first element , At this point, you must exit the loop 
while (k > 0) {

// Find him Father node 
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
// Compare with parent node See if you need to move , Until I find Location 
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}

canRunInCurrentRunState

If after adding to the priority queue , Thread pool is closed , Need to pass through To determine whether the task should continue

/* Judge by whether it is a periodic task continueExistingPeriodicTasksAfterShutdown( Thread pool shutdown Whether to perform periodic tasks after ) executeExistingDelayedTasksAfterShutdown( Thread pool shutdown Whether to execute delayed tasks after ) Whether the task should continue */
boolean canRunInCurrentRunState(boolean periodic) {

return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}

take Method

// Get the task from the queue , If there are no tasks in the current queue , Then block until there are tasks in the queue , Wait offer Method wake up 
public RunnableScheduledFuture<?> take() throws InterruptedException {

final ReentrantLock lock = this.lock;
lock.lockInterruptibly();// It can be locked in response to interruption 
try {

for (;;) {

RunnableScheduledFuture<?> first = queue[0];
// If the first task in the queue is null, It proves that there is no task , The current thread is waiting 
if (first == null)
available.await();
else {
// Otherwise, get the remaining waiting time of the first task , Judge whether it is less than 0. Need to execute 
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // Do not keep references while the thread is waiting for the task to execute 
// Because the task still needs to wait for some time to execute , At this time, check whether there is a thread waiting in front of you , If there is , Then the current thread continues to wait 
if (leader != null)// There is no need to make the thread that gets the first task wait 
available.await();
else {
/* If there is no thread waiting ahead , Set yourself to leader Threads , Then start waiting delay Time At this time, if another thread gets the task , Can only make this leader The thread delay is awakened */
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {

available.awaitNanos(delay);
} finally {

// take leader Variables are removed 
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {

/* When the task is acquired by the thread , Judge leader Whether it is empty and the queue is not empty , Because there is no thread to wait or get the next task in the queue , Therefore, you need to wake up a thread to act as leader Wait or get the next task */
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
// Finish the final task and leave the team , Here comes in f Wait for the first task . Out of the team due to the task , Therefore, the reactor needs to be adjusted 
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {

// The number of tasks in the current task queue --, Then take out a task at the end of the queue , call siftDown Reorder the heap 
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f;
}

siftDown : Adjust the heap

// Adjust the heap . Move the task to the end of the heap .
private void siftDown(int k, RunnableScheduledFuture<?> key) {

int half = size >>> 1;
while (k < half) {

int child = (k << 1) + 1; // Get the left child node index 
RunnableScheduledFuture<?> c = queue[child];
int right = child + 1; // The right child 
if (right < size && c.compareTo(queue[right]) > 0)// Compare the size of the left and right children 
c = queue[child = right];
if (key.compareTo(c) <= 0)// To control children's min Come and join us key Compare 
break;
queue[k] = c; // If key Less than their min Then exchange 
setIndex(c, k);
k = child;
}
queue[k] = key; // here k That is, the incoming key Subscript that should be stored 
setIndex(key, k);
}

scheduleWithFixedDelay Realization

scheduleAtFixedRate and scheduleWithFixedDelay, The former is calculated based on the task start time , the latter yes Calculated based on the completion time of the previous task

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {

if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}

shutdown

ThreadPoolExecutor Of shutdown ->onShutdown

@Override void onShutdown() {

// First, get the task queue q
BlockingQueue<Runnable> q = super.getQueue();
// Default true
boolean keepDelayed =
getExecuteExistingDelayedTasksAfterShutdownPolicy();// Flag indicating whether to continue to execute delayed tasks after the thread pool is closed 
// Default false
boolean keepPeriodic =
getContinueExistingPeriodicTasksAfterShutdownPolicy();// Flag whether to continue to execute periodic tasks after the thread pool is closed 
// Determine whether the process pool shutdown Then continue to perform the delayed task , Whether to continue to execute periodic scheduling tasks 
if (!keepDelayed && !keepPeriodic) {
// Are not , Empty the task queue , Cancel task execution at the same time 
for (Object e : q.toArray())
if (e instanceof RunnableScheduledFuture<?>)
((RunnableScheduledFuture<?>) e).cancel(false);
q.clear();
}
else {

// Otherwise, traverse the task queue , Deal with periodic tasks and delayed tasks respectively 
for (Object e : q.toArray()) {

if (e instanceof RunnableScheduledFuture) {

RunnableScheduledFuture<?> t =
(RunnableScheduledFuture<?>)e;
if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
t.isCancelled()) {
 // there t.isCancelled() Indicates that the task has been cancelled , Should be removed 
if (q.remove(t))
t.cancel(false);
}
}
}
}
tryTerminate();// Call this method to attempt to further transition the thread pool state 
}
copyright:author[L._ l],Please bring the original link to reprint, thank you. https://en.javamana.com/2022/01/202201261543220890.html