0

Disruptor测试结果运算1亿次,耗时5503ms,吞吐量18171000/s,于是我扒开了Disruptor...

 1 year ago
source link: https://blog.51cto.com/boxuegu/5692566
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Disruptor测试结果运算1亿次,耗时5503ms,吞吐量18171000/s,于是我扒开了Disruptor高性能的外衣

精选 原创

博学谷狂野架构师 2022-09-20 15:28:32 博主文章分类:技术干货 ©著作权

文章标签 后端 java 文章分类 Java 编程语言 yyds干货盘点 阅读数181

hello 小伙伴儿们,昨天搞了一篇Disruptor的入门文章,看大家反馈不错,在大家一再催更下,昨天熬夜至下班,终于续写了第二篇Disruptor的高性能原理剖析的文章,为大家揭开Disruptor高性能的神秘外衣。
如果小伙伴,错过了入门Disruptor的入门篇的文章,在这里自行查看:

 如此狂妄,自称高性能队列的Disruptor有啥来头?

能对比测试

为了直观地感受 Disruptor 有多快,设计了一个性能对比测试:Producer 发布 1 亿次事件,从发布第一个事件开始计时,捕捉 Consumer 处理完所有事件的耗时。

测试用例在 Producer 如何将事件通知到 Consumer 的实现方式上,设计了两种不同的实现:

  1. Producer 的事件发布和 Consumer 的事件处理在不同的线程,通过 ArrayBlockingQueue 传递给 Consumer 进行处理;
  2. Producer 的事件发布和 Consumer 的事件处理在不同的线程,通过 Disruptor 传递给 Consumer 进行处理;

3.1 代码实现

3.1.1 计算代码

进行CAS累加运算

public class CommonUtils {
    private static AtomicLong count = new AtomicLong(0);

    public static void calculation() {
        count.incrementAndGet();
    }

    public static long get() {
        return count.get();
    }
}
3.1.2 抽象类

进行一亿次 CAS运算计算耗时

/**
 * 抽象类
 *
 * @param <T>
 */
public abstract class AbstractTask<T> {
    private static final Logger logger = LoggerFactory.getLogger(AbstractTask.class);
    //线程池
    private static final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    //一亿次测试
    public static long tasksize = 100000000;


    /**
     * 开始调用测试
     */
    public void invok() {
        //计算当前事件
        long currentTime = System.currentTimeMillis();
        //获取到监听器
        Runnable monitor = monitor();
        if (null != monitor) {
            executor.execute(monitor);
        }
        //启动
        start();

        //执行任务发布
        Runnable runnable = getTask();
        for (long i = 0; i < tasksize; i++) {
            runnable.run();
        }

        //停止任务
        stop();
        //等待任务发布完成
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        executor.shutdown();
        //获取处理结果
        T result = getResult();
        //计算耗时
        long duration = System.currentTimeMillis() - currentTime;
        //计算吞吐量
        long throughput = (tasksize / duration) * 1000;
        logger.info("每秒吞吐量:[{}];({}/{})", throughput, result, duration);
    }


    /**
     * 获取监听器
     *
     * @return
     */
    public Runnable monitor() {
        return null;
    }

    /**
     * 启动任务
     */
    public void start() {

    }

    /**
     * 完成任务
     */
    public void complete() {
        countDownLatch.countDown();
    }

    /**
     * 停止任务
     */
    public void stop() {

    }

    /**
     * 获取需要执行的任务
     *
     * @return
     */
    public abstract Runnable getTask();

    /**
     * 获取运行结果
     *
     * @return
     */
    public abstract T getResult();
}

3.1.3 Disruptor性能测试代码
public class DisruptorTest extends AbstractTask<Long> {
    //定义随机数生成器
    private static final Random r = new Random();
    //定义Disruptor对象
    private Disruptor disruptor = null;
    //定义Disruptor事件发布对象
    private LongEventProducerWithTranslator translator = null;

    /**
     * 启动
     */
    @Override
    public void start() {
        //定义事件工厂
        EventFactory<LongEvent> eventFactory = new LongEventFactory();
        // RingBuffer 大小,必须是 2 的 N 次方;
        int ringBufferSize = 1024 * 1024;
        //构建disruptor对象
        disruptor = new Disruptor<LongEvent>(eventFactory,
                ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE,
                new YieldingWaitStrategy());
        //定义事件处理类
        EventHandler<LongEvent> eventHandler = new LongEventHandler();
        //配置事件处理类
        disruptor.handleEventsWith(eventHandler);
        //启动disruptor
        disruptor.start();
        //创建事件发布对象
        translator = new LongEventProducerWithTranslator(disruptor.getRingBuffer());
    }

    /**
     * 停止任务
     */
    @Override
    public void stop() {
        disruptor.shutdown();
        System.out.println("运算结果:" + CommonUtils.get());
        //完成任务
        complete();
    }

    /**
     * 获取需要执行的任务
     *
     * @return
     */
    @Override
    public Runnable getTask() {
        return () -> {
            publishEvent();
        };
    }

    /**
     * 获取运行结果
     *
     * @return
     */
    @Override
    public Long getResult() {
        return CommonUtils.get();
    }


    /**
     * 发布对象
     */
    private void publishEvent() {
        //获取要通过事件传递的业务数据
        Long data = r.nextLong();
        // 发布事件
        translator.onData(data);
    }


    public static void main(String[] args) {
        DisruptorTest disruptorTest = new DisruptorTest();
        disruptorTest.invok();
    }

}
10:45:22.941 [main] INFO com.heima.task.AbstractTask - 每秒吞吐量:[18171000];(100000000/5503)
ArrayBlockingQueue性能测试代码
public class ArrayBlockingQueueTest extends AbstractTask {
    private static final Random r = new Random();
    private static final ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue(10000000);


