1

Disruptor 系列一

 2 years ago
source link: https://nicksxs.me/2022/02/13/Disruptor-%E7%B3%BB%E5%88%97%E4%B8%80/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Disruptor 系列一

发表于 2022-02-13 分类于 Java 阅读次数: 10 阅读次数: 12 Disqus: 0 Comments

很久之前就听说过这个框架,不过之前有点跟消息队列混起来,这个也是种队列,但不是跟 rocketmq,nsq 那种一样的,而是在进程内部提供队列服务的,偏向于取代ArrayBlockingQueue,因为这个阻塞队列是使用了锁来控制阻塞,关于并发其实有一些通用的最佳实践,就是用锁,即使是 JDK 提供的锁,也是比较耗资源的,当然这是跟不加锁的对比,同样是锁,JDK 的实现还是性能比较优秀的。常见的阻塞队列中例如 ArrayBlockingQueueLinkedBlockingQueue 都有锁的身影的存在,区别在于 ArrayBlockingQueue 是一把锁,后者是两把锁,不过重点不在几把锁,这里其实是两个问题,一个是所谓的 lock free, 对于一个单生产者的 disruptor 来说,因为写入是只有一个线程的,是可以不用加锁,多生产者的时候使用的是 cas 来获取对应的写入坑位,另一个是解决“伪共享”问题,后面可以详细点分析,先介绍下使用
首先是数据源

public class LongEvent {
    private long value;

    public void set(long value) {
        this.value = value;
    }

    public long getValue() {
        return value;
    }

    public void setValue(long value) {
        this.value = value;
    }
}
public class LongEventFactory implements EventFactory<LongEvent>
{
    public LongEvent newInstance()
    {
        return new LongEvent();
    }
}

事件处理器

public class LongEventHandler implements EventHandler<LongEvent> {

    // event 事件,
    // sequence 当前的序列 
    // 是否当前批次最后一个数据
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
    {
        String str = String.format("long event : %s l:%s b:%s", event.getValue(), sequence, endOfBatch);
        System.out.println(str);
    }
}

主方法代码

package disruptor;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.nio.ByteBuffer;

public class LongEventMain
{
    public static void main(String[] args) throws Exception
    {
        // 这个需要是 2 的幂次,这样在定位的时候只需要位移操作,也能减少各种计算操作
        int bufferSize = 1024; 

        Disruptor<LongEvent> disruptor = 
                new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

        // 类似于注册处理器
        disruptor.handleEventsWith(new LongEventHandler());
        // 或者直接用 lambda
        disruptor.handleEventsWith((event, sequence, endOfBatch) ->
                System.out.println("Event: " + event));
        // 启动我们的 disruptor
        disruptor.start(); 


        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); 
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            // 生产事件
            ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
            Thread.sleep(1000);
        }
    }
}

运行下可以看到运行结果

这里其实就只是最简单的使用,生产者只有一个,然后也不是批量的。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK