17

Disruptor核心源码分析 - 简书

 4 years ago
source link: https://www.jianshu.com/p/b5aa623654ff?
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
0.0812020.02.21 12:45:47字数 3,738阅读 687

Disruptor核心源码分析

说来惭愧,Log4j2的异步日志已经用了将近2年时间了。但是每次想看Disruptor源码的时候,总是没能坚持下去。这次通过一次生产环境的故障,坚定了看源码的决心。

在阅读这篇文章之前,需要你具备一些对Disruptor的基本了解。如果你对它
还一无所知,希望你先通过下面的文章来入个门。

http://ifeve.com/dissecting-disruptor-whats-so-special/
http://ifeve.com/dissecting_the_disruptor_how_doi_read_from_the_ring_buffer/
http://ifeve.com/disruptor-writing-ringbuffer/

上面几篇文章对Disruptor总体流程的讲解还是比较清楚的,如果你看完仍然不是特别理解,没关系,也可以继续往下看,毕竟talk is cheap,下面会借助code把Disruptor的工作原理阐述清楚。本篇文章不会涉及”为什么Disruptor这么快“这个主题,而把重点放在理解它的工作流程以及熟悉源代码上。

最简单的Demo

我们先通过一个最简单的Demo来感受一下Disruptor的工作流程

// 事件数据结构 StringEvent
@Data
@NoArgsConstructor
public class StringEvent {
    private String value;
}
    public static void main(String[] args) throws InterruptedException {
        Disruptor<StringEvent> disruptor = new Disruptor<>(StringEvent::new, 1024,
                DaemonThreadFactory.INSTANCE);

        disruptor.handleEventsWith(
                (EventHandler<StringEvent>) (event, sequence, endOfBatch) -> System.out
                        .println(event));

        disruptor.start();

        disruptor.publishEvent((event, sequence) -> event.setValue("changed"));
        
        // sleep一下 让消费者可以执行到 因为消费线程是守护线程
        Thread.sleep(1000);
    }

寥寥几行代码,就展示了一个完整的过程。我们来看看每一步具体做了什么:

第一步——创建Disruptor

