disruptor颠末几年的成长,好像已经成为机能优化的大杀器,险些每个想优化机能的项目宣称本身用上了disruptor,机能城市泛起质的跃进。究竟,最好的例子就是LMAX本身的架构设计,支撑了600w/s的吞吐。
本文试图从代码层面将要害问题做些解答。
根基观念
Disruptor: 实际上就是整个基于ringBuffer实现的出产者消费者模式的容器。
RingBuffer: 著名的环形行列,可以类比为BlockingQueue之类的行列,ringBuffer的利用,使得内存被轮回利用,淘汰了某些场景的内存分派接纳扩容等耗时操纵。
苏州软件定制开拓 ex * SCALE) + BASE; UNSAFE.putOrderedInt(availableBuffer" src="http://www.importnew.com/https:/img-blog.csdn.net/20171219141927339" />
EventProcessor: 事件处理惩罚器,实际上可以领略为消费者模子的框架,实现了线程Runnable的run要领,将轮回判定等操纵封在了内里。
苏州软件定制开拓 ex * SCALE) + BASE; UNSAFE.putOrderedInt(availableBuffer" src="http://www.importnew.com/https:/img-blog.csdn.net/20171219175751608" />
EventHandler: 事件处理器,与前面处理惩罚器的差异是,事件处理器不认真框架内的行为,仅仅是EventProcessor作为消费者框架对外预留的扩展点而已。
Sequencer: 作为RingBuffer出产者的父接口,其直接实现有SingleProducerSequencer和MultiProducerSequencer。
苏州软件定制开拓 ex * SCALE) + BASE; UNSAFE.putOrderedInt(availableBuffer" src="http://www.importnew.com/https:/img-blog.csdn.net/20171219141940748" />
EventTranslator: 事件转换器。实际上就是新事件向往事件包围的接口界说。
SequenceBarrier: 消费者路障。划定了消费者如何向下走。都说disruptor无锁,事实上,该路障算是变向的锁。
WaitStrategy: 当出产者出产得太快而消费者消费得太慢时的期待计策。
苏州软件定制开拓 ex * SCALE) + BASE; UNSAFE.putOrderedInt(availableBuffer" src="http://www.importnew.com/https:/img-blog.csdn.net/20171219142244617" />
把上面几个要害观念画个图,昆山软件公司,或许长这样:
苏州软件定制开拓 ex * SCALE) + BASE; UNSAFE.putOrderedInt(availableBuffer" src="http://www.importnew.com/https:/img-blog.csdn.net/20171221134247821" width="635" height="494" />
所以接下来主要也就从出产者,消费者以及ringBuffer3个维度去看disruptor是如何玩的。
出产者
出产者宣布动静的进程从disruptor的publish要领为进口,实际挪用了ringBuffer的publish要领。publish要领主要做了几件事,一是先确保能拿到后头的n个sequence;二是利用translator来填充新数据到相应的位置;三是真正的声明这些位置已经宣布完成。
public void publishEvent(EventTranslator<E> translator)
{
final long sequence = sequencer.next();
translateAndPublish(translator, sequence);
}
public void publishEvents(EventTranslator<E>[] translators, int batchStartsAt, int batchSize)
{
checkBounds(translators, batchStartsAt, batchSize);
final long finalSequence = sequencer.next(batchSize);
translateAndPublishBatch(translators, batchStartsAt, batchSize, finalSequence);
}
获取出产者下一个sequence的要领,细节已经注释,实际上最终目标就是确保出产者和消费者相互不越界。
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
//该出产者宣布的最大序列号
long nextValue = this.nextValue;
//该出产者欲宣布的序列号
long nextSequence = nextValue + n;
//包围点,即该出产者假如宣布了这次的序列号,那它最终会落在哪个位置,实际上是nextSequence做了算术处理惩罚今后的值,最终目标是统一计较,不然就要去判绝对值以及取模等贫苦操纵
long wrapPoint = nextSequence - bufferSize;
//所有消费者中消费得最慢谁人的前一个序列号
long cachedGatingSequence = this.cachedValue;
//这里两个判定条件:一是看出产者出产是不是高出了消费者,所以判定的是包围点是否高出了最慢消费者;二是看消费者是否高出了当前出产者的最大序号,判定的是消费者是不是比出产者还快这种异常环境
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
cursor.setVolatile(nextValue); // StoreLoad fence
long minSequence;
//包围点是不是已经高出了最慢消费者和当前出产者序列号的最小者(这两个有点难领略,实际上就是包围点不能高出最慢谁人出产者,也不能高出当前自身,好比一次宣布高出bufferSize),gatingSequences的处理惩罚也是雷同算术处理惩罚,也可以当作是相对付原点是正照旧负
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{
//叫醒阻塞的消费者
waitStrategy.signalAllWhenBlocking();
//等上1纳秒
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
//把这个最慢消费者缓存下来,以便下一次利用
this.cachedValue = minSequence;
}
//把当前序列号更新为欲宣布序列号
this.nextValue = nextSequence;
return nextSequence;
}