高性能队列Disruptor在测试中应用

| 2019-05-17

最近在研究goreplay的源码的过程中,感觉有些思路还是很值得借鉴。所以自己立了一个flag,实现一个千万级日志回放功能。但是在这个实现的过程中遇到一个棘手的问题:Java自带的LinkedBlockingQueue比较难以直接满足需求场景和性能要求。
 
熟悉goreplay的测友应该清楚Go语言chanel在goreplay这个框架中应用是十分广泛的,加上Go语言自身较高的性能,可以说双剑合并。所以我也想照葫芦画瓢写一个类似思路的实现。这个后面会有专题讲这个。
 
基于此,我搜到了Disruptor这个高性能队列。Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单。
 
测试使用Disruptor时候不用像Springboot框架中那样,创建各类对象,抽象各种对象方法,我的原则就是怎么简单怎么来,下面分享一下Disruptor在测试中的基础实践和简单案例演示。

依赖

// https://mvnrepository.com/artifact/com.lmax/disruptor
implementation group: 'com.lmax', name: 'disruptor', version: '3.4.2'

只列出了Gradle的,版本建议使用3+,有些Lambda语法支持需要。

Event对象
首先我们要定义一个Event类型,当然也可以直接使用类似java.lang.String使用已经存在的类,但是在设置Event对象时候,需要使用new关键字以及构造新的Event时,使用set方法总比直接=赋值的一致性更好一些。单独写一个Event类,可以更加简单,是的代码逻辑性更强,不用收到其他类的影响。

这里我定义了简单的Event类:

    public static class FunEvent {

        String id;

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }

    }

Disruptor创建

Disruptor对象创建首先需要我们定义一个Event类型,调用构造方法,参数有五个。1.Event初始化方法;2.ringbuffsize,这里设置需要是2的整数倍;3.threadfactory,创建线程的工厂类,这里我用了com.funtester.frame.execute.ThreadPoolUtil#getFactory();4.生产者模式,分成单生产者和多生产者枚举类;5.等待策略,官方提供了一些实现类供选择,我使用了com.lmax.disruptor.YieldingWaitStrategy

创建方法如下:

        Disruptor<FunEvent> disruptor = new Disruptor<FunEvent>(
                FunEvent::new,
                1024 * 1024,
                ThreadPoolUtil.getFactory(),
                ProducerType.MULTI,
                new YieldingWaitStrategy()
        );

生产者

对于消息队列来讲,需要两个重要的角色,生产者和消费者。这里先将一下Disruptor生产者,我搜到不少资料,都是需要创建一个生产者的类,然后实现一个方法,这个方法内容基本一致的,内容如下:

           long sequence = ringBuffer.next();
            try {
                FunEvent funEvent = ringBuffer.get(sequence);
                funEvent.setId(orderId);
            } finally {
                ringBuffer.publish(sequence);
            }

然后使用生产者对象调用这个方法,我觉得有点多此一举了,幸好有一篇文章介绍了Disruptor一些新特性的时候提到支持了Lambda语法,这下就可以不用创建生产者对象了。语法如下:

                ringBuffer.publishEvent((Event, sequence) -> Event.setId(StringUtil.getString(10)));

消费者

消费者创建需要实现两个接口com.lmax.disruptor.EventHandlercom.lmax.disruptor.WorkHandler,这俩一个是处理单消费者模式,另外一个多消费者模式。

创建方法如下:

    /**
     * 消费者
     */
    private static class FunEventHandler implements EventHandler<FunEvent>, WorkHandler<FunEvent> {

        public void onEvent(FunEvent Event, long sequence, boolean endOfBatch) {
            output("消费消息:" + Event.getId() + TAB + sequence);
        }

        public void onEvent(FunEvent Event) {
            output("消费消息:" + Event.getId());
        }

    }

配置handler

这里分两类:配置单个消费者和配置多个消费者。
 
单个消费者:
 
disruptor.handleEventsWith(new FunEventHandler());
 
多个消费者:
 
disruptor.handleEventsWithWorkerPool(new FunEventHandler(), new FunEventHandler());
 
不管是单个还是多个,每次调用都会产生一个com.lmax.disruptor.dsl.EventHandlerGroup,每个com.lmax.disruptor.dsl.EventHandlerGroup都会完整消费每一个生产者产生的Event,如果设置了5次,那么一个Event就会被消费5次,每个com.lmax.disruptor.dsl.EventHandlerGroup消费一次,而且是阻塞的,加入某一个com.lmax.disruptor.dsl.EventHandlerGroup对象消费慢了,会阻塞其他消费者消费下一个Event。
 
启动
组装完成之后就可以启动了Disruptor了,语法如下:disruptor.start();,关闭语法disruptor.shutdown();,此处的关闭不会清空getRingBuffer已经存在的Event,看官方文档应该是停止生产,然后等待消费。
 
演示Demo

Java版本

    public static void main(String[] args) {
        Disruptor<FunEvent> disruptor = new Disruptor<FunEvent>(
                FunEvent::new,
                1024 * 1024,
                ThreadPoolUtil.getFactory(),
                ProducerType.MULTI,
                new YieldingWaitStrategy()
        );
        disruptor.handleEventsWithWorkerPool(new FunEventHandler(), new FunEventHandler());
        disruptor.handleEventsWith(new FunEventHandler());
        disruptor.start();
        RingBuffer<FunEvent> ringBuffer = disruptor.getRingBuffer();
        for (int i = 0; i < 3; i++) {
            ringBuffer.publishEvent((event, sequence) -> event.setId(StringUtil.getString(10)));
        }
        sleep(5.0);
        disruptor.shutdown();

    }

控制台输出:

INFO-> main 当前用户:oker,工作目录:/Users/oker/IdeaProjects/funtester/,系统编码格式:UTF-8,系统Mac OS X版本:10.16
INFO-> main 
  ###### #     #  #    # ####### ######  #####  ####### ######  #####
  #      #     #  ##   #    #    #       #         #    #       #    #
  ####   #     #  # #  #    #    ####    #####     #    ####    #####
  #      #     #  #  # #    #    #            #    #    #       #   #
  #       #####   #    #    #    ######  #####     #    ######  #    #

INFO-> F-3  消费消息:i3OrH2ZnxD 0
INFO-> F-1  消费消息:i3OrH2ZnxD
INFO-> F-2  消费消息:whhoxoMxmR
INFO-> F-3  消费消息:whhoxoMxmR 1
INFO-> F-2  消费消息:IeP9fIRpKp
INFO-> F-3  消费消息:IeP9fIRpKp 2

Process finished with exit code 0

可以看到,每个消息会消费了两次。其中F-3线程消费量=F-1和F-2线程消费量总和,这就跟家理解了com.lmax.disruptor.dsl.EventHandlerGroup的功能。

Groovy+异步版本

    public static void main(String[] args) {
        Disruptor<FunEvent> disruptor = new Disruptor<FunEvent>(
                FunEvent::new,
                1024 * 1024,
                ThreadPoolUtil.getFactory(),
                ProducerType.MULTI,
                new YieldingWaitStrategy()
        )
        disruptor.handleEventsWithWorkerPool(new FunEventHandler(), new FunEventHandler())
        disruptor.handleEventsWith(new FunEventHandler())
        disruptor.start()
        RingBuffer<FunEvent> ringBuffer = disruptor.getRingBuffer();
        def funtester = {
            fun {
                100.times {ringBuffer.publishEvent((event, sequence) -> event.setId(StringUtil.getString(10)));}
            }
        }
        10.times {funtester()}
        sleep(5.0)
        disruptor.shutdown()
    }

编辑:航网科技 来源:腾讯云 本文版权归原作者所有 转载请注明出处

在线客服

微信扫一扫咨询客服


全国免费服务热线
0755-36300002

返回顶部