创建Disruptor,Demo中采用的是参数较少的构造方法,实际上完整的参数列表还包括producerTypewaitStrategy

    public Disruptor(
            final EventFactory<T> eventFactory,
            final int ringBufferSize,
            final ThreadFactory threadFactory,
            final ProducerType producerType,
            final WaitStrategy waitStrategy)
    {
        this(
            RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
            new BasicExecutor(threadFactory));
    }
    
    private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor)
    {
        this.ringBuffer = ringBuffer;
        this.executor = executor;
    }
  • eventFactory表示事件的构造器
  • ringBufferSize表示RingBuffer的长度(容量)
  • threadFactory表示消费线程的创建工厂
  • producerType表示是单生产者模式还是多生产者模式(默认是MULTI
  • waitStrategy表示当RingBuffer中没有可消费的Event时消费者的等待策略(默认是BlockingWaitStrategy

可以看到,通过上面5个参数构造出了一个RingBuffer和一个Executor,而这两个组件构成了一个Disruptor。这里的RingBuffer除了存储事件的职能(DataProvider)还承担着申请sequence和publish event的职能。Executor作为消费者线程池,主要是运行消费逻辑的。因此可以说,Disruptor串联起了生产者、消费者以及RingBuffer

创建RingBuffer

RingBuffer是个重点,因为它不止是存储,还干了很多活。所谓能者多劳,也更值得我们研究。我们先看下创建它的静态方法:

    public static <E> RingBuffer<E> create(
        ProducerType producerType,
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy)
    {
        switch (producerType)
        {
            case SINGLE:
                return createSingleProducer(factory, bufferSize, waitStrategy);
            case MULTI:
                return createMultiProducer(factory, bufferSize, waitStrategy);
            default:
                throw new IllegalStateException(producerType.toString());
        }
    }

根据生产者类型的不同,存在两种类型的RingBuffer:单生产者类型和多生产者类型。为了更容易理解,我们这里先看Single类型的,也就是单生产者类型的RingBuffer

    // class RingBuffer
    public static <E> RingBuffer<E> createSingleProducer(
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy)
    {
        SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);

        return new RingBuffer<E>(factory, sequencer);
    }

可以看到,这里通过一个EventFactory和一个SingleProducerSequencer构造了一个RingBuffer。前者是用来创建事件对象的,而后者可以理解成RingBuffer的"帮手",RingBuffer委托Sequencer来处理一些非存储类的工作(比如申请sequence,维护sequence进度,发布事件等)。

我们接着跟进去看看RingBuffer的构造函数:

    // class RingBuffer
    RingBuffer(
        EventFactory<E> eventFactory,
        Sequencer sequencer)
    {
        super(eventFactory, sequencer);
    }
    
    // class RingBufferFields
    RingBufferFields(
        EventFactory<E> eventFactory,
        Sequencer sequencer)
    {
        // “帮手”,主要用来处理sequence申请、维护以及发布等工作
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();

        if (bufferSize < 1)
        {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1)
        {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }

        // indexMask主要是为了使用位运算取模的,很多源码里都能看到这类优化
        this.indexMask = bufferSize - 1;
        // 可以看到这个数组除了正常的size之外还有填充的元素,这个是为了解决false sharing的,本篇文章暂不展开
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        // 预先填充数组元素,这对垃圾回收很优化,后续发布事件等操作都不需要创建对象,而只需要即可
        fill(eventFactory);
    }
    
    // class RingBufferFields
    private void fill(EventFactory<E> eventFactory)
    {
        for (int i = 0; i < bufferSize; i++)
        {
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }

RingBuffer的构造函数还是比较清晰的,跟着上面的注释应该就可以理解。除了Buffer_PAD,这个我们留到后续文章中再去详细的讲解。

创建BasicExecutor

BasicExecutor实现了java.util.concurrent.Executor接口,通过单参数ThreadFactory函数构造:

public class BasicExecutor implements Executor{

    public BasicExecutor(ThreadFactory factory)
    {
        this.factory = factory;
    }
}

具体在哪里使用,我们后面会看到

第二步——注册事件处理逻辑

disruptor.handleEventsWith(
                (EventHandler<StringEvent>) (event, sequence, endOfBatch) -> System.out
                        .println(event));

代码非常容易理解,注册了一个事件处理的回调,并且可以注册多个,其中回调里有三个参数:

  • event表示消费到的本次事件的主体,在例子里也就是StringEvent
  • sequence表示消费到的本次事件对应的sequence
  • endOfBatch表示消费到的本次事件是否是这个批次中的最后一个

由于默认的消费处理器(BatchEventProcessor)是批量来处理事件的,所以会有批次的概念。怎么样算一个批次呢,这个后面讲BatchEveentProessor的时候会讲到。DEMO里的消费逻辑很简单,打印一下event就完事。下面看看handleEventsWith的源代码:

    public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
    {
        // 注意,第一个参数恒为一个空数组
        return createEventProcessors(new Sequence[0], handlers);
    }

createEventProcessors方法的第一个入参叫作barrierSequences,是给存在依赖关系的消费者用的。由于走Disruptor实例调用handleEventsWith都是像上面一样传的是空数组,为了便于理解,可以先将它当成恒为空数组。

    EventHandlerGroup<T> createEventProcessors(
        final Sequence[] barrierSequences,
        final EventHandler<? super T>[] eventHandlers)
    {
        checkNotStarted();

        // 用来保存每个消费者的消费进度
        final Sequence[] processorSequences = new Sequence[eventHandlers.length];
        // SequenceBarrier主要是用来设置消费依赖的[详解1]
        final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

        for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
        {
            final EventHandler<? super T> eventHandler = eventHandlers[i];

            // 可以看到每个eventHandler会被封装成BatchEventProcessor,看名字就知道是批量处理的了吧
            final BatchEventProcessor<T> batchEventProcessor =
                new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);

            // 设置异常处理器
            if (exceptionHandler != null)
            {
                batchEventProcessor.setExceptionHandler(exceptionHandler);
            }

            // 注册到consumerRepository[详解2]
            consumerRepository.add(batchEventProcessor, eventHandler, barrier);
            // 每一个BatchEventProcessor的消费进度
            processorSequences[i] = batchEventProcessor.getSequence();
        }

        // 更新一些重要的东西[详解3]
        updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

        // 返回一个EventHandlerGroup,这个主要是为了DSL服务的,可以先不关心,可以看到DEMO中我们也没有用到这个返回值
        return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
    }

上面采用了注释来解释代码含义,我觉得这种形式可能更有助于在看代码的过程中理解。不过注释也有局限性,比如上面写了详解的几处,这里我会详细再跟进下源代码:

详解1——SequenceBarrier

SequenceBarrier主要是设置消费依赖的。比如某个消费者必须等它依赖的消费者消费完某个消息之后才可以消费该消息。当然此处是从Disruptor上直接创建消费组,sequencesToTrack都为空数组,所以只依赖于RingBuffer上的cursorSequence(也就是只要RingBuffer上写(publish)到哪了,那么我就能消费到哪)

下面的代码展示了通过RingBuffer创建SequenceBarrier的链路,发现最终创建的是ProcessingSequenceBarrier。并且在这条链路上,我们前面假定的sequencesToTrack(也就是dependentSequences)为空数组。那么根据上面的构造函数得出dependentSequence = cursorSequence = cursor

    // class RingBuffer
    public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
    {
        return sequencer.newBarrier(sequencesToTrack);
    }
    
    // class AbstractSequencer
    public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
    {
        return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
    }
    
    // class ProcessingSequenceBarrier
    public ProcessingSequenceBarrier(
        final Sequencer sequencer,
        final WaitStrategy waitStrategy,
        final Sequence cursorSequence,
        final Sequence[] dependentSequences)
    {
        this.sequencer = sequencer;
        this.waitStrategy = waitStrategy;
        this.cursorSequence = cursorSequence;
        if (0 == dependentSequences.length)
        {
            dependentSequence = cursorSequence;
        }
        else
        {
            dependentSequence = new FixedSequenceGroup(dependentSequences);
        }
    }

这个cursor是什么呢?首先它是Sequencer的成员变量。而Sequencer有两种:SingleProducerSequencerMultiProducerSequencer。对于SingleProducerSequencer来说,cursor表示的是RingBuffer上当前已发布的最大sequence,而对于MultiProducerSequencer来说,cursor表示的是RingBuffer上当前已申请的最大sequence。此处先有个概念即可,下面讲完生产逻辑之后会详细描述

详解2——ConsumerRepository

    // class ConsumerRepository
    public void add(
        final EventProcessor eventprocessor,
        final EventHandler<? super T> handler,
        final SequenceBarrier barrier)
    {
        final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<T>(eventprocessor, handler, barrier);
        eventProcessorInfoByEventHandler.put(handler, consumerInfo);
        eventProcessorInfoBySequence.put(eventprocessor.getSequence(), consumerInfo);
        consumerInfos.add(consumerInfo);
    }

无论从类名还是方法体,都可以看出,这个对象主要是用来存储消费者信息的,有两个维度的Map。具体是哪里用,我们用到的时候再说好了~

详解3——一些更新

    private void updateGatingSequencesForNextInChain(Sequence[] barrierSequences, Sequence[] processorSequences)
    {
        if (processorSequences.length > 0)
        {
            ringBuffer.addGatingSequences(processorSequences);
            for (final Sequence barrierSequence : barrierSequences)
            {
                ringBuffer.removeGatingSequence(barrierSequence);
            }
            consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
        }
    }

首先,我们要搞清楚这两个入参的意义:

  • barrierSequences:依赖的消费进度
  • processorSequences:新进消费者的进度

其次,还要弄明白一个问题:在向RingBuffer写入数据的时候,如何判定RingBuffer已满(这个应该在前面入门那几篇文章里要掌握的)?通过看最慢的消费者的消费进度是不是已经被生产者拉了一圈了(类似1000米跑步的套圈)。

理解了上面两个问题之后,再来看代码总共做的三个事情:

  1. 把新进消费者的消费进度加入到【所有消费者的消费进度数组】中
  2. 如果说这个新进消费者是依赖了其他的消费者的,那么把其他的消费者从【所有消费者的消费进度数组】中移除。这里为什么要移除呢?因为【所有消费者的消费进度数组】主要是用来获取最慢的进度的。那么被依赖的可以不用考虑,因为它不可能比依赖它的慢。并且让这个数组足够小,可以提升计算最慢进度的性能。
  3. 把被依赖的消费者的endOfChain属性设置成false。这个endOfChain是用来干嘛的呢?其实主要是Disruptor在shutdown的时候需要判定是否所有消费者都已经消费完了(如果依赖了别人的消费者都消费完了,那么整条链路上一定都消费完了)。

第三步——启动Disruptor

    public RingBuffer<T> start()
    {
        checkOnlyStartedOnce();
        for (final ConsumerInfo consumerInfo : consumerRepository)
        {
            consumerInfo.start(executor);
        }

        return ringBuffer;
    }

这个consumerRepository是不是很熟悉?这是第二步详解二里ConsumerInfo注册的地方。可以看到启动Disruptor其实就是在启动消费线程:

    // class EventProcessorInfo
    public void start(final Executor executor)
    {
        // 这里对应的是BatchEventProcessor
        executor.execute(eventprocessor);
    }
    
    // class BasicExecutor
    public void execute(Runnable command)
    {
        final Thread thread = factory.newThread(command);
        if (null == thread)
        {
            throw new RuntimeException("Failed to create thread to run: " + command);
        }

        thread.start();

        threads.add(thread);
    }

那么消费线程的具体逻辑是?看看BatchEventProcessorrun()方法:

    public void run()
    {
        if (!running.compareAndSet(false, true))
        {
            throw new IllegalStateException("Thread is already running");
        }
        sequenceBarrier.clearAlert();

        notifyStart();

        T event = null;
        // 成员变量sequence维护该Processor的消费进度
        long nextSequence = sequence.get() + 1L;
        try
        {
            while (true)
            {
                try
                {
                    // 以nextSequence作为底线,去获取最大的可用sequence(也就是已经被publish的sequence)
                    final long availableSequence = sequenceBarrier.waitFor(nextSequence);

                    // 如果获取到的sequence大于等于nextSequence,说明有可以消费的event,从nextSequence(包含)到availableSequence(包含)这一段的事件就作为同一个批次
                    while (nextSequence <= availableSequence)
                    {
                        event = dataProvider.get(nextSequence);
                        // 调用了前面注册的回调函数
                        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                        nextSequence++;
                    }

                    // 消费完一批之后 一次性更新消费进度
                    sequence.set(availableSequence);
                }
                catch (final TimeoutException e)
                {
                    // waitFor超时的场景
                    notifyTimeout(sequence.get());
                }
                catch (final AlertException ex)
                {
                    if (!running.get())
                    {
                        break;
                    }
                }
                catch (final Throwable ex)
                {
                
                    // 消费过程中如果抛出异常,表面上看会更新消费进度,也就是说没有补偿机制。但实际上默认的策略是会抛异常的,消费线程会直接结束掉
                    exceptionHandler.handleEventException(ex, nextSequence, event);
                    sequence.set(nextSequence);
                    nextSequence++;
                }
            }
        }
        finally
        {
            notifyShutdown();
            running.set(false);
        }
    }

sequenceBarrier.waitFor里的逻辑值得好好看一看,不过我觉得等看完发布事件的流程之后会更容易理解。

第四步——发布事件

这里使用的是EventTranslator的方式来发布事件的:

    // class Disruptor
    public void publishEvent(final EventTranslator<T> eventTranslator)
    {
        ringBuffer.publishEvent(eventTranslator);
    }
    
    // class RingBuffer
    public void publishEvent(EventTranslator<E> translator)
    {
        final long sequence = sequencer.next();
        translateAndPublish(translator, sequence);
    }
    
    private void translateAndPublish(EventTranslator<E> translator, long sequence)
    {
        try
        {
            translator.translateTo(get(sequence), sequence);
        }
        finally
        {
            sequencer.publish(sequence);
        }
    }

从上面的代码结构可以看出来,发布事件总共分为三个步骤:

  1. 申请sequence
  2. 填充事件内容

有点类似数据库事务的味道。为了便于理解,我们还是以SingleProducerSequencer来分析下上面三个步骤:

申请sequence

    // class SingleProducerSequencer
    public long next()
    {
        return next(1);
    }
    
    public long next(int n)
    {
        if (n < 1)
        {
            throw new IllegalArgumentException("n must be > 0");
        }

        // nextValue这个变量名有点诡异,实际上表示已经申请到的那个sequence
        long nextValue = this.nextValue;

        // nextSequence表示本次需要申请的最大sequence
        long nextSequence = nextValue + n;
        // 计算出nextSequence在上一圈的点位
        long wrapPoint = nextSequence - bufferSize;
        // 最慢消费进度的缓存
        long cachedGatingSequence = this.cachedValue;
        // 下面这个条件表达式以及其代码块解释起来可能需要比较大的篇幅,所以在下面[核心代码详解]里说明
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
        {
            cursor.setVolatile(nextValue);  // StoreLoad fence

            long minSequence;
            
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
            {
                // 通知下消费者
                waitStrategy.signalAllWhenBlocking();
                // 生产者如果没有空间写数据了,只能无限park
                LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
            }
            
            this.cachedValue = minSequence;
        }

        this.nextValue = nextSequence;

        return nextSequence;
    }
核心代码详解

首先,我们看下if表达式里的两个条件:

  1. wrapPoint > cachedGatingSequence,结合我们对Disruptor的了解,此处判断的应该是此次要申请的sequence是否已经领先最慢消费进度一圈了(类似1000米跑步的套圈)

  2. cachedGatingSequence > nextValue判断的是最慢消费进度超过了我们即将要申请的sequence,乍一看这应该是不可能的吧,都还没申请到该sequence怎么可能消费到呢?找了些资料,发现确实是存在该场景的:RingBuffer提供了一个叫resetTo的方法,可以重置当前已申请sequence为一个指定值并publish出去:

    @Deprecated
    public void resetTo(long sequence)
    {
        sequencer.claim(sequence);
        sequencer.publish(sequence);
    }
    

    具体资料可参考:

    不过该代码已经标注为@Deprecated,按照作者的意思,后续是要删掉的。那么在此处分析的时候,我们就将当它恒为false。

对于第一个条件表达式,也有个值得注意的地方:因为里面的【最慢消费进度】取的是缓存值(cached)。而这个缓存值是什么时候更新的呢?答案是只有在“套圈”了以后才会更新。这个逻辑你品,你细品,那么你会发现,每申请RingBuffer.size()个sequence之后都会走进上面的“套圈”逻辑来更新cachedGatingSequence。这样就极大的减少了Util.getMinimumSequence(gatingSequences, nextValue)的运算量

再来看看“套圈”时需要执行的逻辑:

  1. 插入一个StoreLoad屏障,防止是因为内存可见性导致的消费者消费不了数据(应该极少存在这样的情况吧)
  2. 实时计算一下最慢消费进度Util.getMinimumSequence(gatingSequences, nextValue)
  3. 如果真的套圈了,那么就一直死循环直到RingBuffer上有空间可以申请
  4. 更新【最慢消费进度缓存】

注意,当消费者消费过慢时,可能会导致生产者无限park,这个在编程的时候要特别留意。

填充事件内容没什么好说的,无非就是设值的过程。设值完成之后,就可以发布了:

    public void publish(long sequence)
    {
        cursor.set(sequence);
        waitStrategy.signalAllWhenBlocking();
    }

这里会把cursor的值设置为当前申请的sequence,代表序号为sequence的事件发布成功。这里的cursor表示已经publish的最大事件序号(在多生产者模式中并不是),所以我们在使用过程中需要依次申请,依次发布,不能直接上来就publish(100),这样会导致消费者会认为100以前的序号也都就绪了。另外,由于我们现在看的是单生产者模式,也不需要考虑并发场景。

sequenceBarrier.waitFor

看完了生产者的流程,我们来回顾下在消费者里这一句关键的代码。前面有提到过ProcessingSequenceBarrier,带着这一丝丝的印象我们来看看下面这段获取availableSequence的逻辑:

    // class ProcessingSequenceBarrier
    public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException
    {
        checkAlert();

        // waitStrategy派上用场了,这是我们在构造Disruptor的时候的入参(也是构造RingBuffer的入参)
        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

        // 理论上没有可能为true,因为当前每种waitStrategy内都保证了availableSequence一定大于等于sequence
        if (availableSequence < sequence)
        {
            return availableSequence;
        }

        // 返回最大的已发布的sequence,在单生产者模式下这个函数返回值就等于availableSequence
        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    }

跟着上面的注释,相信应该没有什么理解上的难点,上面的代码核心就两步:

  1. 通过WaitStrategy.waitFor()获取availableSequence,下面会分析具体的逻辑
  2. 通过sequencer来得到最大的已发布的sequence(HighestPublishedSequence)

WaitStrategy.waitFor

先看看第一步中的WaitStrategy.waitFor()方法,这里以BlockingWaitStrategy为例:

    // class BlockingWaitStrategy
    // 这里的四个入参我们捋一捋
    // sequence:消费者想要消费的最小sequence(底线)
    // cursorSequence:Sequencer的cursor,也就是当前RingBuffer上已经被申请的最大sequence(在讲生产者逻辑的时候提到了)
    // dependentSequence:在我们当前链路为cursorSequence,不存在消费依赖(如果存在依赖的话,则为依赖消费者消费进度)
    // barrier:这个主要是用了其中一些中断方法,不用太care
    public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException
    {
        long availableSequence;
        if (cursorSequence.get() < sequence)
        {
            lock.lock();
            try
            {
                while (cursorSequence.get() < sequence)
                {
                    barrier.checkAlert();
                    processorNotifyCondition.await();
                }
            }
            finally
            {
                lock.unlock();
            }
        }

        // 看到了吧,这里已经保证了availableSequence必然大于等于sequence
        // 并且在存在依赖的场景中,被依赖消费者存在慢消费的话,会直接导致下游进入死循环
        while ((availableSequence = dependentSequence.get()) < sequence)
        {
            barrier.checkAlert();
        }

        return availableSequence;
    }

从上面的代码能看到,WaitStrategy.waitFor()获取的是依赖消费者的消费进度sequence(默认依赖RingBuffer上已申请进度的sequence)。需要注意的一点是,当消费者获取可消费事件的过程中,存在两种场景需要等待:

  1. RingBuffer上没有事件可以消费
  2. RingBuffer上有可消费事件,但是依赖的消费者还未消费完该事件

如果是第一种场景,那么消费者会采用WaitStrategy的策略进行等待。而如果是第二种场景的话,只能如上所示一样进入死循环(此时可能造成cpu升高)。

sequencer.getHighestPublishedSequence

WaitStrategy.waitFor()返回后,得到的是RingBuffer上已申请进度sequence或者是依赖消费者消费进度sequence(当然如果把cursorSequence也看成一种依赖的话,理解起来就统一了)。注意一个形容词——“已申请”,而不是“已发布”,“已申请”意味着还不一定“已发布”,也就是还不能消费。所以,SequenceBarrier.waitFor最后还有一步sequencer.getHighestPublishedSequence(sequence, availableSequence)

当然如果你很仔细的看到这里并且对于前面的内容都理解了,你可能会产生疑问:对于单生产者来说,本来就是在publish的时候才更新cursor的啊?那上一步从WaitStrategy.waitFor()获取到的不就是“已发布”的进度sequence吗?是的,你说得很正确。对于单生产者确实如此,所以但生产者对应的实现为:

    public long getHighestPublishedSequence(long lowerBound, long availableSequence)
    {
        return availableSequence;
    }

而对于多生产者的话,逻辑就会相对复杂一点,这个我们下一篇文章再分析

到这里,Disruptor核心的逻辑我们基本上看完了。我们介绍了Disruptor中的单生产者模式的生产逻辑以及默认的单线程批量消费逻辑。当然这只是最基本的模式,为了让我们对Disruptor的逻辑和源代码有一个整体的了解。后面的文章我们会涉及更多的场景,比如

  • 多生产者模式是如何工作的
  • 如何实现消费依赖
  • Log4j2是如何使用Disruptor的
  • Disruptor性能高的原因以及使用过程中的一些心得等

如文中有描述错误,还望指出,以便改正,多谢~


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK