8

美团动态线程池实践思路开源项目(DynamicTp),线程池源码解析及通知告警篇 - yanhom...

 3 years ago
source link: https://www.cnblogs.com/yanhom/p/16106625.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

大家好,这篇文章我们来聊下动态线程池开源项目(DynamicTp)的通知告警模块。目前项目提供以下通知告警功能,每一个通知项都可以独立配置是否开启、告警阈值、告警间隔时间、平台等,具体代码请看core模块notify包。

1.核心参数变更通知

2.线程池活跃度告警

3.队列容量告警

4.拒绝策略告警

5.任务执行超时告警

6.任务排队超时告警


DynamicTp项目地址

目前700star,感谢你的star,欢迎pr,业务之余一起给开源贡献一份力量

gitee地址https://gitee.com/yanhom/dynamic-tp

github地址https://github.com/lyh200/dynamic-tp

美团动态线程池实践思路,开源了

动态线程池框架(DynamicTp),监控及源码解析篇

动态线程池(DynamicTp),动态调整Tomcat、Jetty、Undertow线程池参数篇


线程池解读

上篇文章里大概讲到了JUC线程池的执行流程,我们这里再仔细回顾下,上图是JUC下线程池ThreadPoolExecutor类的继承体系。

顶级接口Executor提供了一种方式,解耦任务的提交和执行,只定义了一个execute(Runnable command)方法用来提交任务,至于具体任务怎么执行则交给他的实现者去自定义实现。

ExecutorService接口继承Executor,且扩展了生命周期管理的方法、返回Futrue的方法、批量提交任务的方法

void shutdown();List<Runnable> shutdownNow();boolean isShutdown();boolean isTerminated();boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task);<T> Future<T> submit(Runnable task, T result);Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

AbstractExecutorService抽象类继承ExecutorService接口,对ExecutorService相关方法提供了默认实现,用RunnableFuture的实现类FutureTask包装Runnable任务,交给execute()方法执行,然后可以从该FutureTask阻塞获取执行结果,并且对批量任务的提交做了编排

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {    return new FutureTask<T>(runnable, value);}    public Future<?> submit(Runnable task) {    if (task == null) throw new NullPointerException();    RunnableFuture<Void> ftask = newTaskFor(task, null);    execute(ftask);    return ftask;}

ThreadPoolExecutor继承AbstractExecutorService,采用池化思想管理一定数量的线程来调度执行提交的任务,且定义了一套线程池的生命周期状态,用一个ctl变量来同时保存当前池状态(高3位)和当前池线程数(低29位)。看过源码的小伙伴会发现,ThreadPoolExecutor类里的方法大量有同时需要获取或更新池状态和池当前线程数的场景,放一个原子变量里,可以很好的保证数据的一致性以及代码的简洁性。

  // 用此变量保存当前池状态(高3位)和当前线程数(低29位)  private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));   private static final int COUNT_BITS = Integer.SIZE - 3;  private static final int CAPACITY   = (1 << COUNT_BITS) - 1;   // runState is stored in the high-order bits  // 可以接受新任务提交,也会处理任务队列中的任务  // 结果:111 00000000000000000000000000000  private static final int RUNNING    = -1 << COUNT_BITS;    // 不接受新任务提交,但会处理任务队列中的任务  // 结果:000 00000000000000000000000000000  private static final int SHUTDOWN   =  0 << COUNT_BITS;    // 不接受新任务,不执行队列中的任务,且会中断正在执行的任务  // 结果:001 00000000000000000000000000000  private static final int STOP       =  1 << COUNT_BITS;    // 任务队列为空,workerCount = 0,线程池的状态在转换为TIDYING状态时,会执行钩子方法terminated()  // 结果:010 00000000000000000000000000000  private static final int TIDYING    =  2 << COUNT_BITS;    // 调用terminated()钩子方法后进入TERMINATED状态  // 结果:010 00000000000000000000000000000  private static final int TERMINATED =  3 << COUNT_BITS;   // Packing and unpacking ctl  // 低29位变为0,得到了线程池的状态  private static int runStateOf(int c)     { return c & ~CAPACITY; }  // 高3位变为为0,得到了线程池中的线程数  private static int workerCountOf(int c)  { return c & CAPACITY; }  private static int ctlOf(int rs, int wc) { return rs | wc; }

核心入口execute()方法执行逻辑如下:

public void execute(Runnable command) {    if (command == null)        throw new NullPointerException();    int c = ctl.get();    if (workerCountOf(c) < corePoolSize) {        if (addWorker(command, true))            return;        c = ctl.get();    }    if (isRunning(c) && workQueue.offer(command)) {        int recheck = ctl.get();        if (! isRunning(recheck) && remove(command))            reject(command);        else if (workerCountOf(recheck) == 0)            addWorker(null, false);    }    else if (!addWorker(command, false))        reject(command);}

可以总结出如下主要执行流程,当然看上述代码会有一些异常分支判断,可以自己顺理加到下述执行主流程里

1.判断线程池的状态,如果不是RUNNING状态,直接执行拒绝策略

2.如果当前线程数 < 核心线程池,则新建一个线程来处理提交的任务

3.如果当前线程数 > 核心线程数且任务队列没满,则将任务放入任务队列等待执行

4.如果 核心线程池 < 当前线程池数 < 最大线程数,且任务队列已满,则创建新的线程执行提交的任务

5.如果当前线程数 > 最大线程数,且队列已满,则拒绝该任务

addWorker()方法逻辑

private boolean addWorker(Runnable firstTask, boolean core) {        retry:        for (;;) {            int c = ctl.get();            // 获取当前池状态            int rs = runStateOf(c);             // 1.判断如果线程池状态 > SHUTDOWN,直接返回false,否则2            // 2.如果线程池状态 = SHUTDOWN,并且firstTask不为null则直接返回false,因为SHUTDOWN状态的线程池不能在接受新任务,否则3            // 3.如果线程池状态 = SHUTDOWN,并且firstTask == null,此时如果任务队列为空,则直接返回false            if (rs >= SHUTDOWN &&                ! (rs == SHUTDOWN &&                   firstTask == null &&                   ! workQueue.isEmpty()))                return false;             for (;;) {                int wc = workerCountOf(c);                // 1.如果当前线程池线程数大于等于CAPACITY(理论上的最大值5亿),则返回fasle                // 2.如果创建核心线程情况下当前池线程数 >= corePoolSize,则返回false                // 3.如果创建非核心线程情况下当前池线程数 >= maximumPoolSize,则返回false                if (wc >= CAPACITY ||                    wc >= (core ? corePoolSize : maximumPoolSize))                    return false;                // cas 增加当前池线程数量,成功则退出循环                    if (compareAndIncrementWorkerCount(c))                    break retry;                c = ctl.get();  // Re-read ctl                // cas 增加当前池线程数量失败(多线程并发),则重新获取ctl,计算出当前线程池状态,如果不等于上述计算的状态rs,则说明线程池状态发生了改变,需要跳到外层循环重新进行状态判断,否则执行内部循环                if (runStateOf(c) != rs)                    continue retry;                // else CAS failed due to workerCount change; retry inner loop            }        }         boolean workerStarted = false;        boolean workerAdded = false;        Worker w = null;        try {            // 至此说明线程池状态校验通过,且增加池线程数量成功,则创建一个Worker线程来执行任务            w = new Worker(firstTask);            final Thread t = w.thread;            if (t != null) {                final ReentrantLock mainLock = this.mainLock;                // 访问worker set时需要获取mainLock全局锁                mainLock.lock();                try {                    // Recheck while holding lock.                    // Back out on ThreadFactory failure or if                    // shut down before lock acquired.                    int rs = runStateOf(ctl.get());                     // 1.当前池状态 < SHUTDOWN,也就是RUNNING状态,如果已经started,抛出异常                    // 2.当前池状态 = SHUTDOWN,且firstTask == null,需要处理任务队列中的任务,如果已经started,抛出异常                    if (rs < SHUTDOWN ||                        (rs == SHUTDOWN && firstTask == null)) {                        if (t.isAlive()) // precheck that t is startable                            throw new IllegalThreadStateException();                        // 刚创建线程添加到workers集合中                        workers.add(w);                        int s = workers.size();                        // 判断更新历史最大线程数量                        if (s > largestPoolSize)                            largestPoolSize = s;                        workerAdded = true;                    }                } finally {                    mainLock.unlock();                }                if (workerAdded) {                    // 启动新建线程                    t.start();                    workerStarted = true;                }            }        } finally {            if (! workerStarted)                // 启动失败,workerCount--,workers里移除该worker                addWorkerFailed(w);        }        return workerStarted;    }

线程池中的线程并不是直接用的Thread类,而是定义了一个内部工作线程Worker类,实现了AQS以及Runnable接口,然后持有一个Thread类的引用及一个firstTask(创建后第一个要执行的任务),每个Worker线程启动后会执行run()方法,该方法会调用执行外层runWorker(Worker w)方法

final void runWorker(Worker w) {    Thread wt = Thread.currentThread();    Runnable task = w.firstTask;    w.firstTask = null;    w.unlock(); // allow interrupts    boolean completedAbruptly = true;    try {        // 1.如果task不为空,则作为该线程的第一个任务直接执行        // 2.如果task为空,则通过getTask()方法从任务队列中获取任务执行        while (task != null || (task = getTask()) != null) {            w.lock();            // If pool is stopping, ensure thread is interrupted;            // if not, ensure thread is not interrupted.  This            // requires a recheck in second case to deal with            // shutdownNow race while clearing interrupt            // 线程池状态 >= STOP,则中断线程            if ((runStateAtLeast(ctl.get(), STOP) ||                 (Thread.interrupted() &&                  runStateAtLeast(ctl.get(), STOP))) &&                !wt.isInterrupted())                wt.interrupt();            try {                // 实际执行任务前调用的钩子方法                beforeExecute(wt, task);                Throwable thrown = null;                try {                    // 实际执行任务                    task.run();                } catch (RuntimeException x) {                    thrown = x; throw x;                } catch (Error x) {                    thrown = x; throw x;                } catch (Throwable x) {                    thrown = x; throw new Error(x);                } finally {                    // 任务执行后调用的钩子方法                    afterExecute(task, thrown);                }            } finally {                // 任务置为null,重新获取新任务,完成数++                task = null;                w.completedTasks++;                w.unlock();            }        }        completedAbruptly = false;    } finally {        // 无任务可执行,执行worker销毁逻辑        processWorkerExit(w, completedAbruptly);    }}

getTask()方法逻辑

private Runnable getTask() {    boolean timedOut = false; // Did the last poll() time out?     for (;;) {        int c = ctl.get();        int rs = runStateOf(c);                // 以下两种情况递减工作线程数量        // 1. rs >= STOP        // 2. rs == SHUTDOWN && workQueue.isEmpty()        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {            decrementWorkerCount();            return null;        }         int wc = workerCountOf(c);         // Are workers subject to culling?        // 允许核心线程超时 或者 当前线程数 > 核心线程数,有可能发生超时关闭        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;         // wc什么情况 > maximumPoolSize,调用setMaximumPoolSize()方法将maximumPoolSize调小了,会发生这种情况,此时需要关闭多余线程        if ((wc > maximumPoolSize || (timed && timedOut))            && (wc > 1 || workQueue.isEmpty())) {            if (compareAndDecrementWorkerCount(c))                return null;            continue;        }         try {            // 阻塞队列获取任务            Runnable r = timed ?                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                workQueue.take();            if (r != null)                return r;            timedOut = true;        } catch (InterruptedException retry) {            // 发生中断,进行重试            timedOut = false;        }    }}

以上内容比较详细的介绍了ThreadPoolExecutor的继承体系,以及相关的核心源码,基于此,现在我们来看DynamicTp提供的告警通知能力。


核心参数变更通知

对应配置中心的监听端监听到配置变更后,封装到DtpProperties中然后交由DtpRegistry类中的refresh()方法去做配置更新,同时通知时会高亮显示有变更的字段


线程池活跃度告警

活跃度 = activeCount / maximumPoolSize

服务启动后会开启一个定时监控任务,每隔一定时间(可配置)去计算线程池的活跃度,达到配置的threshold阈值后会触发一次告警,告警间隔内多次触发不会发送告警通知


队列容量告警

容量使用率 = queueSize / queueCapacity

服务启动后会开启一个定时监控任务,每隔一定时间去计算任务队列的使用率,达到配置的threshold阈值后会触发一次告警,告警间隔内多次触发不会发送告警通知


拒绝策略告警

/** * Do sth before reject. * @param executor ThreadPoolExecutor instance */default void beforeReject(ThreadPoolExecutor executor) {    if (executor instanceof DtpExecutor) {        DtpExecutor dtpExecutor = (DtpExecutor) executor;        dtpExecutor.incRejectCount(1);        Runnable runnable = () -> AlarmManager.doAlarm(dtpExecutor, REJECT);        AlarmManager.triggerAlarm(dtpExecutor.getThreadPoolName(), REJECT.getValue(), runnable);    }}

线程池线程数达到配置的最大线程数,且任务队列已满,再提交任务会触发拒绝策略。DtpExecutor线程池用到的RejectedExecutionHandler是经过动态代理包装过的,在执行具体的拒绝策略之前会执行RejectedAware类beforeReject()方法,此方法会去做拒绝数量累加(总数值累加、周期值累加)。且判断如果周期累计值达到配置的阈值,则会触发一次告警通知(同时重置周期累加值为0及上次告警时间为当前时间),告警间隔内多次触发不会发送告警通知


任务队列超时告警

重写ThreadPoolExecutor的execute()方法和beforeExecute()方法,如果配置了执行超时或排队超时值,则会用DtpRunnable包装任务,同时记录任务的提交时间submitTime,beforeExecute根据当前时间和submitTime的差值就可以计算到该任务在队列中的等待时间,然后判断如果差值大于配置的queueTimeout则累加排队超时任务数量(总数值累加、周期值累加)。且判断如果周期累计值达到配置的阈值,则会触发一次告警通知(同时重置周期累加值为0及上次告警时间为当前时间),告警间隔内多次触发不会发送告警通知

@Overridepublic void execute(Runnable command) {    if (CollUtil.isNotEmpty(taskWrappers)) {        for (TaskWrapper t : taskWrappers) {            command = t.wrap(command);        }    }     if (runTimeout > 0 || queueTimeout > 0) {        command = new DtpRunnable(command);    }    super.execute(command);}
@Overrideprotected void beforeExecute(Thread t, Runnable r) {    if (!(r instanceof DtpRunnable)) {        super.beforeExecute(t, r);        return;    }    DtpRunnable runnable = (DtpRunnable) r;    long currTime = System.currentTimeMillis();    if (runTimeout > 0) {        runnable.setStartTime(currTime);    }    if (queueTimeout > 0) {        long waitTime = currTime - runnable.getSubmitTime();        if (waitTime > queueTimeout) {            queueTimeoutCount.incrementAndGet();            Runnable alarmTask = () -> AlarmManager.doAlarm(this, QUEUE_TIMEOUT);            AlarmManager.triggerAlarm(this.getThreadPoolName(), QUEUE_TIMEOUT.getValue(), alarmTask);        }    }     super.beforeExecute(t, r);}



任务执行超时告警

重写ThreadPoolExecutor的afterExecute()方法,根据当前时间和beforeExecute()中设置的startTime的差值即可算出任务的实际执行时间,然后判断如果差值大于配置的runTimeout则累加排队超时任务数量(总数值累加、周期值累加)。且判断如果周期累计值达到配置的阈值,则会触发一次告警通知(同时重置周期累加值为0及上次告警时间为当前时间),告警间隔内多次触发不会发送告警通知

@Overrideprotected void afterExecute(Runnable r, Throwable t) {     if (runTimeout > 0) {        DtpRunnable runnable = (DtpRunnable) r;        long runTime = System.currentTimeMillis() - runnable.getStartTime();        if (runTime > runTimeout) {            runTimeoutCount.incrementAndGet();            Runnable alarmTask = () -> AlarmManager.doAlarm(this, RUN_TIMEOUT);            AlarmManager.triggerAlarm(this.getThreadPoolName(), RUN_TIMEOUT.getValue(), alarmTask);        }    }     super.afterExecute(r, t);}



告警通知相关配置项

如果想使用通知告警功能,配置文件必须要配置platforms字段,且可以配置多个平台,如钉钉、企微等;notifyItems配置具体告警项,包括阈值、平台、告警间隔等。

spring:  dynamic:    tp:      # 省略其他项      platforms:                         # 通知平台        - platform: wechat          urlKey: 38a98-0c5c3b649c          receivers: test        - platform: ding          urlKey: f80db3e801d593604f4a08dcd6a          secret: SECb5444a6f375d5b9d21          receivers: 17811511815      executors:                                   # 动态线程池配置,都有默认值,采用默认值的可以不配置该项,减少配置量        - threadPoolName: dtpExecutor1          executorType: common                          # 线程池类型common、eager:适用于io密集型          corePoolSize: 2          maximumPoolSize: 4          queueCapacity: 200          queueType: VariableLinkedBlockingQueue       # 任务队列,查看源码QueueTypeEnum枚举类          rejectedHandlerType: CallerRunsPolicy        # 拒绝策略,查看RejectedTypeEnum枚举类          keepAliveTime: 50          allowCoreThreadTimeOut: false          threadNamePrefix: dtp1                         # 线程名前缀          waitForTasksToCompleteOnShutdown: false        # 参考spring线程池设计          awaitTerminationSeconds: 5                     # 单位(s)          preStartAllCoreThreads: false                  # 是否预热核心线程,默认false          runTimeout: 200                                # 任务执行超时阈值,目前只做告警用,单位(ms)          queueTimeout: 100                              # 任务在队列等待超时阈值,目前只做告警用,单位(ms)          taskWrapperNames: ["ttl"]                      # 任务包装器名称,集成TaskWrapper接口          notifyItems:                     # 报警项,不配置自动会按默认值配置(变更通知、容量报警、活性报警、拒绝报警、任务超时报警)            - type: capacity               # 报警项类型,查看源码 NotifyTypeEnum枚举类              threshold: 80                # 报警阈值              platforms: [ding,wechat]     # 可选配置,不配置默认拿上层platforms配置的所以平台              interval: 120                # 报警间隔(单位:s)            - type: change            - type: liveness              threshold: 80              interval: 120            - type: reject              threshold: 1              interval: 160            - type: run_timeout              threshold: 1              interval: 120            - type: queue_timeout              threshold: 1              interval: 140

本文开头介绍了线程池ThreadPoolExecutor的继承体系,核心流程的源码解读。然后介绍了DynamicTp提供的以上6种告警通知能力,希望通过监控+告警可以让我们及时感知到我们业务线程池的执行负载情况,第一时间做出调整,防止事故的发生。

对项目有什么想法或者建议,可以加我微信交流,或者创建issues,一起完善项目

公众号:CodeFox

微信:yanhom1314


Recommend

  • 31

  • 34

    借用大佬的一张图,侵权立删 前言 在今年年初在掘金发布了一篇文章记一次Vue动态渲染路由的实现,现在代码经过不断的Review 现在完全优化了之前的实现方法,代码量减少很多,逻辑更加简单,同时也更加稳定 demo已经部署到github,欢迎体验~~ vue-e

  • 43

  • 46
    • www.tuicool.com 5 years ago
    • Cache

    一个email下载器:多线程思路

    本文介绍下emaildownloader工具多线程实现思路,主要说下编写过程中遇到的问题,当然还有很多问题自己也没有充分搞明白。 相对于多进程解决方案,多线程最大的好处就是imap连接可以复用,不用每次fetch的时候都要新建一个连接。

  • 9

    vivo 互联网领域的部分业务在微服务的实践过程当中基于很多综合因素的考虑选择了TARS微服务框架。 官方的描述是:TARS是一个支持多语言、内嵌服务治理功能,与Devops能很好协同的微服务框架。我们在开源的基础上做了很多适配内部系统的事情,比如与CICD...

  • 5

    美团动态线程池实践思路,开源了 2022年02月11日 11:12 ·  阅读 12083 大家好,今天我们来聊...

  • 5

    大家好,这篇文章我们来介绍下动态线程池框架( DynamicTp )的 adapter 模块,上篇文章也大概介绍过了,该模块主要是用来适配一些第三方组件的线程池管理,让第三方组件内置的线程池也能享受到动态参数调整,监控告警这些增强功能。 DynamicTp...

  • 10
    • blog.51cto.com 2 years ago
    • Cache

    线程池设计思路

    线程池设计思路线程池是什么我们先来打个比方,线程池就好像一个工具箱,我们每次需要拧螺丝的时候都要从工具箱里面取出一个螺丝刀来,有时候需要取出一个来拧,有时候螺丝多的时候需要多个人取出多个来拧,拧完自己的螺丝那么就...

  • 6

    V2EX  ›  Java Spring JPA 动态列查询有什么好的思路   lei...

  • 13

    大家好,我是马称。 最近,有很多同学在微信上问我这么一个问题: Hippo4j 动态线程池框架是美团开源的么? 类似于这样的问题还挺多,在这里统一回复下: 美团官方并没有开源任何关于动态线程池的框架

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK