|
2019-05-17
// https://mvnrepository.com/artifact/com.lmax/disruptor implementation group: 'com.lmax', name: 'disruptor', version: '3.4.2'
只列出了Gradle的,版本建议使用3+,有些Lambda语法支持需要。
这里我定义了简单的Event类:
public static class FunEvent { String id; public String getId() { return id; } public void setId(String id) { this.id = id; } }
创建方法如下:
Disruptor<FunEvent> disruptor = new Disruptor<FunEvent>( FunEvent::new, 1024 * 1024, ThreadPoolUtil.getFactory(), ProducerType.MULTI, new YieldingWaitStrategy() );
对于消息队列来讲,需要两个重要的角色,生产者和消费者。这里先将一下Disruptor生产者,我搜到不少资料,都是需要创建一个生产者的类,然后实现一个方法,这个方法内容基本一致的,内容如下:
long sequence = ringBuffer.next(); try { FunEvent funEvent = ringBuffer.get(sequence); funEvent.setId(orderId); } finally { ringBuffer.publish(sequence); }
然后使用生产者对象调用这个方法,我觉得有点多此一举了,幸好有一篇文章介绍了Disruptor一些新特性的时候提到支持了Lambda语法,这下就可以不用创建生产者对象了。语法如下:
ringBuffer.publishEvent((Event, sequence) -> Event.setId(StringUtil.getString(10)));
消费者创建需要实现两个接口com.lmax.disruptor.EventHandler
和com.lmax.disruptor.WorkHandler
,这俩一个是处理单消费者模式,另外一个多消费者模式。
创建方法如下:
/** * 消费者 */ private static class FunEventHandler implements EventHandler<FunEvent>, WorkHandler<FunEvent> { public void onEvent(FunEvent Event, long sequence, boolean endOfBatch) { output("消费消息:" + Event.getId() + TAB + sequence); } public void onEvent(FunEvent Event) { output("消费消息:" + Event.getId()); } }
public static void main(String[] args) { Disruptor<FunEvent> disruptor = new Disruptor<FunEvent>( FunEvent::new, 1024 * 1024, ThreadPoolUtil.getFactory(), ProducerType.MULTI, new YieldingWaitStrategy() ); disruptor.handleEventsWithWorkerPool(new FunEventHandler(), new FunEventHandler()); disruptor.handleEventsWith(new FunEventHandler()); disruptor.start(); RingBuffer<FunEvent> ringBuffer = disruptor.getRingBuffer(); for (int i = 0; i < 3; i++) { ringBuffer.publishEvent((event, sequence) -> event.setId(StringUtil.getString(10))); } sleep(5.0); disruptor.shutdown(); }
控制台输出:
INFO-> main 当前用户:oker,工作目录:/Users/oker/IdeaProjects/funtester/,系统编码格式:UTF-8,系统Mac OS X版本:10.16 INFO-> main ###### # # # # ####### ###### ##### ####### ###### ##### # # # ## # # # # # # # # #### # # # # # # #### ##### # #### ##### # # # # # # # # # # # # # # ##### # # # ###### ##### # ###### # # INFO-> F-3 消费消息:i3OrH2ZnxD 0 INFO-> F-1 消费消息:i3OrH2ZnxD INFO-> F-2 消费消息:whhoxoMxmR INFO-> F-3 消费消息:whhoxoMxmR 1 INFO-> F-2 消费消息:IeP9fIRpKp INFO-> F-3 消费消息:IeP9fIRpKp 2 Process finished with exit code 0
可以看到,每个消息会消费了两次。其中F-3线程消费量=F-1和F-2线程消费量总和,这就跟家理解了com.lmax.disruptor.dsl.EventHandlerGroup
的功能。
public static void main(String[] args) { Disruptor<FunEvent> disruptor = new Disruptor<FunEvent>( FunEvent::new, 1024 * 1024, ThreadPoolUtil.getFactory(), ProducerType.MULTI, new YieldingWaitStrategy() ) disruptor.handleEventsWithWorkerPool(new FunEventHandler(), new FunEventHandler()) disruptor.handleEventsWith(new FunEventHandler()) disruptor.start() RingBuffer<FunEvent> ringBuffer = disruptor.getRingBuffer(); def funtester = { fun { 100.times {ringBuffer.publishEvent((event, sequence) -> event.setId(StringUtil.getString(10)));} } } 10.times {funtester()} sleep(5.0) disruptor.shutdown() }
编辑:航网科技 来源:腾讯云 本文版权归原作者所有 转载请注明出处
微信扫一扫咨询客服
全国免费服务热线
0755-36300002