8

SOFAJRaft源码阅读-框架Disruptor浅析 - Akai-yuan

 1 year ago
source link: https://www.cnblogs.com/akai-yuan/p/17065305.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

SOFAJRaft源码阅读-框架Disruptor浅析

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。与Kafka、RabbitMQ用于服务间的消息队列不同,disruptor一般用于线程间消息的传递。基于Disruptor开发的系统单线程能支撑每秒600万订单。因为在SOFAJRaft中使用了该框架,所以此文用于浅析Disruptor,初步了解它的优势与基本原理。

快速理解:
最好的方法去理解Disruptor就是将它和容易理解并且相似的队列,例如BlockingQueue。Disruptor其实就像一个队列一样,用于在不同的线程之间迁移数据,但是Disruptor也实现了一些其他队列没有的特性,如:

  • 同一个“事件”可以有多个消费者,消费者之间既可以并行处理,也可以相互依赖形成处理的先后次序(形成一个依赖图);
  • 预分配用于存储事件内容的内存空间;
  • 针对极高的性能目标而实现的极度优化和无锁的设计;

1. JDK中队列的局限性

队列纵览:
基于数组线程安全的队列,比较典型的是ArrayBlockingQueue,它主要通过加锁的方式来保证线程安全;基于链表线程安全的队列,LinkedBlockingQueueConcurrentLinkedQueue两大类,前者也通过的方式来实现线程安全,而后者以及上面表格中的LinkedTransferQueue都是通过原子变量compare and swap(以下简称“CAS”)这种不加锁的方式来实现的。

image.png

(1)线程安全与否

在我们真实的环境中,我们的机器都是属于多线程,当多线程对同一个队列进行排队操作的时候,如果使用线程不安全会出现,覆盖数据,数据丢失等无法预测的事情,所以我们这个时候只能选择线程安全的队列

(2)数组还是链表

ArrayBlockingQueue,LinkedBlockingQueue两个队列,他们两个都是用ReentrantLock控制的线程安全,他们两个的区别一个是数组,一个是链表,在队列中,一般获取这个队列元素之后紧接着会获取下一个元素,或者一次获取多个队列元素都有可能,而数组在内存中地址是连续的,在操作系统中会有缓存的优化,所以访问的速度会略胜一筹,我们也会尽量去选择ArrayBlockingQueue。

(3)数组还是堆

为了减少Java的垃圾回收对系统性能的影响,会尽量选择array/heap格式的数据结构。这样筛选下来,符合条件的队列就只有ArrayBlockingQueue。

(4)有界还是无界

通过不加锁的方式实现的队列都是无界的(无法保证队列的长度在确定的范围内);而加锁的方式,可以实现有界队列。在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列。

2. Disruptor的高性能

  • 引入环形的数组结构:数组元素不会被回收,避免频繁的GC
  • 无锁的设计:采用CAS无锁方式,保证线程的安全性
  • 属性填充:通过添加额外的无用信息,避免伪共享问题
  • 元素位置的定位:采用跟一致性哈希一样的方式,一个索引,进行自增

(1)RingBuffer环形的数组结构

环形数组结构是整个Disruptor的核心所在。

image.png

<1>减少垃圾回收:
通过缓存行我们知道,数组中的一个元素加载,相邻的数组元素会被预加载。因此在这样的结构中,CPU无需时不时去主存加载数组中的下一个元素。而且,你可以为数组预先分配内存,使得数组对象一直存在(除非程序终止)。关于缓存行的知识,可以翻阅我之前写过的文章:FalseSharing-伪共享
<2>删除节点时避免像链表那样的内存清理:
环形数组不像链表那样,需要为每一个添加到其上面的对象创造节点对象—对应的,当删除节点时,需要执行相应的内存清理操作。环形数组中的元素采用覆盖方式,避免了jvm的GC。
<3>高效的定位策略:
其次结构作为环形,数组的大小为2的n次方,这样元素定位可以通过位运算效率会更高,这个与一致性哈希中的环形策略类似。在Disruptor中,这个环形结构叫做RingBuffer,而且大小必须是2的n次方。当放置数据填充满队列(即到达2^n-1位置)之后,再填充数据,就会从0开始,覆盖之前的数据,于是就相当于一个环。

