diff -Nru disruptor-3.3.4/build.gradle disruptor-3.3.5/build.gradle --- disruptor-3.3.4/build.gradle 2016-01-12 22:17:37.000000000 +0000 +++ disruptor-3.3.5/build.gradle 2016-07-15 04:29:57.000000000 +0000 @@ -24,7 +24,7 @@ defaultTasks 'build' group = 'com.lmax' -version = new Version(major: 3, minor: 3, revision: 4) +version = new Version(major: 3, minor: 3, revision: 5) ext { fullName = 'Disruptor Framework' @@ -176,6 +176,7 @@ baseName = project.name + '-perf' from { configurations.perfCompile.collect { it.isDirectory() ? it : zipTree(it) } } from sourceSets.perf.output + from sourceSets.test.output with jar } diff -Nru disruptor-3.3.4/config/checkstyle/checkstyle.xml disruptor-3.3.5/config/checkstyle/checkstyle.xml --- disruptor-3.3.4/config/checkstyle/checkstyle.xml 2016-01-12 22:17:37.000000000 +0000 +++ disruptor-3.3.5/config/checkstyle/checkstyle.xml 2016-07-15 04:29:57.000000000 +0000 @@ -123,7 +123,7 @@ + value="java.util.GregorianCalendar, java.util.Hashtable, java.util.Vector" /> diff -Nru disruptor-3.3.4/debian/changelog disruptor-3.3.5/debian/changelog --- disruptor-3.3.4/debian/changelog 2016-01-22 15:19:14.000000000 +0000 +++ disruptor-3.3.5/debian/changelog 2016-07-28 06:11:06.000000000 +0000 @@ -1,3 +1,13 @@ +disruptor (3.3.5-1) unstable; urgency=medium + + * Team upload. + * New upstream release. + * Update Vcs- URLs to use https. + * Bump Standards-Version to 3.9.8 (no changes). + * Add to Uploaders. + + -- tony mancill Wed, 27 Jul 2016 23:06:41 -0700 + disruptor (3.3.4-1) unstable; urgency=medium * New upstream release diff -Nru disruptor-3.3.4/debian/control disruptor-3.3.5/debian/control --- disruptor-3.3.4/debian/control 2016-01-22 15:09:00.000000000 +0000 +++ disruptor-3.3.5/debian/control 2016-07-28 06:11:06.000000000 +0000 @@ -2,11 +2,12 @@ Section: java Priority: optional Maintainer: Debian Java Maintainers -Uploaders: Emmanuel Bourg +Uploaders: Emmanuel Bourg , + tony mancill Build-Depends: debhelper (>= 9), default-jdk, maven-debian-helper (>= 1.5) -Standards-Version: 3.9.6 -Vcs-Git: git://anonscm.debian.org/pkg-java/disruptor.git -Vcs-Browser: http://anonscm.debian.org/cgit/pkg-java/disruptor.git +Standards-Version: 3.9.8 +Vcs-Git: https://anonscm.debian.org/git/pkg-java/disruptor.git +Vcs-Browser: https://anonscm.debian.org/cgit/pkg-java/disruptor.git Homepage: http://lmax-exchange.github.com/disruptor Package: libdisruptor-java diff -Nru disruptor-3.3.4/debian/pom.xml disruptor-3.3.5/debian/pom.xml --- disruptor-3.3.4/debian/pom.xml 2016-01-12 22:25:40.000000000 +0000 +++ disruptor-3.3.5/debian/pom.xml 2016-07-28 06:11:06.000000000 +0000 @@ -4,7 +4,7 @@ 4.0.0 com.lmax disruptor - 3.3.4 + 3.3.5 Disruptor Framework Disruptor - Concurrent Programming Framework http://lmax-exchange.github.com/disruptor diff -Nru disruptor-3.3.4/gradle/wrapper/gradle-wrapper.properties disruptor-3.3.5/gradle/wrapper/gradle-wrapper.properties --- disruptor-3.3.4/gradle/wrapper/gradle-wrapper.properties 2016-01-12 22:17:37.000000000 +0000 +++ disruptor-3.3.5/gradle/wrapper/gradle-wrapper.properties 2016-07-15 04:29:57.000000000 +0000 @@ -1,6 +1,6 @@ -#Thu Jul 30 10:01:19 NZST 2015 +#Fri Jul 15 11:56:39 NZST 2016 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-2.5-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-2.8-bin.zip diff -Nru disruptor-3.3.4/gradlew disruptor-3.3.5/gradlew --- disruptor-3.3.4/gradlew 2016-01-12 22:17:37.000000000 +0000 +++ disruptor-3.3.5/gradlew 2016-07-15 04:29:57.000000000 +0000 @@ -42,11 +42,6 @@ ;; esac -# For Cygwin, ensure paths are in UNIX format before anything is touched. -if $cygwin ; then - [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"` -fi - # Attempt to set APP_HOME # Resolve links: $0 may be a link PRG="$0" @@ -61,9 +56,9 @@ fi done SAVED="`pwd`" -cd "`dirname \"$PRG\"`/" >&- +cd "`dirname \"$PRG\"`/" >/dev/null APP_HOME="`pwd -P`" -cd "$SAVED" >&- +cd "$SAVED" >/dev/null CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar @@ -114,6 +109,7 @@ if $cygwin ; then APP_HOME=`cygpath --path --mixed "$APP_HOME"` CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + JAVACMD=`cygpath --unix "$JAVACMD"` # We build the pattern for arguments to be converted via cygpath ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` diff -Nru disruptor-3.3.4/src/main/java/com/lmax/disruptor/AbstractSequencer.java disruptor-3.3.5/src/main/java/com/lmax/disruptor/AbstractSequencer.java --- disruptor-3.3.4/src/main/java/com/lmax/disruptor/AbstractSequencer.java 2016-01-12 22:17:37.000000000 +0000 +++ disruptor-3.3.5/src/main/java/com/lmax/disruptor/AbstractSequencer.java 2016-07-15 04:29:57.000000000 +0000 @@ -15,6 +15,7 @@ */ package com.lmax.disruptor; +import java.util.Arrays; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import com.lmax.disruptor.util.Util; @@ -122,4 +123,14 @@ { return EventPoller.newInstance(dataProvider, this, new Sequence(), cursor, gatingSequences); } + + @Override + public String toString() + { + return "AbstractSequencer{" + + "waitStrategy=" + waitStrategy + + ", cursor=" + cursor + + ", gatingSequences=" + Arrays.toString(gatingSequences) + + '}'; + } } \ No newline at end of file diff -Nru disruptor-3.3.4/src/main/java/com/lmax/disruptor/BlockingWaitStrategy.java disruptor-3.3.5/src/main/java/com/lmax/disruptor/BlockingWaitStrategy.java --- disruptor-3.3.4/src/main/java/com/lmax/disruptor/BlockingWaitStrategy.java 2016-01-12 22:17:37.000000000 +0000 +++ disruptor-3.3.5/src/main/java/com/lmax/disruptor/BlockingWaitStrategy.java 2016-07-15 04:29:57.000000000 +0000 @@ -72,4 +72,12 @@ lock.unlock(); } } + + @Override + public String toString() + { + return "BlockingWaitStrategy{" + + "processorNotifyCondition=" + processorNotifyCondition + + '}'; + } } diff -Nru disruptor-3.3.4/src/main/java/com/lmax/disruptor/dsl/BasicExecutor.java disruptor-3.3.5/src/main/java/com/lmax/disruptor/dsl/BasicExecutor.java --- disruptor-3.3.4/src/main/java/com/lmax/disruptor/dsl/BasicExecutor.java 2016-01-12 22:17:37.000000000 +0000 +++ disruptor-3.3.5/src/main/java/com/lmax/disruptor/dsl/BasicExecutor.java 2016-07-15 04:29:57.000000000 +0000 @@ -1,11 +1,17 @@ package com.lmax.disruptor.dsl; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.ThreadFactory; public class BasicExecutor implements Executor { private final ThreadFactory factory; + private final Queue threads = new ConcurrentLinkedQueue(); public BasicExecutor(ThreadFactory factory) { @@ -22,5 +28,35 @@ } thread.start(); + + threads.add(thread); + } + + @Override + public String toString() + { + return "BasicExecutor{" + + "threads=" + dumpThreadInfo() + + '}'; + } + + private String dumpThreadInfo() + { + final StringBuilder sb = new StringBuilder(); + + final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); + + for (Thread t : threads) + { + ThreadInfo threadInfo = threadMXBean.getThreadInfo(t.getId()); + sb.append("{"); + sb.append("name=").append(t.getName()).append(","); + sb.append("id=").append(t.getId()).append(","); + sb.append("state=").append(threadInfo.getThreadState()).append(","); + sb.append("lockInfo=").append(threadInfo.getLockInfo()); + sb.append("}"); + } + + return sb.toString(); } } diff -Nru disruptor-3.3.4/src/main/java/com/lmax/disruptor/dsl/Disruptor.java disruptor-3.3.5/src/main/java/com/lmax/disruptor/dsl/Disruptor.java --- disruptor-3.3.4/src/main/java/com/lmax/disruptor/dsl/Disruptor.java 2016-01-12 22:17:37.000000000 +0000 +++ disruptor-3.3.5/src/main/java/com/lmax/disruptor/dsl/Disruptor.java 2016-07-15 04:29:57.000000000 +0000 @@ -70,7 +70,7 @@ * {@link ProducerType}.MULTI * * @deprecated Use a {@link ThreadFactory} instead of an {@link Executor} as a the ThreadFactory - * is able to report errors then it is unable to construct a thread to run a producer. + * is able to report errors when it is unable to construct a thread to run a producer. * * @param eventFactory the factory to create events in the ring buffer. * @param ringBufferSize the size of the ring buffer. @@ -86,7 +86,7 @@ * Create a new Disruptor. * * @deprecated Use a {@link ThreadFactory} instead of an {@link Executor} as a the ThreadFactory - * is able to report errors then it is unable to construct a thread to run a producer. + * is able to report errors when it is unable to construct a thread to run a producer. * * @param eventFactory the factory to create events in the ring buffer. * @param ringBufferSize the size of the ring buffer, must be power of 2. @@ -556,4 +556,14 @@ throw new IllegalStateException("Disruptor.start() must only be called once."); } } + + @Override + public String toString() + { + return "Disruptor{" + + "ringBuffer=" + ringBuffer + + ", started=" + started + + ", executor=" + executor + + '}'; + } } diff -Nru disruptor-3.3.4/src/main/java/com/lmax/disruptor/InsufficientCapacityException.java disruptor-3.3.5/src/main/java/com/lmax/disruptor/InsufficientCapacityException.java --- disruptor-3.3.4/src/main/java/com/lmax/disruptor/InsufficientCapacityException.java 2016-01-12 22:17:37.000000000 +0000 +++ disruptor-3.3.5/src/main/java/com/lmax/disruptor/InsufficientCapacityException.java 2016-07-15 04:29:57.000000000 +0000 @@ -16,8 +16,8 @@ package com.lmax.disruptor; /** - *

