3

Disruptor-源码解读 - 王谷雨

 1 year ago
source link: https://www.cnblogs.com/konghuanxi/p/17324988.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Disruptor-源码解读

Disruptor的高性能,是多种技术结合以及本身架构的结果。本文主要讲源码,涉及到的相关知识点需要读者自行去了解,以下列出:

  • 锁和CAS
  • 伪共享和缓存行
  • volatile和内存屏障

此节结合demo来看更容易理解:传送门

下图来自官方文档

官方原图有点乱,我翻译一下

在讲原理前,先了解 Disruptor 定义的术语

  • Event

    存放数据的单位,对应 demo 中的 LongEvent

  • Ring Buffer

    环形数据缓冲区:这是一个首尾相接的环,用于存放 Event ,用于生产者往其存入数据和消费者从其拉取数据

  • Sequence

    序列:用于跟踪进度(生产进度、消费进度)

  • Sequencer

    Disruptor的核心,用于在生产者和消费者之间传递数据,有单生产者和多生产者两种实现。

  • Sequence Barrier

    序列屏障,消费者之间的依赖关系就靠序列屏障实现

  • Wait Strategy

  • 等待策略,消费者等待生产者将发布的策略

  • Event Processor

    事件处理器,循环从 RingBuffer 获取 Event 并执行 EventHandler。

  • Event Handler

    事件处理程序,也就是消费者

  • Producer

Ring Buffer

环形数据缓冲区(RingBuffer),逻辑上是首尾相接的环,在代码中用数组来表示Object[]。Disruptor生产者发布分两步

  • 步骤一:申请写入 n 个元素,如果可以写入,这返回最大序列号
  • 步骤二:根据序列号去 RingBuffer 中获取 Event,修改并发布
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();// 获取下一个可用位置的下标(步骤1)long sequence = ringBuffer.next();try { // 返回可用位置的元素 LongEvent event = ringBuffer.get(sequence); // 设置该位置元素的值 event.set(l);} finally { // 发布 ringBuffer.publish(sequence);}

这两个步骤由 Sequencer 完成,分为单生产者和多生产者实现

Sequencer

如果申请 2 个元素,则如下图所示(圆表示 RingBuffer)

// 一般不会有以下写法,这里为了讲解源码才使用next(2)// 向RingBuffer申请两个元素long sequence = ringBuffer.next(2);for (long i = sequence-1; i <= sequence; i++) { try { // 返回可用位置的元素 LongEvent event = ringBuffer.get(i); // 设置该位置元素的值 event.set(1); } finally { ringBuffer.publish(i); }}

next 申请成功的序列,cursor 消费者最大可用序列,gatingSequence 表示能申请的最大序列号。红色表示待发布,绿色表示已发布。申请相当于占位置,发布需要一个一个按顺序发布

如果 RingBuffer 满了呢,在上图步骤二的基础上,生产者发布了3个元素,消费者消费1个。此时生产者再申请 2个元素,就会变成下图所示

只剩下 1 个空间,但是要申请 2个元素,此时程序会自旋等待空间足够。

接下来结合代码看,单生产者的 Sequencer 实现为 SingleProducerSequencer,先看看构造方法

abstract class SingleProducerSequencerPad extends AbstractSequencer{ protected long p1, p2, p3, p4, p5, p6, p7; SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy) { super(bufferSize, waitStrategy); }} abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad{ SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy) { super(bufferSize, waitStrategy); } long nextValue = Sequence.INITIAL_VALUE; long cachedValue = Sequence.INITIAL_VALUE;} public final class SingleProducerSequencer extends SingleProducerSequencerFields{ protected long p1, p2, p3, p4, p5, p6, p7; public SingleProducerSequencer(int bufferSize, WaitStrategy waitStrategy) { super(bufferSize, waitStrategy); }}

这是 Disruptor 高性能的技巧之一,SingleProducerSequencer 需要的类变量只有 nextValue 和cachedValue,p1 ~ p7 的作用是填充缓存行,这能保证 nextValue 和cachedValue 必定在独立的缓存行,我们可以用ClassLayout打印内存布局看看

接下来看如何获取序列号(也就是步骤一)