    @Override
    public Runnable monitor() {
        return () -> {
            try {
                for (int i = 0; i < tasksize; i++) {
                    //获取一个元素
                    queue.take();
                    //执行计算
                    CommonUtils.calculation();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            complete();
        };
    }

    public static void main(String[] args) {
        ArrayBlockingQueueTest test = new ArrayBlockingQueueTest();
        test.invok();
    }

    @Override
    public Runnable getTask() {
        return () -> {
            publish();
        };
    }

    @Override
    public Object getResult() {
        return CommonUtils.get();
    }

    public void publish() {
        Long data = r.nextLong();
        try {
            queue.put(data);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
10:45:46.379 [main] INFO com.heima.task.AbstractTask - 每秒吞吐量:[6192000];(100000000/16148)

3.2 测试对比

测试类 运算次数 耗时(ms) 吞吐量/s
ArrayBlockingQueue 1亿次 16148 6192000
Disruptor 1亿次 5503 18171000

3.3 Disruptor官方性能测试

Disruptor论文中讲述了一个实验:

  • 这个测试程序调用了一个函数,该函数会对一个64位的计数器循环自增5亿次。
  • 机器环境:2.4G 6核
  • 运算: 64位的计数器累加5亿次
Method Time (ms)
单线程 300
单线程使用 CAS 5,700
单线程使用锁 10,000
单线程使用volatile 4,700
多线程使用 CAS 30,000
多线程使用锁 224,000

4. 高性能原理

  • 引入环形的数组结构:数组元素不会被回收,避免频繁的GC,
  • 无锁的设计:采用CAS无锁方式,保证线程的安全性
  • 属性填充:通过添加额外的无用信息,避免伪共享问题
  • 元素位置的定位:采用跟一致性哈希一样的方式,一个索引,进行自增

4.1 伪共享概念

4.1.1 计算机缓存构成

​ 下图是计算的基本结构。L1、L2、L3分别表示一级缓存、二级缓存、三级缓存,越靠近CPU的缓存,速度越快,容量也越小,所以L1缓存很小但很快,并且紧靠着在使用它的CPU内核;L2大一些,也慢一些,并且仍然只能被一个单独的CPU核使用;L3更大、更慢,并且被单个插槽上的所有CPU核共享;最后是主存,由全部插槽上的所有CPU核共享。

Disruptor测试结果运算1亿次,耗时5503ms,吞吐量18171000/s,于是我扒开了Disruptor高性能的外衣_java

​ 当CPU要读取一个数据时,首先从一级缓存中查找,如果没有找到再从二级缓存中查找,如果还是没有就从三级缓存或内存中查找。一般来说,每级缓存的命中率大概都在80%左右,也就是说全部数据量的80%都可以在一级缓存中找到,只剩下20%的总数据量才需要从二级缓存、三级缓存或内存中读取,由此可见一级缓存是整个CPU缓存架构中最为重要的部分。

Disruptor测试结果运算1亿次,耗时5503ms,吞吐量18171000/s,于是我扒开了Disruptor高性能的外衣_后端_02

下表是一些缓存未命中的消耗数据:

从CPU到 大约需要的CPU周期 大约需要的时间
主存 约60-80ns
QPI总线 约20ns
L3 cache 约40-45cycles 约15ns
L2 cache 约10cycles 约3ns
L1 cache 约3-4cycles 约1ns
寄存器 1cycle

可见CPU读取主存中的数据会比从L1中读取慢了近2个数量级。

4.1.2 什么是缓存行

​ 为了解决计算机系统中主内存与 CPU 之间运行速度差问题,会在 CPU 与主内存之间 添加一级或者多级高速缓冲存储器( Cache)。这个 Cache 一般是被集成到 CPU 内部的, 所以也叫 CPU Cache,如图所示是两级 Cache 结构。

Disruptor测试结果运算1亿次,耗时5503ms,吞吐量18171000/s,于是我扒开了Disruptor高性能的外衣_后端_03

​ Cache内部是按行存储的,其中每一行称为一个cache line,由很多个 Cache line 组成的,Cache line 是 cache 和 RAM 交换数据的最小单位,cache行的大小一般为2的幂次数字节,通常为 64 Byte。Cache line是Cache与主内存进行数据交换的单位。

Disruptor测试结果运算1亿次,耗时5503ms,吞吐量18171000/s,于是我扒开了Disruptor高性能的外衣_java_04

​ 当 CPU 把内存的数据载入 cache 时,会把临近的共 64 Byte 的数据一同放入同一个Cache line,因为空间局部性:临近的数据在将来被访问的可能性大。

linux 查看缓存行大小

more /sys/devices/system/cpu/cpu1/cache/index0/coherency_line_size
64
4.1.3 什么是共享

​ CPU缓存是以缓存行(cache line)为单位存储的。缓存行通常是 64 字节,并且它有效地引用主内存中的一块地址。一个 Java 的 long 类型是 8 字节,因此在一个缓存行中可以存 8 个 long 类型的变量。所以,如果你访问一个 long 数组,当数组中的一个值被加载到缓存中,它会额外加载另外 7 个,以致你能非常快地遍历这个数组。事实上,你可以非常快速的遍历在连续的内存块中分配的任意数据结构。而如果你在数据结构中的项在内存中不是彼此相邻的(如链表),你将得不到免费缓存加载所带来的优势,并且在这些数据结构中的每一个项都可能会出现缓存未命中。下图是一个CPU缓存行的示意图:

Disruptor测试结果运算1亿次,耗时5503ms,吞吐量18171000/s,于是我扒开了Disruptor高性能的外衣_java_05

​ 表面上 X 和 Y 都是被独立线程操作的,而且两操作之间也没有任何关系。只不过它们共享了一个缓存行,但所有竞争冲突都是来源于共享。

4.1.4 什么是伪共享

​ 当CPU访问某一个变量时候,首先会去看CPU Cache内是否有该变量,如果有则直接从中获取,否者就去主内存里面获取该变量,然后把该变量所在内存区域的一个Cache行大小的内存拷贝到Cache(cache行是Cache与主内存进行数据交换的单位)。

​ 由于存放到Cache行的的是内存块而不是单个变量,所以可能会把多个变量存放到了一个cache行。当多个线程同时修改一个缓存行里面的多个变量时候,由于同时只能有一个线程操作缓存行,所以相比每个变量放到一个缓存行性能会有所下降,这就是伪共享。

Disruptor测试结果运算1亿次,耗时5503ms,吞吐量18171000/s,于是我扒开了Disruptor高性能的外衣_java_06

​ 如上图变量x,y同时被放到了CPU的一级和二级缓存,当线程1使用CPU1对变量x进行更新时候,首先会修改cpu1的一级缓存变量x所在缓存行,这时候缓存一致性协议会导致cpu2中变量x对应的缓存行失效,那么线程2写入变量x的时候就只能去二级缓存去查找,这就破坏了一级缓存,而一级缓存比二级缓存更快。更坏的情况下如果cpu只有一级缓存,那么会导致频繁的直接访问主内存。

​ 我们的缓存都是以缓存行作为一个单位来处理的,所以失效x的缓存的同时,也会把y失效,反之亦然。

4.1.5 为何会出现伪共享

​ 伪共享的产生是因为多个变量被放入了一个缓存行,并且多个线程同时去写入缓存行中不同变量。那么为何多个变量会被放入一个缓存行那。其实是因为Cache与内存交换数据的单位就是Cache line,当CPU要访问的变量没有在Cache命中时候,根据程序运行的局部性原理会把该变量在内存中大小为Cache行的内存放如缓存行。

long a;
long b;
long c;
long d;

​ 如上代码,声明了四个long变量,假设cache line的大小为32个字节,那么当cpu访问变量a时候发现该变量没有在cache命中,那么就会去主内存把变量a以及内存地址附近的b,c,d放入缓存行。也就是地址连续的多个变量才有可能会被放到一个缓存行中,当创建数组时候,数组里面的多个元素就会被放入到同一个缓存行。那么单线程下多个变量放入缓存行对性能有影响?其实正常情况下单线程访问时候由于数组元素被放入到了一个或者多个cache行对代码执行是有利的,因为数据都在缓存中,代码执行会更快。

4.1.6 如何解伪共享

​ 解决伪共享最直接的方法就是填充(padding),例如下面的VolatileLong,一个long占8个字节,Java的对象头占用8个字节(32位系统)或者12字节(64位系统,默认开启对象头压缩,不开启占16字节)。一个缓存行64字节,那么我们可以填充6个long(6 * 8 = 48 个字节)。

4.1.6.1 不使用字段填充
public class VolatileData {
    // 占用 8个字节 +48 + 对象头 = 64字节

    //需要操作的数据
    volatile long value;

    public VolatileData() {
    }

    public VolatileData(long defValue) {
        value = defValue;
    }

    public long accumulationAdd() {
        //因为单线程操作不需要加锁
        value++;
        return value;
    }

    public long getValue() {
        return value;
    }
}
Disruptor测试结果运算1亿次,耗时5503ms,吞吐量18171000/s,于是我扒开了Disruptor高性能的外衣_java_07
4.6.1.2 填充字段

因为JDK1.7以后就自动优化代码会删除无用的代码,在JDK1.7以后的版本这些不生效了。

/**
 * 缓存行填充父类
 */
public class DataPadding {
    //填充 5个long类型字段 8*5 = 40 个字节
    private long p1, p2, p3, p4, p5; //jvm 优化 删除无用代码
    //需要操作的数据
    volatile long value;
}
Disruptor测试结果运算1亿次,耗时5503ms,吞吐量18171000/s,于是我扒开了Disruptor高性能的外衣_java_08
4.1.6.3 继承的方式
/**
 * 缓存行填充父类
 */
public class DataPadding {
    //填充 5个long类型字段 8*5 = 40 个字节
    private long p1, p2, p3, p4, p5;
}

继承缓存填充类

/**
 * 继承DataPadding
 */
public class VolatileData extends DataPadding {
    // 占用 8个字节 +48 + 对象头 = 64字节

    public VolatileData() {
    }

    public VolatileData(long defValue) {
        value = defValue;
    }

    public long accumulationAdd() {
        //因为单线程操作不需要加锁
        value++;
        return value;
    }

    public long getValue() {
        return value;
    }
}
Disruptor测试结果运算1亿次,耗时5503ms,吞吐量18171000/s,于是我扒开了Disruptor高性能的外衣_后端_09
4.1.6.4 Disruptor填充方式
class LhsPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding {
    protected volatile long value;
}

class RhsPadding extends Value {
    protected long p9, p10, p11, p12, p13, p14, p15;
}

继承填充类

public class VolatileData extends RhsPadding {
    // 占用 8个字节 +48 + 对象头 = 64字节

    //需要操作的数据
    volatile long value;

    public VolatileData() {
    }

    public VolatileData(long defValue) {
        value = defValue;
    }

    public long accumulationAdd() {
        //因为单线程操作不需要加锁
        value++;
        return value;
    }

    public long getValue() {
        return value;
    }
}
Disruptor测试结果运算1亿次,耗时5503ms,吞吐量18171000/s,于是我扒开了Disruptor高性能的外衣_后端_10
4.1.6.5 @Contended注解
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.TYPE})
public @interface Contended {
    String value() default "";
}

注解填充类

@Contended
public class VolatileData  {
    // 占用 8个字节 +48 + 对象头 = 64字节

    //需要操作的数据
    volatile long value;
    
    public VolatileData() {
    }

    public VolatileData(long defValue) {
        value = defValue;
    }

    public long accumulationAdd() {
        //因为单线程操作不需要加锁
        value++;
        return value;
    }

    public long getValue() {
        return value;
    }
}
Disruptor测试结果运算1亿次,耗时5503ms,吞吐量18171000/s,于是我扒开了Disruptor高性能的外衣_后端_11

注意事项

​ 在Java8中提供了**@sun.misc.Contended来避免伪共享时,在运行时需要设置JVM启动参数-XX:-RestrictContended**否则可能不生效。

4.1.7 性能对比
4.1.7.1 测试代码

使用和不使用缓存行填充的对比

/**
 * 缓存行测试
 */
public class CacheLineTest {
    /**
     * 通过缓存行填充的变量
     */
    private VolatileData volatileData1 = new VolatileData(0);
    private VolatileData volatileData2 = new VolatileData(0);
    private VolatileData volatileData3 = new VolatileData(0);
    private VolatileData volatileData4 = new VolatileData(0);
    private VolatileData volatileData5 = new VolatileData(0);
    private VolatileData volatileData6 = new VolatileData(0);
    private VolatileData volatileData7 = new VolatileData(0);

    /**
     * 循环次数
     */
    private final long size = 100000000;

    /**
     * 进行累加操作
     */
    public void accumulationX(VolatileData volatileData) {
        //计算耗时
        long currentTime = System.currentTimeMillis();
        long value = 0;
        //循环累加
        for (int i = 0; i < size; i++) {
            //使用缓存行填充的方式
            value = volatileData.accumulationAdd();


        }
        //打印
        System.out.println(value);
        //打印耗时
        System.out.println("耗时:" + (System.currentTimeMillis() - currentTime));
    }


    public static void main(String[] args) {
        //创建对象
        CacheLineTest cacheRowTest = new CacheLineTest();
        //创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        //启动三个线程个调用他们各自的方法
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData1));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData2));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData3));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData4));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData5));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData6));
        executorService.execute(() -> cacheRowTest.accumulationX(cacheRowTest.volatileData7));
        executorService.shutdown();
    }
}
4.1.7.2 测试数据

同样的结构他们之间差了 将近 50倍的速度差距

对象 NoPadding(MS) DataPadding(MS) RhsPadding(MS) Contended(MS)
volatileData1 3751 1323 1307 1291
volatileData2 3790 1383 1311 1314
volatileData3 7551 1400 1311 1333
volatileData4 7669 1407 1317 1356
volatileData5 8577 1447 1327 1361
volatileData6 8705 1479 1339 1375
volatileData6 8741 1512 1368 1389
4.1.8 Disruptor解决伪共享

​ 在Disruptor中有一个重要的类Sequence,该类包装了一个volatile修饰的long类型数据value,无论是Disruptor中的基于数组实现的缓冲区RingBuffer,还是生产者,消费者,都有各自独立的Sequence,RingBuffer缓冲区中,Sequence标示着写入进度,例如每次生产者要写入数据进缓冲区时,都要调用RingBuffer.next()来获得下一个可使用的相对位置。对于生产者和消费者来说,Sequence标示着它们的事件序号,来看看Sequence类的源码:

class LhsPadding {
	protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding {
	protected volatile long value;
}

class RhsPadding extends Value {
	protected long p9, p10, p11, p12, p13, p14, p15;
}

public class Sequence extends RhsPadding {
	static final long INITIAL_VALUE = -1L;
	private static final Unsafe UNSAFE;
	private static final long VALUE_OFFSET;
	static {
		UNSAFE = Util.getUnsafe();
		try {
			VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
		} catch(final Exception e) {
			 throw new RuntimeException(e);
		}
	}
	


    public Sequence() {
        this(INITIAL_VALUE);
    }

    public Sequence(final long initialValue) {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
    }

}

​ 从第1到11行可以看到,真正使用到的变量value,它的前后空间都由8个long型的变量填补了,对于一个大小为64字节的缓存行,它刚好被填补满(一个long型变量value,8个字节加上前/后个7long型变量填补,7*8=56,56+8=64字节)。这样做每次把变量value读进高速缓存中时,都能把缓存行填充满(对于大小为64个字节的缓存行来说,如果缓存行大小大于64个字节,那么还是会出现伪共享问题),保证每次处理数据时都不会与其他变量发生冲突。

4.2 无锁的设计

4.2.1 锁机制存在的问题
  • 在多线程竞争下,加锁、释放锁会导致比较多的上下文切换和调度延时,引起性能问题,而且在上下文切换的时候,cpu之前缓存的指令和数据都将失效,对性能有很大的损失,用户态的锁虽然避免了这些问题,但是其实它们只是在没有真实的竞争时才有效。

  • 一个线程持有锁会导致其它所有需要此锁的线程挂起直至该锁释放。

  • 如果一个优先级高的线程等待一个优先级低的线程释放锁会导致导致优先级反转(Priority Inversion),引起性能风险。

4.2.2 CAS无锁算法

​ 实现无锁(lock-free)的非阻塞算法有多种实现方法,其中 CAS(比较与交换,Compare and swap) 是一种有名的无锁算法。CAS的语义是“我认为V的值应该为A,如果是,那么将V的值更新为B,否则不修改并告诉V的值实际为多少”,CAS是一种 乐观锁 技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知这次竞争中失败,并可以再次尝试。CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。

​ 这是一个CPU级别的指令,在我的意识中,它的工作方式有点像乐观锁——CPU去更新一个值,但如果想改的值不再是原来的值,操作就失败,因为很明显,有其它操作先改变了这个值。

Disruptor测试结果运算1亿次,耗时5503ms,吞吐量18171000/s,于是我扒开了Disruptor高性能的外衣_java_12

注意,这可以是CPU的两个不同的核心,但不会是两个独立的CPU。

​ CAS操作比锁消耗资源少的多,因为它们不牵涉操作系统,它们直接在CPU上操作。但它们并非没有代价——在上面的试验中,单线程无锁耗时300ms,单线程有锁耗时10000ms,单线程使用CAS耗时5700ms。所以它比使用锁耗时少,但比不需要考虑竞争的单线程耗时多。

4.2.3 传统队列问题

队列的底层数据结构一般分成三种:数组、链表和堆。其中,堆这里是为了实现带有优先级特性的队列,暂且不考虑。

队列 有界性 数据结构
ArrayBlockingQueue bounded 加锁 arraylist
LinkedBlockingQueue optionally-bounded 加锁 linkedlist
ConcurrentLinkedQueue unbounded 无锁 linkedlist
LinkedTransferQueue unbounded 无锁 linkedlist
PriorityBlockingQueue unbounded 加锁 heap
DelayQueue unbounded 加锁 heap

​ 在稳定性和性能要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列;

​ 同时,为了减少Java的垃圾回收对系统性能的影响,会尽量选择array/heap格式的数据结构。这样筛选下来,符合条件的队列就只有ArrayBlockingQueue,但是ArrayBlockingQueue是通过加锁的方式保证线程安全,而且ArrayBlockingQueue还存在伪共享问题,这两个问题严重影响了性能。

4.2.3.1 Disruptor的无锁设计

​ 多线程环境下,多个生产者通过do/while循环的条件CAS,来判断每次申请的空间是否已经被其他生产者占据。假如已经被占据,该函数会返回失败,While循环重新执行,申请写入空间。

do
{
    current = cursor.get();
    next = current + n;

    if (!hasAvailableCapacity(gatingSequences, n, current))
    {
        throw InsufficientCapacityException.INSTANCE;
    }
}
while (!cursor.compareAndSet(current, next));
//next 类比于ArrayBlockQueue的数组索引index
return next;

4.3 环形数组结构

环形数组结构是整个Disruptor的核心所在。

4.3.1 什么是环形数组

​ RingBuffer 是一个环(首尾相连的环),用做在不同上下文(线程)间传递数据的buffer,RingBuffer 拥有一个序号,这个序号指向数组中下一个可用元素。

Disruptor测试结果运算1亿次,耗时5503ms,吞吐量18171000/s,于是我扒开了Disruptor高性能的外衣_后端_13
4.3.2 为什么使用环形数组

为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好

​ 首先因为是数组,所以要比链表快,而且根据我们对上面缓存行的解释知道,数组中的一个元素加载,相邻的数组元素也是会被预加载的,因此在这样的结构中,cpu无需时不时去主存加载数组中的下一个元素。

​ 而且,你可以为数组预先分配内存,使得数组对象一直存在(除非程序终止)。这就意味着不需要花大量的时间用于垃圾回收。

​ 此外,不像链表那样,需要为每一个添加到其上面的对象创造节点对象—对应的,当删除节点时,需要执行相应的内存清理操作。环形数组中的元素采用覆盖方式,避免了jvm的GC。

​ 其次结构作为环形,数组的大小为2的n次方,这样元素定位可以通过位运算效率会更高,这个跟一致性哈希中的环形策略有点像。在disruptor中,这个牛逼的环形结构就是RingBuffer,既然是数组,那么就有大小,而且这个大小必须是2的n次方,结构如下:

Disruptor测试结果运算1亿次,耗时5503ms,吞吐量18171000/s,于是我扒开了Disruptor高性能的外衣_java_14

​ 其实质只是一个普通的数组,只是当放置数据填充满队列(即到达2^n-1位置)之后,再填充数据,就会从0开始,覆盖之前的数据,于是就相当于一个环。

4.4 元素位置定位

​ 数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。

4.5 等待策略

​ 定义 Consumer 如何进行等待下一个事件的策略。 (注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)根据实际运行环境的 CPU 的硬件特点选择恰当的策略,并配合特定的 JVM 的配置参数,能够实现不同的性能提升。

4.5.1 BlockingWaitStrategy

​ Disruptor的默认策略是BlockingWaitStrategy,在BlockingWaitStrategy内部是使用锁和condition来控制线程的唤醒

​ BlockingWaitStrategy是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现。

4.5.2 SleepingWaitStrategy

​ SleepingWaitStrategy 的性能表现跟 BlockingWaitStrategy 差不多,对 CPU 的消耗也类似,但其对生产者线程的影响最小,通过使用LockSupport.parkNanos(1)来实现循环等待,适合用于异步日志类似的场景;

4.5.3 YieldingWaitStrategy

​ YieldingWaitStrategy是可以使用在低延迟系统的策略之一,YieldingWaitStrategy将自旋以等待序列增加到适当的值。在循环体内,将调用Thread.yield()以允许其他排队的线程运行。在要求极高性能且事件处理线数小于 CPU 逻辑核心数的场景中,推荐使用此策略;

4.5.4 BusySpinWaitStrategy

​ 性能最好,适合用于低延迟的系统。在要求极高性能且事件处理线程数小于CPU逻辑核心数的场景中,推荐使用此策略;

4.5.5 PhasedBackoffWaitStrategy

​ 自旋 + yield + 自定义策略,CPU资源紧缺,吞吐量和延迟并不 的场景。

本文由育博学谷狂野架构师发布
如果本文对您有帮助,欢迎关注和点赞;如果您有任何建议也可留言评论或私信,您的支持是我坚持创作的动力
转载请注明出处!

  • 收藏
  • 评论
  • 分享
  • 举报

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK