7

调度线程池ScheduledThreadPoolExecutor源码解析

 1 year ago
source link: https://www.51cto.com/article/742463.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

调度线程池ScheduledThreadPoolExecutor源码解析

作者:JAVA旭阳 2022-12-16 08:31:37
ScheduledThreadPoolExecutor可以用来很方便实现我们的调度任务,具体使用可以参考调度线程池ScheduledThreadPoolExecutor的正确使用姿势这篇文章,那大家知道它是怎么实现的吗,本文就带大家来揭晓谜底。

ScheduledThreadPoolExecutor可以用来很方便实现我们的调度任务,具体使用可以参考调度线程池ScheduledThreadPoolExecutor的正确使用姿势这篇文章,那大家知道它是怎么实现的吗,本文就带大家来揭晓谜底。

实现机制分析

我们先思考下,如果让大家去实现ScheduledThreadPoolExecutor可以周期性执行任务的功能,需要考虑哪些方面呢?

  • ScheduledThreadPoolExecutor的整体实现思路是什么呢?

答:我们是不是可以继承线程池类,按照线程池的思路,将任务先丢到阻塞队列中,等到时间到了,工作线程就从阻塞队列获取任务执行。

  • 如何实现等到了未来的时间点就开始执行呢?

答:我们可以根据参数获取这个任务还要多少时间执行,那么我们是不是可以从阻塞队列中获取任务的时候,通过条件队列的的awaitNanos(delay)方法,阻塞一定时间。

  • 如何实现 任务的重复性执行呢?

答:这就更加简单了,任务执行完成后,把它再次加入到队列不就行了吗。

图片

类结构图

图片

ScheduledThreadPoolExecutor​的类结构图如上图所示,很明显它是在我们的线程池ThreadPoolExecutor框架基础上扩展的。

  • ScheduledExecutorService:实现了该接口,封装了调度相关的API
  • ThreadPoolExecutor:继承了该类,保留了线程池的能力和整个实现的框架
  • DelayedWorkQueue:内部类,延迟阻塞队列。
  • ScheduledFutureTask:延迟任务对象,包含了任务、任务状态、剩余的时间、结果等信息。

重要属性

通过ScheduledThreadPoolExecutor类的成员属性,我们可以了解它的数据结构。

  • shutdown 后是否继续执行周期任务(重复执行)
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
  • shutdown 后是否继续执行延迟任务(只执行一次)
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
  • 调用cancel()方法后,是否将该任务从队列中移除,默认false
private volatile boolean removeOnCancel = false;
  • 任务的序列号,保证FIFO队列的顺序,用来比较优先级
private static final AtomicLong sequencer = new AtomicLong()
  • ScheduledFutureTask延迟任务类

ScheduledFutureTask​ 继承 FutureTask​,实现 RunnableScheduledFuture​ 接口,无论是 runnable​ 还是 callable​,无论是否需要延迟和定时,所有的任务都会被封装成 ScheduledFutureTask。

该类具有延迟执行的特点, 覆盖FutureTask​ 的 run 方法来实现对延时执行、周期执行的支持。

对于延时任务调用FutureTask#run​,而对于周期性任务则调用FutureTask#runAndReset​ 并且在成功之后根据 fixed-delay/fixed-rate模式来设置下次执行时间并重新将任务塞到工作队列。

成员属性如下:

// 任务序列号
private final long sequenceNumber;
// 任务可以被执行的时间,交付时间,以纳秒表示
private long time;  
// 0 表示非周期任务
// 正数表示 fixed-rate(两次开始启动的间隔)模式的周期,
// 负数表示 fixed-delay(一次执行结束到下一次开始启动) 模式
private final long period;  
// 执行的任务对象
RunnableScheduledFuture<V> outerTask = this;
// 任务在队列数组中的索引下标, -1表示删除
int heapIndex;
  • DelayedWorkQueue延迟队列

DelayedWorkQueue 是支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue(小根堆、满二叉树)存储元素。

内部数据结构是数组,所以延迟队列出队头元素后需要让其他元素(尾)替换到头节点,防止空指针异常。

成员属性如下:

// 初始容量
private static final int INITIAL_CAPACITY = 16;    
// 节点数量
private int size = 0;
// 存放任务的数组
private RunnableScheduledFuture<?>[] queue = 
    new RunnableScheduledFuture<?>[INITIAL_CAPACITY];    
// 控制并发用的锁
private final ReentrantLock lock = new ReentrantLock();    
// 条件队列
private final Condition available = lock.newCondition();
//指定用于等待队列头节点任务的线程
private Thread leader = null;

提交延迟任务schedule()原理

延迟执行方法,并指定延迟执行的时间,只会执行一次。

  • schedule()方法是延迟任务方法的入口。
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    // 判空处理
    if (command == null || unit == null)
        throw new NullPointerException();
    // 将外部传入的任务封装成延迟任务对象ScheduledFutureTask
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    // 执行延迟任务
    delayedExecute(t);
    return t;
}
  • decorateTask(...) 该方法是封装延迟任务

调用triggerTime(delay, unit)方法计算延迟的时间。

// 返回【当前时间 + 延迟时间】,就是触发当前任务执行的时间
private long triggerTime(long delay, TimeUnit unit) {
    // 设置触发的时间
    return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
long triggerTime(long delay) {
    // 如果 delay < Long.Max_VALUE/2,则下次执行时间为当前时间 +delay
    // 否则为了避免队列中出现由于溢出导致的排序紊乱,需要调用overflowFree来修正一下delay
    return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

// 下面这种情况很少,大家看不懂可以不用强行理解
// 如果某个任务的 delay 为负数,说明当前可以执行(其实早该执行了)。
// 阻塞队列中维护任务顺序是基于 compareTo 比较的,比较两个任务的顺序会用 time 相减。
// 那么可能出现一个 delay 为正数减去另一个为负数的 delay,结果上溢为负数,则会导致 compareTo 产生错误的结果
private long overflowFree(long delay) {
    Delayed head = (Delayed) super.getQueue().peek();
    if (head != null) {
        long headDelay = head.getDelay(NANOSECONDS);
        // 判断一下队首的delay是不是负数,如果是正数就不用管,怎么减都不会溢出
        // 否则拿当前 delay 减去队首的 delay 来比较看,如果不出现上溢,排序不会乱
        // 不然就把当前 delay 值给调整为 Long.MAX_VALUE + 队首 delay
        if (headDelay < 0 && (delay - headDelay < 0))
            delay = Long.MAX_VALUE + headDelay;
    }
    return delay;
}
  • 调用RunnableScheduledFuture的构造方法封装为延迟任务
ScheduledFutureTask(Runnable r, V result, long ns) {
    super(r, result);
     // 任务的触发时间
    this.time = ns;
     // 任务的周期, 延迟任务的为0,因为不需要重复执行
    this.period = 0;
    // 任务的序号 + 1
    this.sequenceNumber = sequencer.getAndIncrement();
}
  • 调用decorateTask()方法装饰延迟任务
// 没有做任何操作,直接将 task 返回,该方法主要目的是用于子类扩展
protected <V> RunnableScheduledFuture<V> decorateTask(
    Runnable runnable, RunnableScheduledFuture<V> task) {
    return task;
}

提交周期任务scheduleAtFixedRate()原理

按照固定的频率周期性的执行任务,捕手renwu,一次任务的启动到下一次任务的启动的间隔

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    // 任务封装,【指定初始的延迟时间和周期时间】
    ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(initialDelay, unit), unit.toNanos(period));
    // 默认返回本身
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    // 开始执行这个任务
    delayedExecute(t);
    return t;
}

提交周期任务scheduleWithFixedDelay()原理

按照指定的延时周期性执行任务,上一个任务执行完毕后,延时一定时间,再次执行任务。

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null) 
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    // 任务封装,【指定初始的延迟时间和周期时间】,周期时间为 - 表示是 fixed-delay 模式
    ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(initialDelay, unit), unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
     // 开始执行这个任务
    delayedExecute(t);
    return t;
}

执行任务delayedExecute(t)原理

上面多种提交任务的方式,殊途同归,最终都会调用delayedExecute()方法执行延迟或者周期任务。

delayedExecute()方法是执行延迟任务的入口

private void delayedExecute(RunnableScheduledFuture<?> task) {
    // 线程池是 SHUTDOWN 状态,执行拒绝策略
    if (isShutdown())
        // 调用拒绝策略的方法
        reject(task);
    else {
        // 把当前任务放入阻塞队列
        super.getQueue().add(task);
        // 线程池状态为 SHUTDOWN 并且不允许执行任务了,就从队列删除该任务,并设置任务的状态为取消状态
        // 非主流程,可以跳过,不重点看了
        if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
            task.cancel(false);
        else
            // 开始执行了哈
            ensurePrestart();
    }
}

ensurePrestart()方法开启线程执行

// ThreadPoolExecutor#ensurePrestart
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    // worker数目小于corePoolSize,则添加一个worker。
    if (wc < corePoolSize)
        // 第二个参数 true 表示采用核心线程数量限制,false 表示采用 maximumPoolSize
        addWorker(null, true);
    // corePoolSize = 0的情况,至少开启一个线程,【担保机制】
    else if (wc == 0)
        addWorker(null, false);
}

addWorker()​方法实际上父类ThreadPoolExecutor的方法,这个方法在该文章 Java线程池源码深度解析中详细介绍过,这边做个总结:

  • 如果线程池中工作线程数量小于最大线程数,创建工作线程,执行任务。
  • 如果线程池中工作线程数量大于最大线程数,直接返回。

获取延迟任务take()原理

目前工作线程已经创建好了,工作线程开始工作了,它会从阻塞队列中获取延迟任务执行,这部分也是线程池里面的原理,不做展开,那我们看下它是如何实现延迟执行的? 主要关注如何从阻塞队列中获取任务。

  • DelayedWorkQueue#take()方法获取延迟任务

该方法会在上面的addWoker()​方法创建工作线程后,工作线程中循环持续调用workQueue.take()方法获取延迟任务。

该方法主要获取延迟队列中任务延迟时间小于等于0 的任务。

如果延迟时间不小于0,那么调用条件队列的awaitNanos(delay)阻塞方法等待一段时间,等时间到了,延迟时间自然小于等于0了。

获取到任务后,工作线程就可以开始执行调度任务了。

// DelayedWorkQueue#take()
public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 加可中断锁
    lock.lockInterruptibly();
    try {
        // 自旋
        for (;;) {
            // 获取阻塞队列中的头结点
            RunnableScheduledFuture<?> first = queue[0];
            // 如果阻塞队列没有数据,为空
            if (first == null)
                // 等待队列不空,直至有任务通过 offer 入队并唤醒
                available.await();
            else {
                // 获取头节点的的任务还剩余多少时间才执行
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    // 到达触发时间,获取头节点并调整堆,重新选择延迟时间最小的节点放入头部
                    return finishPoll(first);
                
                // 逻辑到这说明头节点的延迟时间还没到
                first = null;
                // 说明有 leader 线程在等待获取头节点,当前线程直接去阻塞等待
                if (leader != null)
                    // 当前线程阻塞
                    available.await();
                else {
                    // 没有 leader 线程,【当前线程作为leader线程,并设置头结点的延迟时间作为阻塞时间】
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 当前线程通过awaitNanos方法等待delay时间后,会自动唤醒,往后面继续执行
                        available.awaitNanos(delay);
                        // 到达阻塞时间时,当前线程会从这里醒来,进入下一轮循环,就有可能执行了
                    } finally {
                        // t堆顶更新,leader 置为 null,offer 方法释放锁后,
                        // 有其它线程通过 take/poll 拿到锁,读到 leader == null,然后将自身更新为leader。
                        if (leader == thisThread)
                            // leader 置为 null 用以接下来判断是否需要唤醒后继线程
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 没有 leader 线程并且头结点不为 null,唤醒阻塞获取头节点的线程,
        // 【如果没有这一步,就会出现有了需要执行的任务,但是没有线程去执行】
        if (leader == null && queue[0] != null)
            available.signal();
        // 解锁
        lock.unlock();
    }
}
  • finishPoll()方法获取到任务后执行

该方法主要做两个事情, 获取头节点并调整堆,重新选择延迟时间最小的节点放入头部。

private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
    // 获取尾索引
    int s = --size;
    // 获取尾节点
    RunnableScheduledFuture<?> x = queue[s];
    // 将堆结构最后一个节点占用的 slot 设置为 null,因为该节点要尝试升级成堆顶,会根据特性下调
    queue[s] = null;
    // s == 0 说明 当前堆结构只有堆顶一个节点,此时不需要做任何的事情
    if (s != 0)
        // 从索引处 0 开始向下调整
        siftDown(0, x);
    // 出队的元素索引设置为 -1
    setIndex(f, -1);
    return f;
}

延迟任务运行的原理

从延迟队列中获取任务后,工作线程会调用延迟任务的run()方法执行任务。

  • ScheduledFutureTask#run()方法运行任务

调用isPeriodic()方法判断任务是否是周期性任务还是非周期性任务

如果任务是非周期任务,就调用父类的FutureTask#run()执行一次

如果任务是非周期任务,就调用父类的FutureTask#runAndReset(), 返回true会设置下一次的执行时间,重新放入线程池的阻塞队列中,等待下次获取执行

public void run() {
    // 是否周期性,就是判断 period 是否为 0
    boolean periodic = isPeriodic();
    // 根据是否是周期任务检查当前状态能否执行任务,不能执行就取消任务
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    // 非周期任务,直接调用 FutureTask#run 执行一次
    else if (!periodic)
        ScheduledFutureTask.super.run();
    // 周期任务的执行,返回 true 表示执行成功
    else if (ScheduledFutureTask.super.runAndReset()) {
        // 设置周期任务的下一次执行时间
        setNextRunTime();
        // 任务的下一次执行安排,如果当前线程池状态可以执行周期任务,加入队列,并开启新线程
        reExecutePeriodic(outerTask);
    }
}
  • FutureTask#runAndReset()执行周期性任务

周期任务正常完成后任务的状态不会变化,依旧是 NEW,不会设置 outcome 属性。

但是如果本次任务执行出现异常,会进入 setException 方法将任务状态置为异常,把异常保存在 outcome 中。

方法返回 false,后续的该任务将不会再周期的执行

protected boolean runAndReset(){
    // 任务不是新建的状态了,或者被别的线程执行了,直接返回 false
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                // 执行方法,没有返回值
                c.call();
                ran = true;
            } catch (Throwable ex) {
                // 出现异常,把任务设置为异常状态,唤醒所有的 get 阻塞线程
                setException(ex);
            }
        }
    } finally {
  // 执行完成把执行线程引用置为 null
        runner = null;
        s = state;
        // 如果线程被中断进行中断处理
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    // 如果正常执行,返回 true,并且任务状态没有被取消
    return ran && s == NEW;
}
  • ScheduledFutureTask#setNextRunTime()设置下次执行时间

如果属性period大于0,表示fixed-rate模式,直接加上period时间即可。

如果属性period小于等于0, 表示是fixed-delay模式, 调用triggerTime重新计算下次时间。

// 任务下一次的触发时间
private void setNextRunTime() {
    long p = period;
    if (p > 0)
        // fixed-rate 模式,【时间设置为上一次执行任务的时间 + p】,两次任务执行的时间差
        time += p;
    else
        // fixed-delay 模式,下一次执行时间是【当前这次任务结束的时间(就是现在) + delay 值】
        time = triggerTime(-p);
}
  • ScheduledFutureTask#reExecutePeriodic(),重新放入阻塞任务队列,等待获取,进行下一轮执行
// ScheduledThreadPoolExecutor#reExecutePeriodic
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(true)) {
        // 【放入任务队列】
        super.getQueue().add(task);
        // 如果提交完任务之后,线程池状态变为了 shutdown 状态,需要再次检查是否可以执行,
        // 如果不能执行且任务还在队列中未被取走,则取消任务
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            // 当前线程池状态可以执行周期任务,加入队列,并【根据线程数量是否大于核心线程数确定是否开启新线程】
            ensurePrestart();
    }
}
责任编辑:武晓燕 来源: JAVA旭阳

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK