Hystrix Source Code Analysis: Command Execution (Part 2), Execution Isolation Strategy | 芋道源码 (a pure source-code analysis blog)

 6 years ago
source link: http://www.iocoder.cn/Hystrix/command-execute-second-isolation-strategy/?toutiao=

Hystrix Source Code Analysis: Command Execution (Part 2), Execution Isolation Strategy


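Summary: Originally published at http://www.iocoder.cn/Hystrix/command-execute-second-isolation-strategy/ by 「芋道源码」. Reposting is welcome; please keep this summary. Thanks!

This article is based mainly on Hystrix 1.5.X.

This article covers part 2 of Hystrix command execution: the execution isolation strategy.

Recommendation: read this article once you already have some understanding of RxJava.

Hystrix provides two execution isolation strategies ( ExecutionIsolationStrategy ): THREAD, which runs the command on a thread pool, and SEMAPHORE, which runs it on the calling thread under a semaphore limit.

For a comparison of the pros and cons of the two approaches, see the "Dependency Isolation" section of 《【翻译】Hystrix文档-实现原理》 (a Chinese translation of the Hystrix implementation-principles documentation).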


2. HystrixThreadPoolProperties

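com.netflix.hystrix.HystrixThreadPoolProperties is the abstract class for Hystrix thread pool property configuration. The original post links to a copy of the source with Chinese comments added.

com.netflix.hystrix.strategy.properties.HystrixPropertiesThreadPoolDefault is the implementation class for Hystrix thread pool configuration (linked in the original post). It contains almost nothing; as the official Javadoc puts it: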

Default implementation of {@link HystrixThreadPoolProperties} using Archaius (https://github.com/Netflix/archaius)

3. HystrixThreadPoolKey

com.netflix.hystrix.HystrixThreadPoolKey is the interface that identifies a Hystrix thread pool.

From the HystrixThreadPoolKey interface Javadoc:
A key to represent a {@link HystrixThreadPool} for monitoring, metrics publishing, caching and other such uses.
This interface is intended to work natively with Enums so that implementing code can be an enum that implements this interface.

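  • Put plainly, the goal is that the same name ( the identifier ) always yields the same HystrixThreadPoolKey object. The factory keeps an internal mapping from name to HystrixThreadPoolKey object, which gives an enum-like effect.

The HystrixThreadPoolKey code is as follows: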

public interface HystrixThreadPoolKey extends HystrixKey {
    class Factory {
        private Factory() {
        }

        // used to intern instances so we don't keep re-creating them millions of times for the same key
        private static final InternMap<String, HystrixThreadPoolKey> intern
                = new InternMap<String, HystrixThreadPoolKey>(
                new InternMap.ValueConstructor<String, HystrixThreadPoolKey>() {
                    @Override
                    public HystrixThreadPoolKey create(String key) {
                        return new HystrixThreadPoolKeyDefault(key);
                    }
                });

        public static HystrixThreadPoolKey asKey(String name) {
            return intern.interned(name);
        }

        private static class HystrixThreadPoolKeyDefault extends HystrixKeyDefault implements HystrixThreadPoolKey {
            public HystrixThreadPoolKeyDefault(String name) {
                super(name);
            }
        }

        /* package-private */ static int getThreadPoolCount() {
            return intern.size();
        }
    }
}

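  • HystrixThreadPoolKey extends the com.netflix.hystrix.HystrixKey interface (linked in the original post). The #name() method defined by that interface is the identifier ( key ) mentioned above.
  • The intern field maps each name to its HystrixThreadPoolKey object, which gives the enum-like effect.
    • com.netflix.hystrix.util.InternMap : the original post links to a copy of the code with Chinese comments.
  • #asKey(name) returns the HystrixThreadPoolKey object for a name from intern .
  • #getThreadPoolCount() returns the number of HystrixThreadPoolKey objects.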
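
The interning idea above can be sketched without Hystrix on the classpath. The following is a minimal illustration, not the InternMap implementation: the class KeyInternSketch, the nested PoolKey type, and keyCount() are hypothetical names, and a plain ConcurrentHashMap stands in for InternMap.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Simplified stand-in for the intern-based Factory; not Hystrix code. */
final class KeyInternSketch {

    /** Hypothetical key type that only wraps a name. */
    static final class PoolKey {
        private final String name;

        private PoolKey(String name) {
            this.name = name;
        }

        String name() {
            return name;
        }
    }

    // name -> the single PoolKey instance created for that name
    private static final ConcurrentMap<String, PoolKey> INTERN = new ConcurrentHashMap<>();

    /** Returns the same PoolKey instance for equal names, creating it on first use. */
    static PoolKey asKey(String name) {
        return INTERN.computeIfAbsent(name, PoolKey::new);
    }

    /** Number of distinct keys created so far. */
    static int keyCount() {
        return INTERN.size();
    }

    public static void main(String[] args) {
        PoolKey a = asKey("orders");
        PoolKey b = asKey("orders");
        PoolKey c = asKey("payments");
        System.out.println("same name, same instance: " + (a == b));
        System.out.println("different name, same instance: " + (a == c));
        System.out.println("distinct keys: " + keyCount());
    }
}
```

Because equal names always map to one instance, callers can compare keys by reference, much as they would compare enum constants.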

The AbstractCommand constructor initializes the command's threadPoolKey field. The code is as follows:

protected final HystrixThreadPoolKey threadPoolKey;

protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
        HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
        HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
        HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {

    // ... unrelated code omitted

    this.commandGroup = initGroupKey(group);
    this.commandKey = initCommandKey(key, getClass());
    this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
    // initialize threadPoolKey
    this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());

    // ... unrelated code omitted
}

  • It calls #initThreadPoolKey(...) to produce the final threadPoolKey value. The code is as follows:

    private static HystrixThreadPoolKey initThreadPoolKey(HystrixThreadPoolKey threadPoolKey, HystrixCommandGroupKey groupKey, String threadPoolKeyOverride) {
        if (threadPoolKeyOverride == null) {
            // we don't have a property overriding the value so use either HystrixThreadPoolKey or HystrixCommandGroup
            if (threadPoolKey == null) {
                /* use HystrixCommandGroup if HystrixThreadPoolKey is null */
                return HystrixThreadPoolKey.Factory.asKey(groupKey.name());
            } else {
                return threadPoolKey;
            }
        } else { // the threadPoolKeyOverride property takes precedence
            // we have a property defining the thread-pool so use it instead
            return HystrixThreadPoolKey.Factory.asKey(threadPoolKeyOverride);
        }
    }

    • Priority: threadPoolKeyOverride > threadPoolKey > groupKey
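
To make the precedence concrete, here is a small sketch that applies the same rule to plain strings. ThreadPoolKeyPrioritySketch and resolve(...) are hypothetical names used only for illustration; the real method works on HystrixThreadPoolKey and HystrixCommandGroupKey objects.

```java
/** Illustrates the precedence override > thread pool key > group key with plain strings. */
final class ThreadPoolKeyPrioritySketch {

    /**
     * Returns the name of the thread pool a command would use.
     * A null argument means "not configured".
     */
    static String resolve(String threadPoolKey, String groupKey, String threadPoolKeyOverride) {
        if (threadPoolKeyOverride != null) {
            // a configured override property wins over everything else
            return threadPoolKeyOverride;
        }
        // otherwise prefer the explicit thread pool key and fall back to the group key
        return threadPoolKey != null ? threadPoolKey : groupKey;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, "OrderGroup", null));
        System.out.println(resolve("OrderPool", "OrderGroup", null));
        System.out.println(resolve("OrderPool", "OrderGroup", "SharedPool"));
    }
}
```

One consequence worth noting: commands in the same group share a thread pool unless a thread pool key or an override is supplied.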

4. HystrixConcurrencyStrategy

com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy is the abstract class for the Hystrix concurrency strategy.

The HystrixConcurrencyStrategy#getThreadPool(...) method is as follows:

public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
    final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);

    final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
    final int dynamicCoreSize = threadPoolProperties.coreSize().get();
    final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
    final int maxQueueSize = threadPoolProperties.maxQueueSize().get();

    final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);

    if (allowMaximumSizeToDivergeFromCoreSize) {
        final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
        if (dynamicCoreSize > dynamicMaximumSize) {
            logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
                    dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
                    dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        } else {
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        }
    } else {
        return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
    }
}
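
The sizing rules in #getThreadPool(...) can be tried out with the JDK alone. The sketch below is a simplified stand-in, not the Hystrix method: PoolConstructionSketch, newQueue(...) and newPool(...) are hypothetical names, plain int and boolean parameters replace the dynamic properties, and the queue choice follows the SynchronousQueue / LinkedBlockingQueue behaviour this article describes for queueSize.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Simplified stand-in for the thread pool construction logic; not Hystrix code. */
final class PoolConstructionSketch {

    /** A non-positive size means "no queueing": tasks are handed directly to a thread. */
    static BlockingQueue<Runnable> newQueue(int maxQueueSize) {
        if (maxQueueSize <= 0) {
            return new SynchronousQueue<>();
        }
        return new LinkedBlockingQueue<>(maxQueueSize);
    }

    static ThreadPoolExecutor newPool(int coreSize, int maximumSize, boolean allowDiverge,
                                      int keepAliveMinutes, int maxQueueSize) {
        // maximumSize is honoured only when divergence is allowed and it is not below coreSize;
        // in every other case the pool is fixed at coreSize
        int effectiveMax = (allowDiverge && maximumSize >= coreSize) ? maximumSize : coreSize;
        return new ThreadPoolExecutor(coreSize, effectiveMax, keepAliveMinutes, TimeUnit.MINUTES,
                newQueue(maxQueueSize));
    }

    public static void main(String[] args) {
        // no tasks are submitted, so no worker threads are started here
        System.out.println(newPool(10, 5, true, 1, -1).getMaximumPoolSize());
        System.out.println(newPool(2, 8, true, 1, 100).getMaximumPoolSize());
        System.out.println(newPool(2, 8, false, 1, 100).getMaximumPoolSize());
    }
}
```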

com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategyDefault is the default concurrency strategy implementation. The code is as follows ( it does essentially nothing ):

public class HystrixConcurrencyStrategyDefault extends HystrixConcurrencyStrategy {

    /**
     * Singleton
     */
    private static HystrixConcurrencyStrategyDefault INSTANCE = new HystrixConcurrencyStrategyDefault();

    public static HystrixConcurrencyStrategy getInstance() {
        return INSTANCE;
    }

    private HystrixConcurrencyStrategyDefault() {
    }

}

The AbstractCommand constructor initializes the command's concurrencyStrategy field. The code is as follows:

protected final HystrixConcurrencyStrategy concurrencyStrategy;

protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
        HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
        HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
        HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {

    // ... unrelated code omitted

    // initialize the concurrency strategy
    this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();

    // ... unrelated code omitted
}

  • HystrixPlugins is the Hystrix plugin system; https://github.com/Netflix/Hystrix/wiki/Plugins describes it in detail.
  • Calling HystrixPlugins#getConcurrencyStrategy() returns the HystrixConcurrencyStrategy object. By default this is HystrixConcurrencyStrategyDefault . You can also follow the Hystrix plugin system to provide a custom HystrixConcurrencyStrategy that overrides methods such as #getThreadPool() and #getBlockingQueue() . The original post links to the code of this method.

5. HystrixThreadPool

com.netflix.hystrix.HystrixThreadPool is the Hystrix thread pool interface (linked in the original post). When a Hystrix command uses the THREAD execution isolation strategy, HystrixCommand#run() executes on this thread pool. HystrixThreadPool defines the following methods:

  • #getExecutor() : returns the ExecutorService .
  • #getScheduler() / #getScheduler(Func0<Boolean>) : returns the RxJava Scheduler .
  • #isQueueSpaceAvailable() : whether the thread pool queue still has room.
  • #markThreadExecution() / #markThreadCompletion() / #markThreadRejection() : TODO 【2002】【metrics】

5.1 HystrixThreadPoolDefault

com.netflix.hystrix.HystrixThreadPool.HystrixThreadPoolDefault is the Hystrix thread pool implementation class.

The constructor is as follows:

 1: private final HystrixThreadPoolProperties properties;
 2: private final BlockingQueue<Runnable> queue;
 3: private final ThreadPoolExecutor threadPool;
 4: private final HystrixThreadPoolMetrics metrics;
 5: private final int queueSize;
 6:
 7: public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
 8:     // initialize HystrixThreadPoolProperties
 9:     this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
10:     // obtain the HystrixConcurrencyStrategy
11:     HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
12:     // queue size
13:     this.queueSize = properties.maxQueueSize().get();
14:
15:     // TODO 【2002】【metrics】
16:     this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
17:             concurrencyStrategy.getThreadPool(threadPoolKey, properties), // initialize the ThreadPoolExecutor
18:             properties);
19:
20:     // obtain the ThreadPoolExecutor
21:     this.threadPool = this.metrics.getThreadPool();
22:     this.queue = this.threadPool.getQueue(); // the queue
23:
24:     // TODO 【2002】【metrics】
25:     /* strategy: HystrixMetricsPublisherThreadPool */
26:     HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
27: }

  • Line 9: initializes HystrixThreadPoolProperties .
  • Line 11: obtains the HystrixConcurrencyStrategy .
  • Line 13: initializes queueSize .
  • Lines 16 to 18: TODO 【2002】【metrics】
    • Line 17: calls HystrixConcurrencyStrategy#getThreadPool(...) to create the ThreadPoolExecutor .
  • Line 21: obtains the ThreadPoolExecutor .
  • Line 22: obtains the ThreadPoolExecutor 's queue.
  • Line 26: TODO 【2002】【metrics】

The #getExecutor() method is as follows:

public ThreadPoolExecutor getExecutor() {
    touchConfig();
    return threadPool;
}

  • It calls #touchConfig() to dynamically adjust the threadPool 's coreSize / maximumSize / keepAliveTime parameters. The original post links to that method.

The #getScheduler() / #getScheduler(Func0<Boolean>) methods are as follows:

public Scheduler getScheduler() {
    //by default, interrupt underlying threads on timeout
    return getScheduler(new Func0<Boolean>() {
        public Boolean call() {
            return true;
        }
    });
}

public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
    return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}

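The #isQueueSpaceAvailable() method is as follows:

public boolean isQueueSpaceAvailable() {
    if (queueSize <= 0) {
        // we don't have a queue so we won't look for space but instead
        // let the thread-pool reject or not
        return true;
    } else {
        return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
    }
}

  • Because the capacity of the thread pool's queue cannot be changed dynamically, this method relies on the HystrixThreadPoolProperties.queueSizeRejectionThreshold property, which can.
  • Note that the queueSize field determines the type of queue the thread pool uses.
    • When queueSize <= 0 , #isQueueSpaceAvailable() always returns true , because the pool uses a SynchronousQueue , which does not queue tasks. Once the number of tasks exceeds the pool's maximumPoolSize , new tasks are rejected by the pool itself.
    • When queueSize > 0 , #isQueueSpaceAvailable() returns true or false depending on the current queue length, because the pool uses a LinkedBlockingQueue , which supports a bounded amount of queueing whose capacity cannot be changed afterwards. The check in #isQueueSpaceAvailable() provides the dynamic limit instead. Also, set the initial queueSize relatively large; otherwise, even if queueSizeRejectionThreshold is configured above queueSize , tasks submitted to the thread pool will still be rejected.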
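
The interaction between the fixed queue capacity and the adjustable threshold is easy to get wrong, so here is a small JDK-only sketch of the queued case ( queueSize > 0 ). QueueThresholdSketch and its methods are hypothetical names; a volatile int field stands in for the dynamic queueSizeRejectionThreshold property.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Soft, adjustable rejection threshold in front of a fixed-capacity queue; not Hystrix code. */
final class QueueThresholdSketch {

    private final BlockingQueue<Runnable> queue; // hard capacity, fixed at construction
    private volatile int rejectionThreshold;     // soft limit, adjustable at runtime

    QueueThresholdSketch(int maxQueueSize, int rejectionThreshold) {
        this.queue = new LinkedBlockingQueue<>(maxQueueSize); // requires maxQueueSize > 0
        this.rejectionThreshold = rejectionThreshold;
    }

    void setRejectionThreshold(int rejectionThreshold) {
        this.rejectionThreshold = rejectionThreshold;
    }

    /** Same shape as the check in the article: current queue length versus the threshold. */
    boolean isQueueSpaceAvailable() {
        return queue.size() < rejectionThreshold;
    }

    /** A task is accepted only if it passes the soft threshold and the queue has capacity. */
    boolean tryEnqueue(Runnable task) {
        return isQueueSpaceAvailable() && queue.offer(task);
    }

    public static void main(String[] args) {
        // threshold above capacity: the hard capacity of 2 still rejects the third task
        QueueThresholdSketch small = new QueueThresholdSketch(2, 5);
        System.out.println(small.tryEnqueue(() -> { }));
        System.out.println(small.tryEnqueue(() -> { }));
        System.out.println(small.tryEnqueue(() -> { }));
    }
}
```

This is why the article recommends a generous initial queueSize: the threshold can only tighten the limit below the capacity chosen at startup, never raise it above.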

5.2 Factory

com.netflix.hystrix.HystrixThreadPool.Factory is the HystrixThreadPool factory class. It not only creates HystrixThreadPool instances but also manages them ( it acts as the HystrixThreadPool container ).

The threadPools field maintains the mapping of created HystrixThreadPool instances. The code is as follows:

final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();

  • The key is HystrixThreadPoolKey#name() ; each HystrixThreadPoolKey corresponds to one HystrixThreadPool object.

The #getInstance(...) method returns the HystrixThreadPool object. The code is as follows:

/* package */static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
    // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
    String key = threadPoolKey.name();

    // this should find it for all but the first time
    HystrixThreadPool previouslyCached = threadPools.get(key);
    if (previouslyCached != null) {
        return previouslyCached;
    }

    // if we get here this is the first time so we need to initialize
    synchronized (HystrixThreadPool.class) {
        if (!threadPools.containsKey(key)) {
            threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
        }
    }
    return threadPools.get(key);
}

  • It first looks up an existing HystrixThreadPool in threadPools by threadPoolKey . If none is found, it creates the corresponding HystrixThreadPool , adds it to threadPools , and returns it.
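
The same lookup-then-create pattern can be sketched with JDK types. PoolRegistrySketch, getInstance(String) and shutdownAll() are hypothetical names, and a fixed-size ExecutorService stands in for HystrixThreadPoolDefault.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** One pool per key, created lazily on first request; not Hystrix code. */
final class PoolRegistrySketch {

    private static final ConcurrentHashMap<String, ExecutorService> POOLS = new ConcurrentHashMap<>();

    static ExecutorService getInstance(String key) {
        // fast path: every call except the first for a key finds the cached pool
        ExecutorService cached = POOLS.get(key);
        if (cached != null) {
            return cached;
        }
        // slow path: the lock plus re-check ensures only one pool is created per key
        synchronized (PoolRegistrySketch.class) {
            if (!POOLS.containsKey(key)) {
                POOLS.put(key, Executors.newFixedThreadPool(2));
            }
        }
        return POOLS.get(key);
    }

    /** Shuts down every pool and clears the registry. */
    static void shutdownAll() {
        for (ExecutorService pool : POOLS.values()) {
            pool.shutdown();
        }
        POOLS.clear();
    }

    public static void main(String[] args) {
        System.out.println(getInstance("orders") == getInstance("orders"));
        System.out.println(getInstance("orders") == getInstance("payments"));
        shutdownAll();
    }
}
```

On Java 8 and later, ConcurrentHashMap#computeIfAbsent expresses the same create-once behaviour in a single call; the explicit lock is kept here only to mirror the shape of the code above.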

The #shutdown() / #shutdown(timeout, unit) methods are easy to follow; the original post links to their source.

5.3 Initialization

The AbstractCommand constructor initializes the command's threadPool field. The code is as follows:

protected final HystrixThreadPool threadPool;

protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
        HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
        HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
        HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {

    // ... other code omitted

    // initialize threadPoolKey
    this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
    // initialize threadPool
    this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);

    // ... other code omitted
}

  • It calls #initThreadPool(...) to obtain the HystrixThreadPool ; the original post links to that method.

6. HystrixScheduler

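Hystrix implements its own RxJava Scheduler . The overall class structure ( shown as a class diagram in the original post ) is as follows:

  • HystrixContextScheduler ( extends the RxJava Scheduler abstract class ) holds an actualScheduler field of type ThreadPoolScheduler ( which also extends the RxJava Scheduler abstract class ).
  • HystrixContextSchedulerWorker ( extends the RxJava Worker abstract class ) holds a worker field of type ThreadPoolWorker ( which also extends the RxJava Worker abstract class ).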

6.1 HystrixContextScheduler

The constructor is as follows:

public class HystrixContextScheduler extends Scheduler {

    private final HystrixConcurrencyStrategy concurrencyStrategy;
    private final Scheduler actualScheduler;
    private final HystrixThreadPool threadPool;

    // ... unrelated code omitted

    public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
        this.concurrencyStrategy = concurrencyStrategy;
        this.threadPool = threadPool;
        this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
    }
}

  • The actualScheduler field is of type ThreadPoolScheduler .

The #createWorker() method is as follows:

public Worker createWorker() {
    return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}

  • It uses actualScheduler to create a ThreadPoolWorker and passes it to HystrixContextSchedulerWorker .

6.2 HystrixContextSchedulerWorker

The constructor is as follows:

private class HystrixContextSchedulerWorker extends Worker {

    private final Worker worker;

    // ... unrelated code omitted

    private HystrixContextSchedulerWorker(Worker actualWorker) {
        this.worker = actualWorker;
    }
}

  • The worker field is of type ThreadPoolWorker .

The #schedule(Action0) method is as follows:

public Subscription schedule(Action0 action) {
    if (threadPool != null) {
        if (!threadPool.isQueueSpaceAvailable()) {
            throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
        }
    }
    return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
}

  • It calls HystrixThreadPool#isQueueSpaceAvailable() to check whether the thread pool queue still has room, and rejects the command if it does not. This check is the practical purpose of HystrixContextScheduler .
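
The wrapper above is essentially a guard in front of a delegate. A minimal JDK-only sketch of that shape follows; GuardedExecutorSketch is a hypothetical name, java.util.concurrent.Executor stands in for the RxJava Worker , and a BooleanSupplier stands in for the #isQueueSpaceAvailable() call.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.BooleanSupplier;

/** Rejects work up front when the guard says no, otherwise delegates; not Hystrix code. */
final class GuardedExecutorSketch implements Executor {

    private final Executor delegate;
    private final BooleanSupplier queueSpaceAvailable;

    GuardedExecutorSketch(Executor delegate, BooleanSupplier queueSpaceAvailable) {
        this.delegate = delegate;
        this.queueSpaceAvailable = queueSpaceAvailable;
    }

    @Override
    public void execute(Runnable command) {
        // check the guard before the task ever reaches the underlying executor
        if (!queueSpaceAvailable.getAsBoolean()) {
            throw new RejectedExecutionException("Rejected because the queue is at its rejection threshold.");
        }
        delegate.execute(command);
    }

    public static void main(String[] args) {
        // Runnable::run executes the task on the calling thread, which keeps the demo simple
        Executor open = new GuardedExecutorSketch(Runnable::run, () -> true);
        open.execute(() -> System.out.println("accepted"));

        Executor closed = new GuardedExecutorSketch(Runnable::run, () -> false);
        try {
            closed.execute(() -> System.out.println("never runs"));
        } catch (RejectedExecutionException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```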

The #unsubscribe() / #isUnsubscribed() methods delegate to worker ; the original post links to their source.

6.3 ThreadPoolScheduler

ThreadPoolScheduler is fairly simple; the original post links to its source.

6.4 ThreadPoolWorker

The constructor is as follows:

private static class ThreadPoolWorker extends Worker {

    private final HystrixThreadPool threadPool;
    private final CompositeSubscription subscription = new CompositeSubscription();
    private final Func0<Boolean> shouldInterruptThread;

    // ... unrelated code omitted

    public ThreadPoolWorker(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
        this.threadPool = threadPool;
        this.shouldInterruptThread = shouldInterruptThread;
    }
}

  • The subscription field holds the subscription state.

The #schedule(Action0) method is as follows:

 1: @Override
 2: public Subscription schedule(final Action0 action) {
 3:     // already unsubscribed: return
 4:     if (subscription.isUnsubscribed()) {
 5:         // don't schedule, we are unsubscribed
 6:         return Subscriptions.unsubscribed();
 7:     }
 8:
 9:     // create the ScheduledAction
10:     // This is internal RxJava API but it is too useful.
11:     ScheduledAction sa = new ScheduledAction(action);
12:
13:     // add to the subscription
14:     subscription.add(sa);
15:     sa.addParent(subscription);
16:
17:     // submit the task
18:     ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
19:     FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
20:     sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
21:
22:     return sa;
23: }

  • Lines 4 to 7: if the worker is already unsubscribed, return without scheduling.
  • Line 11: creates the ScheduledAction . Covered in detail in TODO 【2013】【ScheduledAction】.
  • Lines 14 to 15: adds it to the subscription ( subscription ).
  • Lines 18 to 20: submits the task to threadPool , then creates a FutureCompleterWithConfigurableInterrupt and adds it to the subscription ( sa ).
  • Line 22: returns the subscription ( sa ). The overall relationship ( a diagram in the original post ): subscription contains sa , and sa contains the FutureCompleterWithConfigurableInterrupt .

The #unsubscribe() / #isUnsubscribed() methods delegate to subscription ; the original post links to their source.

6.5 FutureCompleterWithConfigurableInterrupt

com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler.FutureCompleterWithConfigurableInterrupt is similar to rx.internal.schedulers.ScheduledAction.FutureCompleter . On top of that, it makes configurable whether FutureTask#cancel(Boolean) may interrupt a running task ( mayInterruptIfRunning ).

The constructor is as follows:

private static class FutureCompleterWithConfigurableInterrupt implements Subscription {
    private final FutureTask<?> f;
    private final Func0<Boolean> shouldInterruptThread;
    private final ThreadPoolExecutor executor;

    // ... unrelated code omitted

    private FutureCompleterWithConfigurableInterrupt(FutureTask<?> f, Func0<Boolean> shouldInterruptThread, ThreadPoolExecutor executor) {
        this.f = f;
        this.shouldInterruptThread = shouldInterruptThread;
        this.executor = executor;
    }
}

When command execution times out, or the command is actively cancelled, #unsubscribe() is called to cancel execution.

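The #unsubscribe() method is as follows:

public void unsubscribe() {
    // remove the task from the thread pool
    executor.remove(f);
    // force cancellation or not, depending on shouldInterruptThread
    if (shouldInterruptThread.call()) {
        f.cancel(true);
    } else {
        f.cancel(false);
    }
}

  • The shouldInterruptThread function decides whether cancellation is forced ( that is, whether the running thread is interrupted ).
  • The function passed as shouldInterruptThread is implemented as follows:

    subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
        public Boolean call() {
            return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
        }
    }));

  • When executionIsolationThreadInterruptOnTimeout = true , the executing thread may be interrupted on timeout. If the command has actually timed out, cancellation is forced.
  • When you use the Future returned by HystrixCommand.queue() , you can cancel the command with Future#cancel(Boolean) . As the shouldInterruptThread function shows, if the timeout condition does not hold at that point, the cancellation is not forced. In that case, when executionIsolationThreadInterruptOnFutureCancel = true and Future#cancel(Boolean) is called with mayInterruptIfRunning = true , the command execution is forcibly cancelled.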
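
The difference between forced and non-forced cancellation can be observed with a plain FutureTask . The sketch below is an illustration under simplified assumptions, not Hystrix code: ConfigurableCancelSketch, cancel(...) and workerWasInterrupted(...) are hypothetical names, and a BooleanSupplier plays the role of shouldInterruptThread .

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

/** Cancels a task with or without interrupting its thread, decided at cancel time. */
final class ConfigurableCancelSketch {

    /** Removes the task from the queue if it has not started, then cancels it. */
    static void cancel(ThreadPoolExecutor executor, FutureTask<?> task, BooleanSupplier shouldInterrupt) {
        executor.remove(task);
        task.cancel(shouldInterrupt.getAsBoolean());
    }

    /** Starts a sleeping task, cancels it, and reports whether the worker thread saw an interrupt. */
    static boolean workerWasInterrupted(boolean interrupt) {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(1, 1, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        FutureTask<Void> task = new FutureTask<Void>(() -> {
            started.countDown();
            try {
                Thread.sleep(300); // stands in for a slow dependency call
            } catch (InterruptedException e) {
                interrupted.set(true);
            } finally {
                finished.countDown();
            }
            return null;
        });
        try {
            executor.execute(task);
            started.await();   // make sure the task is running before cancelling
            cancel(executor, task, () -> interrupt);
            finished.await();  // wait until the task body has really ended
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("forced cancel interrupted the worker: " + workerWasInterrupted(true));
        System.out.println("non-forced cancel interrupted the worker: " + workerWasInterrupted(false));
    }
}
```

With a non-forced cancel the Future reports cancellation right away, but the task body keeps running to completion on its thread, which is why the interrupt setting matters for freeing pool threads.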

666. Easter egg


While writing this I worked out a few things about RxJava along the way, which felt great.

Onward ~ no stopping this weekend.