实际上,在这些框架中取余并不是使用%运算,都是使用的&与运算,这就要求你设置的大小一般是2的N次方也就是,10,100,1000等等,这样减去1的话就是,1,11,111,就能很好的使用index & (size -1),这样利用位运算就增加了访问速度。 如果在Disruptor中你不用2的N次方进行大小设置,他会抛出buffersize必须为2的N次方异常。

(2)CAS乐观锁

原子变量能够保证原子性的操作,意思是某个任务在执行过程中,要么全部成功,要么全部失败回滚,恢复到执行之前的初态,不存在初态和成功之间的中间状态。例如CAS操作,要么比较并交换成功,要么比较并交换失败。由CPU保证原子性。
通过原子变量可以实现线程安全。执行某个任务的时候,先假定不会有冲突,若不发生冲突,则直接执行成功;当发生冲突的时候,则执行失败,回滚再重新操作,直到不发生冲突。

image.png

如图所示,Thread1和Thread2都要把Entry加1。若不加锁,也不使用CAS,有可能Thread1取到了myValue=1,Thread2也取到了myValue=1,然后相加,Entry中的value值为2。这与预期不相符,我们预期的是Entry的值经过两次相加后等于3。
CAS会先把Entry现在的value跟线程当初读出的值相比较,若相同,则赋值;若不相同,则赋值执行失败。一般会通过while/for循环来重新执行,直到赋值成功。

(3)Padding消除伪共享

为了解决缓存行出现的问题,在Disruptor中采用了Padding的方式来消除伪共享。
关于什么是伪共享可以查阅我之前写过的博客:伪共享
其中的Value就被其他一些无用的long变量给填充了。这样你修改Value的时候,就不会影响到其他变量的缓存行。

class LhsPadding
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding
{
    protected volatile long value;
}

class RhsPadding extends Value
{
    protected long p9, p10, p11, p12, p13, p14, p15;
}

最后顺便一提,在jdk8中提供了@Contended的注解,当然一般来说只允许Jdk中内部,如果你自己使用那就得配置Jvm参数_ -RestricContentended = fase_,将限制这个注解置位取消。很多文章分析了ConcurrentHashMap,但是都把这个注解给忽略掉了,在ConcurrentHashMap中就使用了这个注解,在ConcurrentHashMap每个桶都是单独的用计数器去做计算,而这个计数器由于时刻都在变化,所以被用这个注解进行填充缓存行优化,以此来增加性能。

image.png
image.png

3.Disruptor的设计方案

(1)单生产者写

  1. 申请写入m个元素;
  2. 若是有m个元素可以入,则返回最大的序列号。这儿主要判断是否会覆盖未读的元素;
  3. 若是返回的正确,则生产者开始写入元素。
image.png
image.png
image.png

(2)多生产者读

多生产者读需要注意,如何防止读取的时候,读到还未写的元素。Disruptor在多个生产者的情况下,引入了一个与Ring Buffer大小相同的buffer:available Buffer。当某个位置写入成功的时候,便把availble Buffer相应的位置置位,标记为写入成功。读取的时候,会遍历available Buffer,来判断元素是否已经就绪。

  1. 申请读取到序号n;
  2. 若writer cursor >= n,这时仍然无法确定连续可读的最大下标。从reader cursor开始读取available Buffer,一直查到第一个不可用的元素,然后返回最大连续可读元素的位置;
  3. 消费者读取元素。

如下图所示,读线程读到下标为2的元素,三个线程Writer1/Writer2/Writer3正在向RingBuffer相应位置写数据,写线程被分配到的最大元素下标是11。
读线程申请读取到下标从3到11的元素,判断writer cursor>=11。然后开始读取availableBuffer,从3开始,往后读取,发现下标为7的元素没有生产成功,于是WaitFor(11)返回6。
然后,消费者读取下标从3到6共计4个元素。

image.png

(3)多生产者写

多个生产者的情况下,会遇到“如何防止多个线程重复写同一个元素”的问题。Disruptor的解决方法是,每个线程获取不同的一段数组空间进行操作。这个通过CAS很容易达到。只需要在分配元素的时候,通过CAS判断一下这段空间是否已经分配出去即可。

  1. 申请写入m个元素;
  2. 若是有m个元素可以写入,则返回最大的序列号。每个生产者会被分配一段独享的空间;
  3. 生产者写入元素,写入元素的同时设置available Buffer里面相应的位置,以标记自己哪些位置是已经写入成功的。

如下图所示,Writer1和Writer2两个线程写入数组,都申请可写的数组空间。Writer1被分配了下标3到下表5的空间,Writer2被分配了下标6到下标9的空间。
Writer1写入下标3位置的元素,同时把available Buffer相应位置置位,标记已经写入成功,往后移一位,开始写下标4位置的元素。Writer2同样的方式。最终都写入完成。

image.png

4.Disruptor的使用

public static void main(String[] args) throws Exception {
        // 队列中的元素
        class Element {
            @Contended
            private String value;

            public String getValue() {
                return value;
            }

            public void setValue(String value) {
                this.value = value;
            }
        }

        // 生产者的线程工厂
        ThreadFactory threadFactory = new ThreadFactory() {
            int i = 0;
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "simpleThread" + String.valueOf(i++));
            }
        };

        // RingBuffer生产工厂,初始化RingBuffer的时候使用
        EventFactory<Element> factory = new EventFactory<Element>() {
            @Override
            public Element newInstance() {
                return new Element();
            }
        };

        // 处理Event的handler
        EventHandler<Element> handler = new EventHandler<Element>() {
            @Override
            public void onEvent(Element element, long sequence, boolean endOfBatch) throws InterruptedException {
                System.out.println("Element: " + Thread.currentThread().getName() + ": " + element.getValue() + ": " + sequence);
//                Thread.sleep(10000000);
            }
        };


        // 阻塞策略
        BlockingWaitStrategy strategy = new BlockingWaitStrategy();

        // 指定RingBuffer的大小
        int bufferSize = 8;

        // 创建disruptor,采用单生产者模式
        Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);

        // 设置EventHandler
        disruptor.handleEventsWith(handler);

        // 启动disruptor的线程
        disruptor.start();
        for (int i = 0; i < 10; i++) {
            disruptor.publishEvent((element, sequence) -> {
                System.out.println("之前的数据" + element.getValue() + "当前的sequence" + sequence);
                element.setValue("我是第" + sequence + "个");
            });

        }
    }

Disruptor中的关键点:
ThreadFactory:这是一个线程工厂,用于我们Disruptor中生产者消费的时候需要的线程。
EventFactory:事件工厂,用于产生我们队列元素的工厂,在Disruptor中,他会在初始化的时候直接填充满RingBuffer,一次到位。
EventHandler:用于处理Event的handler,这里一个EventHandler可以看做是一个消费者,但是多个EventHandler他们都是独立消费的队列。
WorkHandler:也是用于处理Event的handler,和上面区别在于,多个消费者都是共享同一个队列。 WaitStrategy:等待策略,在Disruptor中有多种策略,来决定消费者获消费时,如果没有数据采取的策略是什么?下面简单列举一下Disruptor中的部分策略

  • BlockingWaitStrategy:通过线程阻塞的方式,等待生产者唤醒,被唤醒后,再循环检查依赖的sequence是否已经消费。
  • BusySpinWaitStrategy:线程一直自旋等待,可能比较耗cpu
  • LiteBlockingWaitStrategy:线程阻塞等待生产者唤醒,与BlockingWaitStrategy相比,区别在signalNeeded.getAndSet,如果两个线程同时访问一个访问waitfor,一个访问signalAll时,可以减少lock加锁次数.
  • LiteTimeoutBlockingWaitStrategy:与LiteBlockingWaitStrategy相比,设置了阻塞时间,超过时间后抛异常。
  • YieldingWaitStrategy:尝试100次,然后Thread.yield()让出cpu

EventTranslator:实现这个接口可以将我们的其他数据结构转换为在Disruptor中流通的Event。

5.Disruptor的工作原理

对于生产者来说,可以分为多生产者和单生产者,用ProducerType.Single,和ProducerType.MULTI区分,多生产者和单生产者来说多了CAS,因为单生产者由于是单线程,所以不需要保证线程安全。
在disruptor中通常用disruptor.publishEvent和disruptor.publishEvents()进行单发和群发。
在disruptor发布一个事件进入队列需要下面几个步骤:

  1. 首先获取RingBuffer中下一个在RingBuffer上可以发布的位置,这个可以分为两类:
  • 从来没有写过的位置
  • 已经被所有消费者读过,可以在写的位置。 如果没有读取到会一直尝试去读,disruptor做的很巧妙,并没有一直占据CPU,而是通过LockSuport.park(),进行了一下将线程阻塞挂起操作,为的是不让CPU一直进行这种空循环,不然其他线程都抢不到CPU时间片。
image.png
  1. 接下来调用我们上面所介绍的EventTranslator将第一步中RingBuffer中那个位置的event交给EventTranslator进行重写。
  2. 进行发布,在disruptor还有一个额外的数组用来记录当前ringBuffer所在位置目前最新的序列号是多少,拿上面那个0,10,20举例,写到10的时候这个avliableBuffer就在对应的位置上记录目前这个是属于10,有什么用呢后面会介绍。进行发布的时候需要更新这个avliableBuffer,然后进行唤醒所有阻塞的生产者。

下面简单画一下流程,上面我们拿10举例是不对的,因为bufferSize必须要2的N次方,所以我们这里拿Buffersize=8来举例:下面介绍了当我们已经push了8个event也就是一圈的时候,接下来再push 3条消息的一些过程: 1.首先调用next(3),我们当前在7这个位置上所以接下来三条是8,9,10,取余也就是0,1,2。 2.重写0,1,2这三个内存区域的数据。 3.写avaliableBuffer。

image.png

对于消费者来说,上面介绍了分为两种,一种是多个消费者独立消费,一种是多个消费者消费同一个队列,这里介绍一下较为复杂的多个消费者消费同一个队列,能理解这个也就能理解独立消费。 在我们的disruptor.strat()方法中会启动我们的消费者线程以此来进行后台消费。在消费者中有两个队列需要我们关注,一个是所有消费者共享的进度队列,还有个是每个消费者独立消费进度队列。
1.对消费者共享队列进行下一个Next的CAS抢占,以及对自己消费进度的队列标记当前进度。
2.为自己申请可读的RingBuffer的Next位置,这里的申请不仅仅是申请到next,有可能会申请到比Next大的一个范围,阻塞策略的申请的过程如下:

  • 获取生产者对RingBuffer最新写的位置
  • 判断其是否小于我要申请读的位置
  • 如果大于则证明这个位置已经写了,返回给生产者。
  • 如果小于证明还没有写到这个位置,在阻塞策略中会进行阻塞,其会在生产者提交阶段进行唤醒。

3.对这个位置进行可读校验,因为你申请的位置可能是连续的,比如生产者目前在7,接下来申请读,如果消费者已经把8和10这个序列号的位置写进去了,但是9这个位置还没来得及写入,由于第一步会返回10,但是9其实是不能读的,所以得把位置向下收缩到8。

579cd318c79da64b5d8b965cd66902b.jpg

4.如果收缩完了之后比当前next要小,则继续循环申请。
5.交给handler.onEvent()处理
举个例子:
我们要申请next=8这个位置。
1.首先在共享队列抢占进度8,在独立队列写入进度7
2.获取8的可读的最大位置,这里根据不同的策略进行,我们选择阻塞,由于生产者生产了8,9,10,所以返回的是10,这样和后续就不需要再次和avaliableBuffer进行对比了。
3.最后交给handler进行处理。

a7a52186e31351545bebc48ab018cb5.jpg

参考文献:
你应该知道的高性能无锁队列Disruptor
美团技术团队文章-高性能队列Disruptor


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK