Exception thrown when the it is not possible to insert a value into - * the ring buffer without it wrapping the consuming sequenes. Used + *
Exception thrown when it is not possible to insert a value into + * the ring buffer without it wrapping the consuming sequences. Used * specifically when claiming with the {@link RingBuffer#tryNext()} call. *
*
For efficiency this exception will not have a stack trace. diff -Nru disruptor-3.3.4/src/main/java/com/lmax/disruptor/LiteBlockingWaitStrategy.java disruptor-3.3.5/src/main/java/com/lmax/disruptor/LiteBlockingWaitStrategy.java --- disruptor-3.3.4/src/main/java/com/lmax/disruptor/LiteBlockingWaitStrategy.java 2016-01-12 22:17:37.000000000 +0000 +++ disruptor-3.3.5/src/main/java/com/lmax/disruptor/LiteBlockingWaitStrategy.java 2016-07-15 04:29:57.000000000 +0000 @@ -87,4 +87,12 @@ } } } + + @Override + public String toString() + { + return "LiteBlockingWaitStrategy{" + + "processorNotifyCondition=" + processorNotifyCondition + + '}'; + } } diff -Nru disruptor-3.3.4/src/main/java/com/lmax/disruptor/LiteTimeoutBlockingWaitStrategy.java disruptor-3.3.5/src/main/java/com/lmax/disruptor/LiteTimeoutBlockingWaitStrategy.java --- disruptor-3.3.4/src/main/java/com/lmax/disruptor/LiteTimeoutBlockingWaitStrategy.java 1970-01-01 00:00:00.000000000 +0000 +++ disruptor-3.3.5/src/main/java/com/lmax/disruptor/LiteTimeoutBlockingWaitStrategy.java 2016-07-15 04:29:57.000000000 +0000 @@ -0,0 +1,91 @@ +package com.lmax.disruptor; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Variation of the {@link TimeoutBlockingWaitStrategy} that attempts to elide conditional wake-ups + * when the lock is uncontended. + */ +public class LiteTimeoutBlockingWaitStrategy implements WaitStrategy +{ + private final Lock lock = new ReentrantLock(); + private final Condition processorNotifyCondition = lock.newCondition(); + private final AtomicBoolean signalNeeded = new AtomicBoolean(false); + private final long timeoutInNanos; + + public LiteTimeoutBlockingWaitStrategy(final long timeout, final TimeUnit units) + { + timeoutInNanos = units.toNanos(timeout); + } + + @Override + public long waitFor( + final long sequence, + final Sequence cursorSequence, + final Sequence dependentSequence, + final SequenceBarrier barrier) + throws AlertException, InterruptedException, TimeoutException + { + long nanos = timeoutInNanos; + + long availableSequence; + if (cursorSequence.get() < sequence) + { + lock.lock(); + try + { + while (cursorSequence.get() < sequence) + { + signalNeeded.getAndSet(true); + + barrier.checkAlert(); + nanos = processorNotifyCondition.awaitNanos(nanos); + if (nanos <= 0) + { + throw TimeoutException.INSTANCE; + } + } + } + finally + { + lock.unlock(); + } + } + + while ((availableSequence = dependentSequence.get()) < sequence) + { + barrier.checkAlert(); + } + + return availableSequence; + } + + @Override + public void signalAllWhenBlocking() + { + if (signalNeeded.getAndSet(false)) + { + lock.lock(); + try + { + processorNotifyCondition.signalAll(); + } + finally + { + lock.unlock(); + } + } + } + + @Override + public String toString() + { + return "LiteTimeoutBlockingWaitStrategy{" + + "processorNotifyCondition=" + processorNotifyCondition + + '}'; + } +} diff -Nru disruptor-3.3.4/src/main/java/com/lmax/disruptor/MultiProducerSequencer.java disruptor-3.3.5/src/main/java/com/lmax/disruptor/MultiProducerSequencer.java --- disruptor-3.3.4/src/main/java/com/lmax/disruptor/MultiProducerSequencer.java 2016-01-12 22:17:37.000000000 +0000 +++ disruptor-3.3.5/src/main/java/com/lmax/disruptor/MultiProducerSequencer.java 2016-07-15 04:29:57.000000000 +0000 @@ -133,6 +133,7 @@ if (wrapPoint > gatingSequence) { + waitStrategy.signalAllWhenBlocking(); LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy? continue; } diff -Nru disruptor-3.3.4/src/main/java/com/lmax/disruptor/RingBuffer.java disruptor-3.3.5/src/main/java/com/lmax/disruptor/RingBuffer.java --- disruptor-3.3.4/src/main/java/com/lmax/disruptor/RingBuffer.java 2016-01-12 22:17:37.000000000 +0000 +++ disruptor-3.3.5/src/main/java/com/lmax/disruptor/RingBuffer.java 2016-07-15 04:29:57.000000000 +0000 @@ -405,7 +405,7 @@ } /** - * Get the current cursor value for the ring buffer. The actual value recieved + * Get the current cursor value for the ring buffer. The actual value received * will depend on the type of {@link Sequencer} that is being used. * * @see MultiProducerSequencer @@ -1104,4 +1104,13 @@ sequencer.publish(initialSequence, finalSequence); } } + + @Override + public String toString() + { + return "RingBuffer{" + + "bufferSize=" + bufferSize + + ", sequencer=" + sequencer + + "}"; + } } diff -Nru disruptor-3.3.4/src/main/java/com/lmax/disruptor/SingleProducerSequencer.java disruptor-3.3.5/src/main/java/com/lmax/disruptor/SingleProducerSequencer.java --- disruptor-3.3.4/src/main/java/com/lmax/disruptor/SingleProducerSequencer.java 2016-01-12 22:17:37.000000000 +0000 +++ disruptor-3.3.5/src/main/java/com/lmax/disruptor/SingleProducerSequencer.java 2016-07-15 04:29:57.000000000 +0000 @@ -122,6 +122,7 @@ long minSequence; while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) { + waitStrategy.signalAllWhenBlocking(); LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin? } diff -Nru disruptor-3.3.4/src/main/java/com/lmax/disruptor/TimeoutBlockingWaitStrategy.java disruptor-3.3.5/src/main/java/com/lmax/disruptor/TimeoutBlockingWaitStrategy.java --- disruptor-3.3.4/src/main/java/com/lmax/disruptor/TimeoutBlockingWaitStrategy.java 2016-01-12 22:17:37.000000000 +0000 +++ disruptor-3.3.5/src/main/java/com/lmax/disruptor/TimeoutBlockingWaitStrategy.java 2016-07-15 04:29:57.000000000 +0000 @@ -70,4 +70,11 @@ } } + @Override + public String toString() + { + return "TimeoutBlockingWaitStrategy{" + + "processorNotifyCondition=" + processorNotifyCondition + + '}'; + } } diff -Nru disruptor-3.3.4/src/main/java/com/lmax/disruptor/WaitStrategy.java disruptor-3.3.5/src/main/java/com/lmax/disruptor/WaitStrategy.java --- disruptor-3.3.4/src/main/java/com/lmax/disruptor/WaitStrategy.java 2016-01-12 22:17:37.000000000 +0000 +++ disruptor-3.3.5/src/main/java/com/lmax/disruptor/WaitStrategy.java 2016-07-15 04:29:57.000000000 +0000 @@ -24,7 +24,7 @@ /** * Wait for the given sequence to be available. It is possible for this method to return a value * less than the sequence number supplied depending on the implementation of the WaitStrategy. A common - * use for this is to signal a timeout. Any EventProcessor that is using a WaitStragegy to get notifications + * use for this is to signal a timeout. Any EventProcessor that is using a WaitStrategy to get notifications * about message becoming available should remember to handle this case. The {@link BatchEventProcessor} explicitly * handles this case and will signal a timeout if required. * diff -Nru disruptor-3.3.4/src/main/java/com/lmax/disruptor/WorkProcessor.java disruptor-3.3.5/src/main/java/com/lmax/disruptor/WorkProcessor.java --- disruptor-3.3.4/src/main/java/com/lmax/disruptor/WorkProcessor.java 2016-01-12 22:17:37.000000000 +0000 +++ disruptor-3.3.5/src/main/java/com/lmax/disruptor/WorkProcessor.java 2016-07-15 04:29:57.000000000 +0000 @@ -45,6 +45,8 @@ } }; + private final TimeoutHandler timeoutHandler; + /** * Construct a {@link WorkProcessor}. * @@ -72,6 +74,8 @@ { ((EventReleaseAware) this.workHandler).setEventReleaser(eventReleaser); } + + timeoutHandler = (workHandler instanceof TimeoutHandler) ? (TimeoutHandler) workHandler : null; } @Override @@ -144,6 +148,10 @@ cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence); } } + catch (final TimeoutException e) + { + notifyTimeout(sequence.get()); + } catch (final AlertException ex) { if (!running.get()) @@ -164,6 +172,21 @@ running.set(false); } + private void notifyTimeout(final long availableSequence) + { + try + { + if (timeoutHandler != null) + { + timeoutHandler.onTimeout(availableSequence); + } + } + catch (Throwable e) + { + exceptionHandler.handleEventException(e, availableSequence, null); + } + } + private void notifyStart() { if (workHandler instanceof LifecycleAware) diff -Nru disruptor-3.3.4/src/perftest/java/com/lmax/disruptor/queue/OneToOneQueueBatchedThroughputTest.java disruptor-3.3.5/src/perftest/java/com/lmax/disruptor/queue/OneToOneQueueBatchedThroughputTest.java --- disruptor-3.3.4/src/perftest/java/com/lmax/disruptor/queue/OneToOneQueueBatchedThroughputTest.java 1970-01-01 00:00:00.000000000 +0000 +++ disruptor-3.3.5/src/perftest/java/com/lmax/disruptor/queue/OneToOneQueueBatchedThroughputTest.java 2016-07-15 04:29:57.000000000 +0000 @@ -0,0 +1,97 @@ +/* + * Copyright 2011 LMAX Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.lmax.disruptor.queue; + +import com.lmax.disruptor.AbstractPerfTestQueue; +import com.lmax.disruptor.support.ValueAdditionBatchQueueProcessor; +import com.lmax.disruptor.util.DaemonThreadFactory; + +import java.util.concurrent.*; + +import static com.lmax.disruptor.support.PerfTestUtil.failIf; + +/** + *
+ * UniCast a series of items between 1 publisher and 1 event processor. + * + * +----+ +-----+ + * | P1 |--->| EP1 | + * +----+ +-----+ + * + * Queue Based: + * ============ + * + * put take + * +----+ +====+ +-----+ + * | P1 |--->| Q1 |<---| EP1 | + * +----+ +====+ +-----+ + * + * P1 - Publisher 1 + * Q1 - Queue 1 + * EP1 - EventProcessor 1 + * + *+ */ +public final class OneToOneQueueBatchedThroughputTest extends AbstractPerfTestQueue +{ + private static final int BUFFER_SIZE = 1024 * 64; + private static final long ITERATIONS = 1000L * 1000L * 10L; + private final ExecutorService executor = Executors.newSingleThreadExecutor(DaemonThreadFactory.INSTANCE); + private final long expectedResult = ITERATIONS * 3L; + + /////////////////////////////////////////////////////////////////////////////////////////////// + + private final BlockingQueue
* UniCast a series of items between 1 publisher and 1 event processor. @@ -56,7 +50,7 @@ private static final int BUFFER_SIZE = 1024 * 64; private static final long ITERATIONS = 1000L * 1000L * 10L; private final ExecutorService executor = Executors.newSingleThreadExecutor(DaemonThreadFactory.INSTANCE); - private final long expectedResult = PerfTestUtil.accumulatedAddition(ITERATIONS); + private final long expectedResult = ITERATIONS * 3L; /////////////////////////////////////////////////////////////////////////////////////////////// @@ -82,7 +76,7 @@ for (long i = 0; i < ITERATIONS; i++) { - blockingQueue.put(Long.valueOf(i)); + blockingQueue.put(3L); } latch.await(); diff -Nru disruptor-3.3.4/src/perftest/java/com/lmax/disruptor/queue/PingPongQueueLatencyTest.java disruptor-3.3.5/src/perftest/java/com/lmax/disruptor/queue/PingPongQueueLatencyTest.java --- disruptor-3.3.4/src/perftest/java/com/lmax/disruptor/queue/PingPongQueueLatencyTest.java 2016-01-12 22:17:37.000000000 +0000 +++ disruptor-3.3.5/src/perftest/java/com/lmax/disruptor/queue/PingPongQueueLatencyTest.java 2016-07-15 04:29:57.000000000 +0000 @@ -16,13 +16,7 @@ package com.lmax.disruptor.queue; import java.io.PrintStream; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.*; import org.HdrHistogram.Histogram; @@ -62,7 +56,7 @@ public final class PingPongQueueLatencyTest { private static final int BUFFER_SIZE = 1024; - private static final long ITERATIONS = 1000L * 1000L * 30L; + private static final long ITERATIONS = 100L * 1000L * 30L; private static final long PAUSE_NANOS = 1000L; private final ExecutorService executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE); @@ -70,8 +64,8 @@ /////////////////////////////////////////////////////////////////////////////////////////////// - private final BlockingQueuepingQueue = new LinkedBlockingQueue (BUFFER_SIZE); - private final BlockingQueue pongQueue = new LinkedBlockingQueue (BUFFER_SIZE); + private final BlockingQueue pingQueue = new ArrayBlockingQueue (BUFFER_SIZE); + private final BlockingQueue pongQueue = new ArrayBlockingQueue (BUFFER_SIZE); private final QueuePinger qPinger = new QueuePinger(pingQueue, pongQueue, ITERATIONS, PAUSE_NANOS); private final QueuePonger qPonger = new QueuePonger(pingQueue, pongQueue); @@ -152,13 +146,13 @@ Thread.sleep(1000); - long response = -1; + long counter = 0; - while (response < maxEvents) + while (counter < maxEvents) { final long t0 = System.nanoTime(); - pingQueue.put(counter++); - response = pongQueue.take(); + pingQueue.put(1L); + counter += pongQueue.take(); final long t1 = System.nanoTime(); histogram.recordValueWithExpectedInterval(t1 - t0, pauseTimeNs); diff -Nru disruptor-3.3.4/src/perftest/java/com/lmax/disruptor/sequenced/PingPongSequencedLatencyTest.java disruptor-3.3.5/src/perftest/java/com/lmax/disruptor/sequenced/PingPongSequencedLatencyTest.java --- disruptor-3.3.4/src/perftest/java/com/lmax/disruptor/sequenced/PingPongSequencedLatencyTest.java 2016-01-12 22:17:37.000000000 +0000 +++ disruptor-3.3.5/src/perftest/java/com/lmax/disruptor/sequenced/PingPongSequencedLatencyTest.java 2016-07-15 04:29:57.000000000 +0000 @@ -24,14 +24,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import com.lmax.disruptor.*; import org.HdrHistogram.Histogram; -import com.lmax.disruptor.BatchEventProcessor; -import com.lmax.disruptor.EventHandler; -import com.lmax.disruptor.LifecycleAware; -import com.lmax.disruptor.RingBuffer; -import com.lmax.disruptor.SequenceBarrier; -import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.support.ValueEvent; import com.lmax.disruptor.util.DaemonThreadFactory; @@ -71,7 +66,7 @@ public final class PingPongSequencedLatencyTest { private static final int BUFFER_SIZE = 1024; - private static final long ITERATIONS = 1000L * 1000L * 30L; + private static final long ITERATIONS = 100L * 1000L * 30L; private static final long PAUSE_NANOS = 1000L; private final ExecutorService executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE); @@ -80,9 +75,9 @@ /////////////////////////////////////////////////////////////////////////////////////////////// private final RingBuffer pingBuffer = - createSingleProducer(ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new YieldingWaitStrategy()); + createSingleProducer(ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new BlockingWaitStrategy()); private final RingBuffer pongBuffer = - createSingleProducer(ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new YieldingWaitStrategy()); + createSingleProducer(ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new BlockingWaitStrategy()); private final SequenceBarrier pongBarrier = pongBuffer.newBarrier(); private final Pinger pinger = new Pinger(pingBuffer, ITERATIONS, PAUSE_NANOS); diff -Nru disruptor-3.3.4/src/perftest/java/com/lmax/disruptor/support/MultiBufferBatchEventProcessor.java disruptor-3.3.5/src/perftest/java/com/lmax/disruptor/support/MultiBufferBatchEventProcessor.java --- disruptor-3.3.4/src/perftest/java/com/lmax/disruptor/support/MultiBufferBatchEventProcessor.java 2016-01-12 22:17:37.000000000 +0000 +++ disruptor-3.3.5/src/perftest/java/com/lmax/disruptor/support/MultiBufferBatchEventProcessor.java 2016-07-15 04:29:57.000000000 +0000 @@ -1,17 +1,9 @@ package com.lmax.disruptor.support; -import static java.util.Arrays.fill; +import com.lmax.disruptor.*; import java.util.concurrent.atomic.AtomicBoolean; -import com.lmax.disruptor.AlertException; -import com.lmax.disruptor.DataProvider; -import com.lmax.disruptor.EventHandler; -import com.lmax.disruptor.EventProcessor; -import com.lmax.disruptor.Sequence; -import com.lmax.disruptor.SequenceBarrier; -import com.lmax.disruptor.TimeoutException; - public class MultiBufferBatchEventProcessor implements EventProcessor { @@ -57,8 +49,6 @@ } final int barrierLength = barriers.length; - final long[] lastConsumed = new long[barrierLength]; - fill(lastConsumed, -1L); while (true) { @@ -69,16 +59,16 @@ long available = barriers[i].waitFor(-1); Sequence sequence = sequences[i]; - long previous = sequence.get(); + long nextSequence = sequence.get() + 1; - for (long l = previous + 1; l <= available; l++) + for (long l = nextSequence; l <= available; l++) { - handler.onEvent(providers[i].get(l), l, previous == available); + handler.onEvent(providers[i].get(l), l, nextSequence == available); } sequence.set(available); - count += (available - previous); + count += available - nextSequence + 1; } Thread.yield(); diff -Nru disruptor-3.3.4/src/perftest/java/com/lmax/disruptor/support/ValueAdditionBatchQueueProcessor.java disruptor-3.3.5/src/perftest/java/com/lmax/disruptor/support/ValueAdditionBatchQueueProcessor.java --- disruptor-3.3.4/src/perftest/java/com/lmax/disruptor/support/ValueAdditionBatchQueueProcessor.java 1970-01-01 00:00:00.000000000 +0000 +++ disruptor-3.3.5/src/perftest/java/com/lmax/disruptor/support/ValueAdditionBatchQueueProcessor.java 2016-07-15 04:29:57.000000000 +0000 @@ -0,0 +1,106 @@ +/* + * Copyright 2011 LMAX Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.lmax.disruptor.support; + +import java.util.ArrayList; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; + +public final class ValueAdditionBatchQueueProcessor implements Runnable +{ + private volatile boolean running; + private long value; + private long sequence; + private CountDownLatch latch; + + private final BlockingQueue blockingQueue; + private final ArrayList batch = new ArrayList (100); + private final long count; + + public ValueAdditionBatchQueueProcessor(final BlockingQueue blockingQueue, final long count) + { + this.blockingQueue = blockingQueue; + this.count = count; + } + + public long getValue() + { + return value; + } + + public void reset(final CountDownLatch latch) + { + value = 0L; + sequence = 0L; + this.latch = latch; + } + + public void halt() + { + running = false; + } + + @Override + public void run() + { + running = true; + while (true) + { + try + { + long v = blockingQueue.take(); + sequence++; + + this.value += v; + + int c = blockingQueue.drainTo(batch, 100); + sequence += c; + + v = 0; + for (int i = 0, n = batch.size(); i < n; i++) + { + v += batch.get(i); + } + + this.value += v; + + batch.clear(); + + if (sequence == count) + { + latch.countDown(); + } + } + catch (InterruptedException ex) + { + if (!running) + { + break; + } + } + } + } + + @Override + public String toString() + { + return "ValueAdditionBatchQueueProcessor{" + + "value=" + value + + ", sequence=" + sequence + + ", count=" + count + + '}'; + } +} diff -Nru disruptor-3.3.4/src/test/java/com/lmax/disruptor/example/HandleExceptionOnTranslate.java disruptor-3.3.5/src/test/java/com/lmax/disruptor/example/HandleExceptionOnTranslate.java --- disruptor-3.3.4/src/test/java/com/lmax/disruptor/example/HandleExceptionOnTranslate.java 1970-01-01 00:00:00.000000000 +0000 +++ disruptor-3.3.5/src/test/java/com/lmax/disruptor/example/HandleExceptionOnTranslate.java 2016-07-15 04:29:57.000000000 +0000 @@ -0,0 +1,68 @@ +package com.lmax.disruptor.example; + +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.EventTranslator; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.support.LongEvent; +import com.lmax.disruptor.util.DaemonThreadFactory; + +public class HandleExceptionOnTranslate +{ + private static final int NO_VALUE_SPECIFIED = -1; + + private static class MyHandler implements EventHandler + { + + @Override + public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception + { + if (event.get() == NO_VALUE_SPECIFIED) + { + System.out.printf("Discarded%n"); + } + else + { + System.out.printf("Processed: %s%n", event.get() == sequence); + } + } + } + + public static void main(String[] args) throws InterruptedException + { + Disruptor disruptor = new Disruptor (LongEvent.FACTORY, 1024, DaemonThreadFactory.INSTANCE); + + disruptor.handleEventsWith(new MyHandler()); + + disruptor.start(); + + EventTranslator t = new EventTranslator () + { + @Override + public void translateTo(LongEvent event, long sequence) + { + event.set(NO_VALUE_SPECIFIED); + + if (sequence % 3 == 0) + { + throw new RuntimeException("Skipping"); + } + + event.set(sequence); + } + }; + + for (int i = 0; i < 10; i++) + { + try + { + disruptor.publishEvent(t); + } + catch (RuntimeException e) + { + // Skipping + } + } + + Thread.sleep(5000); + } +} diff -Nru disruptor-3.3.4/src/test/java/com/lmax/disruptor/example/KeyedBatching.java disruptor-3.3.5/src/test/java/com/lmax/disruptor/example/KeyedBatching.java --- disruptor-3.3.4/src/test/java/com/lmax/disruptor/example/KeyedBatching.java 1970-01-01 00:00:00.000000000 +0000 +++ disruptor-3.3.5/src/test/java/com/lmax/disruptor/example/KeyedBatching.java 2016-07-15 04:29:57.000000000 +0000 @@ -0,0 +1,42 @@ +package com.lmax.disruptor.example; + +import com.lmax.disruptor.EventHandler; + +import java.util.ArrayList; +import java.util.List; + +public class KeyedBatching implements EventHandler +{ + private static final int MAX_BATCH_SIZE = 100; + private long key = 0; + private List