Exception thrown when the it is not possible to insert a value into - * the ring buffer without it wrapping the consuming sequenes. Used + *

Exception thrown when it is not possible to insert a value into + * the ring buffer without it wrapping the consuming sequences. Used * specifically when claiming with the {@link RingBuffer#tryNext()} call. *

*

For efficiency this exception will not have a stack trace. diff -Nru disruptor-3.3.4/src/main/java/com/lmax/disruptor/LiteBlockingWaitStrategy.java disruptor-3.3.5/src/main/java/com/lmax/disruptor/LiteBlockingWaitStrategy.java --- disruptor-3.3.4/src/main/java/com/lmax/disruptor/LiteBlockingWaitStrategy.java 2016-01-12 22:17:37.000000000 +0000 +++ disruptor-3.3.5/src/main/java/com/lmax/disruptor/LiteBlockingWaitStrategy.java 2016-07-15 04:29:57.000000000 +0000 @@ -87,4 +87,12 @@ } } } + + @Override + public String toString() + { + return "LiteBlockingWaitStrategy{" + + "processorNotifyCondition=" + processorNotifyCondition + + '}'; + } } diff -Nru disruptor-3.3.4/src/main/java/com/lmax/disruptor/LiteTimeoutBlockingWaitStrategy.java disruptor-3.3.5/src/main/java/com/lmax/disruptor/LiteTimeoutBlockingWaitStrategy.java --- disruptor-3.3.4/src/main/java/com/lmax/disruptor/LiteTimeoutBlockingWaitStrategy.java 1970-01-01 00:00:00.000000000 +0000 +++ disruptor-3.3.5/src/main/java/com/lmax/disruptor/LiteTimeoutBlockingWaitStrategy.java 2016-07-15 04:29:57.000000000 +0000 @@ -0,0 +1,91 @@ +package com.lmax.disruptor; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Variation of the {@link TimeoutBlockingWaitStrategy} that attempts to elide conditional wake-ups + * when the lock is uncontended. + */ +public class LiteTimeoutBlockingWaitStrategy implements WaitStrategy +{ + private final Lock lock = new ReentrantLock(); + private final Condition processorNotifyCondition = lock.newCondition(); + private final AtomicBoolean signalNeeded = new AtomicBoolean(false); + private final long timeoutInNanos; + + public LiteTimeoutBlockingWaitStrategy(final long timeout, final TimeUnit units) + { + timeoutInNanos = units.toNanos(timeout); + } + + @Override + public long waitFor( + final long sequence, + final Sequence cursorSequence, + final Sequence dependentSequence, + final SequenceBarrier barrier) + throws AlertException, InterruptedException, TimeoutException + { + long nanos = timeoutInNanos; + + long availableSequence; + if (cursorSequence.get() < sequence) + { + lock.lock(); + try + { + while (cursorSequence.get() < sequence) + { + signalNeeded.getAndSet(true); + + barrier.checkAlert(); + nanos = processorNotifyCondition.awaitNanos(nanos); + if (nanos <= 0) + { + throw TimeoutException.INSTANCE; + } + } + } + finally + { + lock.unlock(); + } + } + + while ((availableSequence = dependentSequence.get()) < sequence) + { + barrier.checkAlert(); + } + + return availableSequence; + } + + @Override + public void signalAllWhenBlocking() + { + if (signalNeeded.getAndSet(false)) + { + lock.lock(); + try + { + processorNotifyCondition.signalAll(); + } + finally + { + lock.unlock(); + } + } + } + + @Override + public String toString() + { + return "LiteTimeoutBlockingWaitStrategy{" + + "processorNotifyCondition=" + processorNotifyCondition + + '}'; + } +} diff -Nru disruptor-3.3.4/src/main/java/com/lmax/disruptor/MultiProducerSequencer.java disruptor-3.3.5/src/main/java/com/lmax/disruptor/MultiProducerSequencer.java --- disruptor-3.3.4/src/main/java/com/lmax/disruptor/MultiProducerSequencer.java 2016-01-12 22:17:37.000000000 +0000 +++ disruptor-3.3.5/src/main/java/com/lmax/disruptor/MultiProducerSequencer.java 2016-07-15 04:29:57.000000000 +0000 @@ -133,6 +133,7 @@ if (wrapPoint > gatingSequence) { + waitStrategy.signalAllWhenBlocking(); LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy? continue; } diff -Nru disruptor-3.3.4/src/main/java/com/lmax/disruptor/RingBuffer.java disruptor-3.3.5/src/main/java/com/lmax/disruptor/RingBuffer.java --- disruptor-3.3.4/src/main/java/com/lmax/disruptor/RingBuffer.java 2016-01-12 22:17:37.000000000 +0000 +++ disruptor-3.3.5/src/main/java/com/lmax/disruptor/RingBuffer.java 2016-07-15 04:29:57.000000000 +0000 @@ -405,7 +405,7 @@ } /** - * Get the current cursor value for the ring buffer. The actual value recieved + * Get the current cursor value for the ring buffer. The actual value received * will depend on the type of {@link Sequencer} that is being used. * * @see MultiProducerSequencer @@ -1104,4 +1104,13 @@ sequencer.publish(initialSequence, finalSequence); } } + + @Override + public String toString() + { + return "RingBuffer{" + + "bufferSize=" + bufferSize + + ", sequencer=" + sequencer + + "}"; + } } diff -Nru disruptor-3.3.4/src/main/java/com/lmax/disruptor/SingleProducerSequencer.java disruptor-3.3.5/src/main/java/com/lmax/disruptor/SingleProducerSequencer.java --- disruptor-3.3.4/src/main/java/com/lmax/disruptor/SingleProducerSequencer.java 2016-01-12 22:17:37.000000000 +0000 +++ disruptor-3.3.5/src/main/java/com/lmax/disruptor/SingleProducerSequencer.java 2016-07-15 04:29:57.000000000 +0000 @@ -122,6 +122,7 @@ long minSequence; while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) { + waitStrategy.signalAllWhenBlocking(); LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin? } diff -Nru disruptor-3.3.4/src/main/java/com/lmax/disruptor/TimeoutBlockingWaitStrategy.java disruptor-3.3.5/src/main/java/com/lmax/disruptor/TimeoutBlockingWaitStrategy.java --- disruptor-3.3.4/src/main/java/com/lmax/disruptor/TimeoutBlockingWaitStrategy.java 2016-01-12 22:17:37.000000000 +0000 +++ disruptor-3.3.5/src/main/java/com/lmax/disruptor/TimeoutBlockingWaitStrategy.java 2016-07-15 04:29:57.000000000 +0000 @@ -70,4 +70,11 @@ } } + @Override + public String toString() + { + return "TimeoutBlockingWaitStrategy{" + + "processorNotifyCondition=" + processorNotifyCondition + + '}'; + } } diff -Nru disruptor-3.3.4/src/main/java/com/lmax/disruptor/WaitStrategy.java disruptor-3.3.5/src/main/java/com/lmax/disruptor/WaitStrategy.java --- disruptor-3.3.4/src/main/java/com/lmax/disruptor/WaitStrategy.java 2016-01-12 22:17:37.000000000 +0000 +++ disruptor-3.3.5/src/main/java/com/lmax/disruptor/WaitStrategy.java 2016-07-15 04:29:57.000000000 +0000 @@ -24,7 +24,7 @@ /** * Wait for the given sequence to be available. It is possible for this method to return a value * less than the sequence number supplied depending on the implementation of the WaitStrategy. A common - * use for this is to signal a timeout. Any EventProcessor that is using a WaitStragegy to get notifications + * use for this is to signal a timeout. Any EventProcessor that is using a WaitStrategy to get notifications * about message becoming available should remember to handle this case. The {@link BatchEventProcessor} explicitly * handles this case and will signal a timeout if required. * diff -Nru disruptor-3.3.4/src/main/java/com/lmax/disruptor/WorkProcessor.java disruptor-3.3.5/src/main/java/com/lmax/disruptor/WorkProcessor.java --- disruptor-3.3.4/src/main/java/com/lmax/disruptor/WorkProcessor.java 2016-01-12 22:17:37.000000000 +0000 +++ disruptor-3.3.5/src/main/java/com/lmax/disruptor/WorkProcessor.java 2016-07-15 04:29:57.000000000 +0000 @@ -45,6 +45,8 @@ } }; + private final TimeoutHandler timeoutHandler; + /** * Construct a {@link WorkProcessor}. * @@ -72,6 +74,8 @@ { ((EventReleaseAware) this.workHandler).setEventReleaser(eventReleaser); } + + timeoutHandler = (workHandler instanceof TimeoutHandler) ? (TimeoutHandler) workHandler : null; } @Override @@ -144,6 +148,10 @@ cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence); } } + catch (final TimeoutException e) + { + notifyTimeout(sequence.get()); + } catch (final AlertException ex) { if (!running.get()) @@ -164,6 +172,21 @@ running.set(false); } + private void notifyTimeout(final long availableSequence) + { + try + { + if (timeoutHandler != null) + { + timeoutHandler.onTimeout(availableSequence); + } + } + catch (Throwable e) + { + exceptionHandler.handleEventException(e, availableSequence, null); + } + } + private void notifyStart() { if (workHandler instanceof LifecycleAware) diff -Nru disruptor-3.3.4/src/perftest/java/com/lmax/disruptor/queue/OneToOneQueueBatchedThroughputTest.java disruptor-3.3.5/src/perftest/java/com/lmax/disruptor/queue/OneToOneQueueBatchedThroughputTest.java --- disruptor-3.3.4/src/perftest/java/com/lmax/disruptor/queue/OneToOneQueueBatchedThroughputTest.java 1970-01-01 00:00:00.000000000 +0000 +++ disruptor-3.3.5/src/perftest/java/com/lmax/disruptor/queue/OneToOneQueueBatchedThroughputTest.java 2016-07-15 04:29:57.000000000 +0000 @@ -0,0 +1,97 @@ +/* + * Copyright 2011 LMAX Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.lmax.disruptor.queue; + +import com.lmax.disruptor.AbstractPerfTestQueue; +import com.lmax.disruptor.support.ValueAdditionBatchQueueProcessor; +import com.lmax.disruptor.util.DaemonThreadFactory; + +import java.util.concurrent.*; + +import static com.lmax.disruptor.support.PerfTestUtil.failIf; + +/** + *

+ * UniCast a series of items between 1 publisher and 1 event processor.
+ *
+ * +----+    +-----+
+ * | P1 |--->| EP1 |
+ * +----+    +-----+
+ *
+ * Queue Based:
+ * ============
+ *
+ *        put      take
+ * +----+    +====+    +-----+
+ * | P1 |--->| Q1 |<---| EP1 |
+ * +----+    +====+    +-----+
+ *
+ * P1  - Publisher 1
+ * Q1  - Queue 1
+ * EP1 - EventProcessor 1
+ *
+ * 
+ */ +public final class OneToOneQueueBatchedThroughputTest extends AbstractPerfTestQueue +{ + private static final int BUFFER_SIZE = 1024 * 64; + private static final long ITERATIONS = 1000L * 1000L * 10L; + private final ExecutorService executor = Executors.newSingleThreadExecutor(DaemonThreadFactory.INSTANCE); + private final long expectedResult = ITERATIONS * 3L; + + /////////////////////////////////////////////////////////////////////////////////////////////// + + private final BlockingQueue blockingQueue = new LinkedBlockingQueue(BUFFER_SIZE); + private final ValueAdditionBatchQueueProcessor queueProcessor = + new ValueAdditionBatchQueueProcessor(blockingQueue, ITERATIONS); + + /////////////////////////////////////////////////////////////////////////////////////////////// + + @Override + protected int getRequiredProcessorCount() + { + return 2; + } + + @Override + protected long runQueuePass() throws InterruptedException + { + final CountDownLatch latch = new CountDownLatch(1); + queueProcessor.reset(latch); + Future future = executor.submit(queueProcessor); + long start = System.currentTimeMillis(); + + for (long i = 0; i < ITERATIONS; i++) + { + blockingQueue.put(3L); + } + + latch.await(); + long opsPerSecond = (ITERATIONS * 1000L) / (System.currentTimeMillis() - start); + queueProcessor.halt(); + future.cancel(true); + + failIf(expectedResult, 0); + + return opsPerSecond; + } + + public static void main(String[] args) throws Exception + { + OneToOneQueueBatchedThroughputTest test = new OneToOneQueueBatchedThroughputTest(); + test.testImplementations(); + } +} diff -Nru disruptor-3.3.4/src/perftest/java/com/lmax/disruptor/queue/OneToOneQueueThroughputTest.java disruptor-3.3.5/src/perftest/java/com/lmax/disruptor/queue/OneToOneQueueThroughputTest.java --- disruptor-3.3.4/src/perftest/java/com/lmax/disruptor/queue/OneToOneQueueThroughputTest.java 2016-01-12 22:17:37.000000000 +0000 +++ disruptor-3.3.5/src/perftest/java/com/lmax/disruptor/queue/OneToOneQueueThroughputTest.java 2016-07-15 04:29:57.000000000 +0000 @@ -15,20 +15,14 @@ */ package com.lmax.disruptor.queue; -import static com.lmax.disruptor.support.PerfTestUtil.failIf; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; - import com.lmax.disruptor.AbstractPerfTestQueue; -import com.lmax.disruptor.support.PerfTestUtil; import com.lmax.disruptor.support.ValueAdditionQueueProcessor; import com.lmax.disruptor.util.DaemonThreadFactory; +import java.util.concurrent.*; + +import static com.lmax.disruptor.support.PerfTestUtil.failIf; + /** *
  * UniCast a series of items between 1 publisher and 1 event processor.
@@ -56,7 +50,7 @@
     private static final int BUFFER_SIZE = 1024 * 64;
     private static final long ITERATIONS = 1000L * 1000L * 10L;
     private final ExecutorService executor = Executors.newSingleThreadExecutor(DaemonThreadFactory.INSTANCE);
-    private final long expectedResult = PerfTestUtil.accumulatedAddition(ITERATIONS);
+    private final long expectedResult = ITERATIONS * 3L;
 
     ///////////////////////////////////////////////////////////////////////////////////////////////
 
@@ -82,7 +76,7 @@
 
         for (long i = 0; i < ITERATIONS; i++)
         {
-            blockingQueue.put(Long.valueOf(i));
+            blockingQueue.put(3L);
         }
 
         latch.await();
diff -Nru disruptor-3.3.4/src/perftest/java/com/lmax/disruptor/queue/PingPongQueueLatencyTest.java disruptor-3.3.5/src/perftest/java/com/lmax/disruptor/queue/PingPongQueueLatencyTest.java
--- disruptor-3.3.4/src/perftest/java/com/lmax/disruptor/queue/PingPongQueueLatencyTest.java	2016-01-12 22:17:37.000000000 +0000
+++ disruptor-3.3.5/src/perftest/java/com/lmax/disruptor/queue/PingPongQueueLatencyTest.java	2016-07-15 04:29:57.000000000 +0000
@@ -16,13 +16,7 @@
 package com.lmax.disruptor.queue;
 
 import java.io.PrintStream;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.*;
 
 import org.HdrHistogram.Histogram;
 
@@ -62,7 +56,7 @@
 public final class PingPongQueueLatencyTest
 {
     private static final int BUFFER_SIZE = 1024;
-    private static final long ITERATIONS = 1000L * 1000L * 30L;
+    private static final long ITERATIONS = 100L * 1000L * 30L;
     private static final long PAUSE_NANOS = 1000L;
     private final ExecutorService executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE);
 
@@ -70,8 +64,8 @@
 
     ///////////////////////////////////////////////////////////////////////////////////////////////
 
-    private final BlockingQueue pingQueue = new LinkedBlockingQueue(BUFFER_SIZE);
-    private final BlockingQueue pongQueue = new LinkedBlockingQueue(BUFFER_SIZE);
+    private final BlockingQueue pingQueue = new ArrayBlockingQueue(BUFFER_SIZE);
+    private final BlockingQueue pongQueue = new ArrayBlockingQueue(BUFFER_SIZE);
     private final QueuePinger qPinger = new QueuePinger(pingQueue, pongQueue, ITERATIONS, PAUSE_NANOS);
     private final QueuePonger qPonger = new QueuePonger(pingQueue, pongQueue);
 
@@ -152,13 +146,13 @@
 
                 Thread.sleep(1000);
 
-                long response = -1;
+                long counter = 0;
 
-                while (response < maxEvents)
+                while (counter < maxEvents)
                 {
                     final long t0 = System.nanoTime();
-                    pingQueue.put(counter++);
-                    response = pongQueue.take();
+                    pingQueue.put(1L);
+                    counter += pongQueue.take();
                     final long t1 = System.nanoTime();
 
                     histogram.recordValueWithExpectedInterval(t1 - t0, pauseTimeNs);
diff -Nru disruptor-3.3.4/src/perftest/java/com/lmax/disruptor/sequenced/PingPongSequencedLatencyTest.java disruptor-3.3.5/src/perftest/java/com/lmax/disruptor/sequenced/PingPongSequencedLatencyTest.java
--- disruptor-3.3.4/src/perftest/java/com/lmax/disruptor/sequenced/PingPongSequencedLatencyTest.java	2016-01-12 22:17:37.000000000 +0000
+++ disruptor-3.3.5/src/perftest/java/com/lmax/disruptor/sequenced/PingPongSequencedLatencyTest.java	2016-07-15 04:29:57.000000000 +0000
@@ -24,14 +24,9 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import com.lmax.disruptor.*;
 import org.HdrHistogram.Histogram;
 
-import com.lmax.disruptor.BatchEventProcessor;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.LifecycleAware;
-import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.SequenceBarrier;
-import com.lmax.disruptor.YieldingWaitStrategy;
 import com.lmax.disruptor.support.ValueEvent;
 import com.lmax.disruptor.util.DaemonThreadFactory;
 
@@ -71,7 +66,7 @@
 public final class PingPongSequencedLatencyTest
 {
     private static final int BUFFER_SIZE = 1024;
-    private static final long ITERATIONS = 1000L * 1000L * 30L;
+    private static final long ITERATIONS = 100L * 1000L * 30L;
     private static final long PAUSE_NANOS = 1000L;
     private final ExecutorService executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE);
 
@@ -80,9 +75,9 @@
     ///////////////////////////////////////////////////////////////////////////////////////////////
 
     private final RingBuffer pingBuffer =
-        createSingleProducer(ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new YieldingWaitStrategy());
+        createSingleProducer(ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new BlockingWaitStrategy());
     private final RingBuffer pongBuffer =
-        createSingleProducer(ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new YieldingWaitStrategy());
+        createSingleProducer(ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new BlockingWaitStrategy());
 
     private final SequenceBarrier pongBarrier = pongBuffer.newBarrier();
     private final Pinger pinger = new Pinger(pingBuffer, ITERATIONS, PAUSE_NANOS);
diff -Nru disruptor-3.3.4/src/perftest/java/com/lmax/disruptor/support/MultiBufferBatchEventProcessor.java disruptor-3.3.5/src/perftest/java/com/lmax/disruptor/support/MultiBufferBatchEventProcessor.java
--- disruptor-3.3.4/src/perftest/java/com/lmax/disruptor/support/MultiBufferBatchEventProcessor.java	2016-01-12 22:17:37.000000000 +0000
+++ disruptor-3.3.5/src/perftest/java/com/lmax/disruptor/support/MultiBufferBatchEventProcessor.java	2016-07-15 04:29:57.000000000 +0000
@@ -1,17 +1,9 @@
 package com.lmax.disruptor.support;
 
-import static java.util.Arrays.fill;
+import com.lmax.disruptor.*;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import com.lmax.disruptor.AlertException;
-import com.lmax.disruptor.DataProvider;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.EventProcessor;
-import com.lmax.disruptor.Sequence;
-import com.lmax.disruptor.SequenceBarrier;
-import com.lmax.disruptor.TimeoutException;
-
 public class MultiBufferBatchEventProcessor
     implements EventProcessor
 {
@@ -57,8 +49,6 @@
         }
 
         final int barrierLength = barriers.length;
-        final long[] lastConsumed = new long[barrierLength];
-        fill(lastConsumed, -1L);
 
         while (true)
         {
@@ -69,16 +59,16 @@
                     long available = barriers[i].waitFor(-1);
                     Sequence sequence = sequences[i];
 
-                    long previous = sequence.get();
+                    long nextSequence = sequence.get() + 1;
 
-                    for (long l = previous + 1; l <= available; l++)
+                    for (long l = nextSequence; l <= available; l++)
                     {
-                        handler.onEvent(providers[i].get(l), l, previous == available);
+                        handler.onEvent(providers[i].get(l), l, nextSequence == available);
                     }
 
                     sequence.set(available);
 
-                    count += (available - previous);
+                    count += available - nextSequence + 1;
                 }
 
                 Thread.yield();
diff -Nru disruptor-3.3.4/src/perftest/java/com/lmax/disruptor/support/ValueAdditionBatchQueueProcessor.java disruptor-3.3.5/src/perftest/java/com/lmax/disruptor/support/ValueAdditionBatchQueueProcessor.java
--- disruptor-3.3.4/src/perftest/java/com/lmax/disruptor/support/ValueAdditionBatchQueueProcessor.java	1970-01-01 00:00:00.000000000 +0000
+++ disruptor-3.3.5/src/perftest/java/com/lmax/disruptor/support/ValueAdditionBatchQueueProcessor.java	2016-07-15 04:29:57.000000000 +0000
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2011 LMAX Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.lmax.disruptor.support;
+
+import java.util.ArrayList;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+
+public final class ValueAdditionBatchQueueProcessor implements Runnable
+{
+    private volatile boolean running;
+    private long value;
+    private long sequence;
+    private CountDownLatch latch;
+
+    private final BlockingQueue blockingQueue;
+    private final ArrayList batch = new ArrayList(100);
+    private final long count;
+
+    public ValueAdditionBatchQueueProcessor(final BlockingQueue blockingQueue, final long count)
+    {
+        this.blockingQueue = blockingQueue;
+        this.count = count;
+    }
+
+    public long getValue()
+    {
+        return value;
+    }
+
+    public void reset(final CountDownLatch latch)
+    {
+        value = 0L;
+        sequence = 0L;
+        this.latch = latch;
+    }
+
+    public void halt()
+    {
+        running = false;
+    }
+
+    @Override
+    public void run()
+    {
+        running = true;
+        while (true)
+        {
+            try
+            {
+                long v = blockingQueue.take();
+                sequence++;
+
+                this.value += v;
+
+                int c = blockingQueue.drainTo(batch, 100);
+                sequence += c;
+
+                v = 0;
+                for (int i = 0, n = batch.size(); i < n; i++)
+                {
+                    v += batch.get(i);
+                }
+
+                this.value += v;
+
+                batch.clear();
+
+                if (sequence == count)
+                {
+                    latch.countDown();
+                }
+            }
+            catch (InterruptedException ex)
+            {
+                if (!running)
+                {
+                    break;
+                }
+            }
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        return "ValueAdditionBatchQueueProcessor{" +
+            "value=" + value +
+            ", sequence=" + sequence +
+            ", count=" + count +
+            '}';
+    }
+}
diff -Nru disruptor-3.3.4/src/test/java/com/lmax/disruptor/example/HandleExceptionOnTranslate.java disruptor-3.3.5/src/test/java/com/lmax/disruptor/example/HandleExceptionOnTranslate.java
--- disruptor-3.3.4/src/test/java/com/lmax/disruptor/example/HandleExceptionOnTranslate.java	1970-01-01 00:00:00.000000000 +0000
+++ disruptor-3.3.5/src/test/java/com/lmax/disruptor/example/HandleExceptionOnTranslate.java	2016-07-15 04:29:57.000000000 +0000
@@ -0,0 +1,68 @@
+package com.lmax.disruptor.example;
+
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.EventTranslator;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.support.LongEvent;
+import com.lmax.disruptor.util.DaemonThreadFactory;
+
+public class HandleExceptionOnTranslate
+{
+    private static final int NO_VALUE_SPECIFIED = -1;
+
+    private static class MyHandler implements EventHandler
+    {
+
+        @Override
+        public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception
+        {
+            if (event.get() == NO_VALUE_SPECIFIED)
+            {
+                System.out.printf("Discarded%n");
+            }
+            else
+            {
+                System.out.printf("Processed: %s%n", event.get() == sequence);
+            }
+        }
+    }
+
+    public static void main(String[] args) throws InterruptedException
+    {
+        Disruptor disruptor = new Disruptor(LongEvent.FACTORY, 1024, DaemonThreadFactory.INSTANCE);
+
+        disruptor.handleEventsWith(new MyHandler());
+
+        disruptor.start();
+
+        EventTranslator t = new EventTranslator()
+        {
+            @Override
+            public void translateTo(LongEvent event, long sequence)
+            {
+                event.set(NO_VALUE_SPECIFIED);
+
+                if (sequence % 3 == 0)
+                {
+                    throw new RuntimeException("Skipping");
+                }
+
+                event.set(sequence);
+            }
+        };
+
+        for (int i = 0; i < 10; i++)
+        {
+            try
+            {
+                disruptor.publishEvent(t);
+            }
+            catch (RuntimeException e)
+            {
+                // Skipping
+            }
+        }
+
+        Thread.sleep(5000);
+    }
+}
diff -Nru disruptor-3.3.4/src/test/java/com/lmax/disruptor/example/KeyedBatching.java disruptor-3.3.5/src/test/java/com/lmax/disruptor/example/KeyedBatching.java
--- disruptor-3.3.4/src/test/java/com/lmax/disruptor/example/KeyedBatching.java	1970-01-01 00:00:00.000000000 +0000
+++ disruptor-3.3.5/src/test/java/com/lmax/disruptor/example/KeyedBatching.java	2016-07-15 04:29:57.000000000 +0000
@@ -0,0 +1,42 @@
+package com.lmax.disruptor.example;
+
+import com.lmax.disruptor.EventHandler;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class KeyedBatching implements EventHandler
+{
+    private static final int MAX_BATCH_SIZE = 100;
+    private long key = 0;
+    private List batch = new ArrayList();
+
+    @Override
+    public void onEvent(KeyedEvent event, long sequence, boolean endOfBatch) throws Exception
+    {
+        if (!batch.isEmpty() && event.key != key)
+        {
+            processBatch(batch);
+        }
+
+        batch.add(event.data);
+        key = event.key;
+
+        if (endOfBatch || batch.size() >= MAX_BATCH_SIZE)
+        {
+            processBatch(batch);
+        }
+    }
+
+    private void processBatch(List batch)
+    {
+        // do work.
+        batch.clear();
+    }
+
+    public static class KeyedEvent
+    {
+        long key;
+        Object data;
+    }
+}
diff -Nru disruptor-3.3.4/src/test/java/com/lmax/disruptor/example/SequentialThreeConsumers.java disruptor-3.3.5/src/test/java/com/lmax/disruptor/example/SequentialThreeConsumers.java
--- disruptor-3.3.4/src/test/java/com/lmax/disruptor/example/SequentialThreeConsumers.java	1970-01-01 00:00:00.000000000 +0000
+++ disruptor-3.3.5/src/test/java/com/lmax/disruptor/example/SequentialThreeConsumers.java	2016-07-15 04:29:57.000000000 +0000
@@ -0,0 +1,62 @@
+package com.lmax.disruptor.example;
+
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.util.DaemonThreadFactory;
+
+public class SequentialThreeConsumers
+{
+    private static class MyEvent
+    {
+        private Object a;
+        private Object b;
+        private Object c;
+        private Object d;
+    }
+
+    private static EventFactory factory = new EventFactory()
+    {
+        @Override
+        public MyEvent newInstance()
+        {
+            return new MyEvent();
+        }
+    };
+
+    private static EventHandler handler1 = new EventHandler()
+    {
+        @Override
+        public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exception
+        {
+            event.b = event.a;
+        }
+    };
+
+    private static EventHandler handler2 = new EventHandler()
+    {
+        @Override
+        public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exception
+        {
+            event.c = event.b;
+        }
+    };
+
+    private static EventHandler handler3 = new EventHandler()
+    {
+        @Override
+        public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exception
+        {
+            event.d = event.c;
+        }
+    };
+
+    public static void main(String[] args)
+    {
+        Disruptor disruptor = new Disruptor(factory, 1024, DaemonThreadFactory.INSTANCE);
+
+        disruptor.handleEventsWith(handler1).then(handler2).then(handler3);
+
+        disruptor.start();
+    }
+}
diff -Nru disruptor-3.3.4/src/test/java/com/lmax/disruptor/LiteTimeoutBlockingWaitStrategyTest.java disruptor-3.3.5/src/test/java/com/lmax/disruptor/LiteTimeoutBlockingWaitStrategyTest.java
--- disruptor-3.3.4/src/test/java/com/lmax/disruptor/LiteTimeoutBlockingWaitStrategyTest.java	1970-01-01 00:00:00.000000000 +0000
+++ disruptor-3.3.5/src/test/java/com/lmax/disruptor/LiteTimeoutBlockingWaitStrategyTest.java	2016-07-15 04:29:57.000000000 +0000
@@ -0,0 +1,54 @@
+package com.lmax.disruptor;
+
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.junit.Assert.*;
+
+import java.util.concurrent.TimeUnit;
+
+import org.jmock.Expectations;
+import org.jmock.Mockery;
+import org.jmock.integration.junit4.JMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(JMock.class)
+public class LiteTimeoutBlockingWaitStrategyTest
+{
+    private final Mockery mockery = new Mockery();
+
+    @Test
+    public void shouldTimeoutWaitFor() throws Exception
+    {
+        final SequenceBarrier sequenceBarrier = mockery.mock(SequenceBarrier.class);
+
+        long theTimeout = 500;
+        LiteTimeoutBlockingWaitStrategy waitStrategy = new LiteTimeoutBlockingWaitStrategy(theTimeout, TimeUnit.MILLISECONDS);
+        Sequence cursor = new Sequence(5);
+        Sequence dependent = cursor;
+
+        mockery.checking(
+            new Expectations()
+            {
+                {
+                    allowing(sequenceBarrier).checkAlert();
+                }
+            });
+
+        long t0 = System.currentTimeMillis();
+
+        try
+        {
+            waitStrategy.waitFor(6, cursor, dependent, sequenceBarrier);
+            fail("TimeoutException should have been thrown");
+        }
+        catch (TimeoutException e)
+        {
+        }
+
+        long t1 = System.currentTimeMillis();
+
+        long timeWaiting = t1 - t0;
+
+        assertThat(timeWaiting, greaterThanOrEqualTo(theTimeout));
+    }
+}