// 调用路径// RingBuffer#next()// SingleProducerSequencer#next()public long next(int n){ if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } long nextValue = this.nextValue; //生产者当前序号值+期望获取的序号数量后达到的序号值 long nextSequence = nextValue + n; //减掉RingBuffer的总的buffer值,用于判断是否出现‘覆盖’ long wrapPoint = nextSequence - bufferSize; //从后面代码分析可得:cachedValue就是缓存的消费者中最小序号值,他不是当前最新的‘消费者中最小序号值’,而是上次程序进入到下面的if判定代码段时,被赋值的当时的‘消费者中最小序号值’ //这样做的好处在于:在判定是否出现覆盖的时候,不用每次都调用getMininumSequence计算‘消费者中的最小序号值’,从而节约开销。只要确保当生产者的节奏大于了缓存的cachedGateingSequence一个bufferSize时,从新获取一下 getMinimumSequence()即可。 long cachedGatingSequence = this.cachedValue; //(wrapPoint > cachedGatingSequence) : 当生产者已经超过上一次缓存的‘消费者中最小序号值’(cachedGatingSequence)一个‘Ring’大小(bufferSize),需要重新获取cachedGatingSequence,避免当生产者一直在生产,但是消费者不再消费的情况下,出现‘覆盖’ //(cachedGatingSequence > nextValue) : https://github.com/LMAX-Exchange/disruptor/issues/76 // 这里判断就是生产者生产的填满BingBUffer,需要等待消费者消费 if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) { cursor.setVolatile(nextValue); // StoreLoad fence //gatingSequences就是消费者队列末尾的序列,也就是消费者消费到哪里了 //实际上就是获得处理的队尾,如果队尾是current的话,说明所有的消费者都执行完成任务在等待新的事件了 long minSequence; while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) { // 等待1纳秒 LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin? } this.cachedValue = minSequence; } this.nextValue = nextSequence; return nextSequence;} public void publish(long sequence){ // 更新序列号 cursor.set(sequence); // 等待策略的唤醒 waitStrategy.signalAllWhenBlocking();}

要解释的都在注释里了,gatingSequences 是消费者队列末尾的序列,对应着就是下图中的 ApplicationConsumer 的 Sequence

看完单生产者版,接下来看多生产者的实现。因为是多生产者,需要考虑并发的情况。

如果有A、B两个消费者都来申请 2 个元素

cursor 申请成功的序列,HPS 消费者最大可用序列,gatingSequence 表示能申请的最大序列号。红色表示待发布,绿色表示已发布。HPS 是我自己编的缩写,表示 getHighestPublishedSequence 方法的返回值

如图所示,只要申请成功,就移动 cursor 的位置。RingBuffer 并没有记录发布情况(图中的红绿颜色),这个发布情况由 MultiProducerSequenceravailableBuffer 来维护。

下面看代码

public final class MultiProducerSequencer extends AbstractSequencer{ // 缓存的消费者中最小序号值,相当于SingleProducerSequencerFields的cachedValue private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); // 标记元素是否可用 private final int[] availableBuffer; public long next(int n) { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } long current; long next; do { current = cursor.get(); next = current + n; //减掉RingBuffer的总的buffer值,用于判断是否出现‘覆盖’ long wrapPoint = next - bufferSize; //从后面代码分析可得:cachedValue就是缓存的消费者中最小序号值,他不是当前最新的‘消费者中最小序号值’,而是上次程序进入到下面的if判定代码段时,被赋值的当时的‘消费者中最小序号值’ //这样做的好处在于:在判定是否出现覆盖的时候,不用每次都调用getMininumSequence计算‘消费者中的最小序号值’,从而节约开销。只要确保当生产者的节奏大于了缓存的cachedGateingSequence一个bufferSize时,从新获取一下 getMinimumSequence()即可。 long cachedGatingSequence = gatingSequenceCache.get(); //(wrapPoint > cachedGatingSequence) : 当生产者已经超过上一次缓存的‘消费者中最小序号值’(cachedGatingSequence)一个‘Ring’大小(bufferSize),需要重新获取cachedGatingSequence,避免当生产者一直在生产,但是消费者不再消费的情况下,出现‘覆盖’ //(cachedGatingSequence > nextValue) : https://github.com/LMAX-Exchange/disruptor/issues/76 // 这里判断就是生产者生产的填满BingBUffer,需要等待消费者消费 if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) { long gatingSequence = Util.getMinimumSequence(gatingSequences, current); if (wrapPoint > gatingSequence) { LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy? continue; } gatingSequenceCache.set(gatingSequence); } // 使用cas保证只有一个生产者能拿到next else if (cursor.compareAndSet(current, next)) { break; } } while (true); return next; }......}

MultiProducerSequencerSingleProducerSequencer的 next()方法逻辑大致一样,只是多了CAS的步骤来保证并发的正确性。接着看发布方法

public void publish(final long sequence){ // 记录发布情况 setAvailable(sequence); // 等待策略的唤醒 waitStrategy.signalAllWhenBlocking();} private void setAvailable(final long sequence){ // calculateIndex(sequence):获取序号 // calculateAvailabilityFlag(sequence):RingBuffer的圈数 setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));} private void setAvailableBufferValue(int index, int flag){ long bufferAddress = (index * SCALE) + BASE; UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag); // 上面相当于 availableBuffer[index] = flag 的高性能版}

记录发布情况,其实相当于 availableBuffer[sequence] = 圈数,前面说了,availableBuffer是用来标记元素是否可用的,如果消费者的圈数 ≠ availableBuffer中的圈数,则表示元素不可用

public boolean isAvailable(long sequence){ int index = calculateIndex(sequence); // 计算圈数 int flag = calculateAvailabilityFlag(sequence); long bufferAddress = (index * SCALE) + BASE; // UNSAFE.getIntVolatile(availableBuffer, bufferAddress):相当于availableBuffer[sequence] 的高性能版 return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;} private int calculateAvailabilityFlag(final long sequence){ // 相当于 sequence % bufferSize ,但是位操作更快 return (int) (sequence >>> indexShift);}

isAvailable() 方法判断元素是否可用,此方法的调用堆栈看完消费者就清楚了。

本小节介绍两个方面,一是 Disruptor 的消费者如何实现依赖关系的,二是消费者如何拉取消息并消费

消费者的依赖关系实现

我们看回这张图,每个消费者前都有一个 SequenceBarrier ,这就是消费者之间能实现依赖的关键。每个消费者都有一个 Sequence,表示自身消费的进度,如图中,ApplicationConsumer 的 SequenceBarrier 就持有 ReplicaionConsumer 和 JournalConsumer 的 Sequence,这样就能控制 ApplicationConsumer 的消费进度不超过其依赖的消费者。

下面看源码,这是 disruptor 配置消费者的代码。

EventHandler journalConsumer = xxx;EventHandler replicaionConsumer = xxx;EventHandler applicationConsumer = xxx; disruptor.handleEventsWith(journalConsumer, replicaionConsumer) .then(applicationConsumer); // 下面两行等同于上面这行// disruptor.handleEventsWith(journalConsumer, replicaionConsumer);// disruptor.after(journalConsumer, replicaionConsumer).then(applicationConsumer);

先看ReplicaionConsumer 和 JournalConsumer 的配置 disruptor.handleEventsWith(journalConsumer, replicaionConsumer)

/** 代码都在Disruptor类 **/ public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers){ // 没有依赖的消费者就创建新的Sequence return createEventProcessors(new Sequence[0], handlers);} /** * 创建消费者 * @param barrierSequences 当前消费者组的屏障序列数组,如果当前消费者组是第一组,则取一个空的序列数组;否则,barrierSequences就是上一组消费者组的序列数组 * @param eventHandlers 事件消费逻辑的EventHandler数组 */EventHandlerGroup<T> createEventProcessors( final Sequence[] barrierSequences, final EventHandler<? super T>[] eventHandlers){ checkNotStarted(); // 对应此事件处理器组的序列组 final Sequence[] processorSequences = new Sequence[eventHandlers.length]; final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences); for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) { final EventHandler<? super T> eventHandler = eventHandlers[i]; // 创建消费者,注意这里传入了SequenceBarrier final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<>(ringBuffer, barrier, eventHandler); if (exceptionHandler != null) { batchEventProcessor.setExceptionHandler(exceptionHandler); } consumerRepository.add(batchEventProcessor, eventHandler, barrier); processorSequences[i] = batchEventProcessor.getSequence(); } // 每次添加完事件处理器后,更新门控序列,以便后续调用链的添加 // 所谓门控,就是RingBuffer要知道在消费链末尾的那组消费者(也是最慢的)的进度,避免消息未消费就被写入覆盖 updateGatingSequencesForNextInChain(barrierSequences, processorSequences); return new EventHandlerGroup<>(this, consumerRepository, processorSequences);}

createEventProcessors() 方法主要做了3件事,创建消费者、保存eventHandler和消费者的映射关系、更新 gatingSequences

  • EventProcessor 是消费者
  • SequenceBarrier 是消费者屏障,保证了消费者的依赖关系
  • consumerRepository 保存了eventHandler和消费者的映射关系

gatingSequences 我们在前面说过,生产者通过 gatingSequences 知道消费者的进度,防止生产过快导致消息被覆盖,更新操作在 updateGatingSequencesForNextInChain() 方法中

// 为消费链下一组消费者,更新门控序列// barrierSequences是上一组事件处理器组的序列(如果本次是第一次,则为空数组),本组不能超过上组序列值// processorSequences是本次要设置的事件处理器组的序列private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences){ if (processorSequences.length > 0) { // 将本组序列添加到Sequencer中的gatingSequences中 ringBuffer.addGatingSequences(processorSequences); // 将上组消费者的序列从gatingSequences中移除 for (final Sequence barrierSequence : barrierSequences) { ringBuffer.removeGatingSequence(barrierSequence); } // 取消标记上一组消费者为消费链末端 consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences); }}

让我们把视线再回到消费者的设置方法

disruptor.handleEventsWith(journalConsumer, replicaionConsumer) .then(applicationConsumer);

journalConsumer 和 replicaionConsumer 已经设置了,接下来是 applicationConsumer

/** 代码在EventHandlerGroup类 **/ public final EventHandlerGroup<T> then(final EventHandler<? super T>... handlers){ return handleEventsWith(handlers);} public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers){ return disruptor.createEventProcessors(sequences, handlers);}

可以看到,设置 applicationConsumer 最终调用的也是 createEventProcessors() 方法,区别就在于 createEventProcessors() 方法的第一个参数,这里的 sequences 就是 journalConsumer 和 replicaionConsumer 这两个消费者的 Sequence

消费者的消费逻辑

消费者的主要消费逻辑在 EventProcessor#run()方法中,下面以BatchEventProcessor举例

// BatchEventProcessor#run()// BatchEventProcessor#processEvents()private void processEvents(){ T event = null; long nextSequence = sequence.get() + 1L; while (true) { try { // 获取最大可用序列 final long availableSequence = sequenceBarrier.waitFor(nextSequence); ... // 执行消费逻辑 while (nextSequence <= availableSequence) { // dataProvider就是RingBuffer event = dataProvider.get(nextSequence); eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); nextSequence++; } sequence.set(availableSequence); } catch () { // 异常处理 } }}

方法简洁明了,在死循环中通过 sequenceBarrier 获取最大可用序列,然后从 RingBuffer 中获取 Event 并调用 EventHandler 进行消费。重点在 sequenceBarrier.waitFor(nextSequence); 中

public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException{ checkAlert(); // 获取可用的序列,这里返回的是Sequencer#next方法设置成功的可用下标,不是Sequencer#publish // cursorSequence:生产者的最大可用序列 // dependentSequence:依赖的消费者的最大可用序列 long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this); if (availableSequence < sequence) { return availableSequence; } // 获取最大的已发布成功的序号(对于发布是否成功的校验在此方法中) return sequencer.getHighestPublishedSequence(sequence, availableSequence);}

熟悉的 getHighestPublishedSequence() 方法,忘了就回去看看生产者小节。waitStrategy.waitFor() 对应着图片中的 waitFor() 。

消费者的启动

前面讲了消费者的处理逻辑,但是 BatchEventProcessor#run() 是如何被调用的呢,关键在于disruptor.start();

// Disruptor#start()public RingBuffer<T> start(){ checkOnlyStartedOnce(); for (final ConsumerInfo consumerInfo : consumerRepository) { consumerInfo.start(executor); } return ringBuffer;} class EventProcessorInfo<T> implements ConsumerInfo{ public void start(final Executor executor) { // eventprocessor就是消费者 executor.execute(eventprocessor); }}

还记得 consumerRepository吗,没有就往上翻翻设置消费者那里的 disruptor.handleEventsWith() 方法。

所以启动过程就是
disruptor#start() → ConsumerInfo#start() → Executor#execute() → EventProcessor#run()

课后作业:Disruptor 的消费者使用了多少线程?

本文讲了 Disruptor 大体逻辑和源码,当然其高性能的秘诀不止文中描述的那些。还有不同的等待策略,Sequence 中使用Unsafe而不是JDK中的 Atomic 原子类等等。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK