diff -Nru disruptor-3.2.1/build.gradle disruptor-3.3.0/build.gradle --- disruptor-3.2.1/build.gradle 2014-03-10 01:07:35.000000000 +0000 +++ disruptor-3.3.0/build.gradle 2014-07-21 00:57:02.000000000 +0000 @@ -24,7 +24,7 @@ defaultTasks 'build' group = 'com.lmax' -version = new Version(major: 3, minor: 2, revision: 1) +version = new Version(major: 3, minor: 3, revision: 0) ext { fullName = 'Disruptor Framework' @@ -48,6 +48,7 @@ idea.module { testSourceDirs += sourceSets.perf.allSource.getSrcDirs() + scopes.TEST.plus += sourceSets.perf.compileClasspath } repositories { @@ -55,15 +56,17 @@ } dependencies { - testCompile 'junit:junit:4.5', 'org.jmock:jmock-junit4:2.5.1', 'org.jmock:jmock-legacy:2.5.1', files('lib/test/hdrhistogram-1.0-SNAPSHOT.jar') - perfCompile files('lib/test/hdrhistogram-1.0-SNAPSHOT.jar') + testCompile 'junit:junit:4.5', + 'org.jmock:jmock-junit4:2.5.1', + 'org.jmock:jmock-legacy:2.5.1' + perfCompile 'org.hdrhistogram:HdrHistogram:1.2.1' } sourceCompatibility = 1.6 targetCompatibility = 1.6 tasks.withType(Compile) { - options.compilerArgs << '-XDignore.symbol.file' << '-Xlint:unchecked' + options.compilerArgs << '-XDignore.symbol.file' << '-Xlint:unchecked' << '-Xlint:deprecation' options.debug = true options.fork = true options.forkOptions.executable = javaCompilerExecutable @@ -108,7 +111,7 @@ options.bottom = "Copyright © 2011 - ${Calendar.instance[Calendar.YEAR]} LMAX Ltd. All Rights Reserved." options.use = true options.version = true - options.windowTitle = 'Disruptor API' + options.showFromPublic() } jar { @@ -193,7 +196,7 @@ } task wrapper(type: Wrapper) { - gradleVersion = '1.5' + gradleVersion = '1.12' } class Version { @@ -204,4 +207,4 @@ String toString() { "$major.$minor.$revision${stage ? '.' + stage : ''}${snapshot ? 
'-SNAPSHOT' : ''}" } -} +} \ No newline at end of file diff -Nru disruptor-3.2.1/debian/changelog disruptor-3.3.0/debian/changelog --- disruptor-3.2.1/debian/changelog 2014-03-19 11:58:14.000000000 +0000 +++ disruptor-3.3.0/debian/changelog 2014-07-28 11:03:20.000000000 +0000 @@ -1,3 +1,10 @@ +disruptor (3.3.0-2) unstable; urgency=medium + + * New upstream release + * Updated debian/pom.xml + + -- Emmanuel Bourg Mon, 28 Jul 2014 13:03:11 +0200 + disruptor (3.2.1-1) unstable; urgency=medium * New upstream release diff -Nru disruptor-3.2.1/debian/pom.xml disruptor-3.3.0/debian/pom.xml --- disruptor-3.2.1/debian/pom.xml 2014-03-19 11:56:47.000000000 +0000 +++ disruptor-3.3.0/debian/pom.xml 2014-07-21 01:33:29.000000000 +0000 @@ -4,7 +4,7 @@ 4.0.0 com.lmax disruptor - 3.2.1 + 3.3.0 Disruptor Framework Disruptor - Concurrent Programming Framework http://lmax-exchange.github.com/disruptor diff -Nru disruptor-3.2.1/gradle/wrapper/gradle-wrapper.properties disruptor-3.3.0/gradle/wrapper/gradle-wrapper.properties --- disruptor-3.2.1/gradle/wrapper/gradle-wrapper.properties 2014-03-10 01:07:35.000000000 +0000 +++ disruptor-3.3.0/gradle/wrapper/gradle-wrapper.properties 2014-07-21 00:57:02.000000000 +0000 @@ -1,6 +1,6 @@ -#Mon Apr 08 20:09:58 BST 2013 +#Tue Jul 08 15:04:48 NZST 2014 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=http\://services.gradle.org/distributions/gradle-1.5-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-1.12-bin.zip diff -Nru disruptor-3.2.1/gradlew disruptor-3.3.0/gradlew --- disruptor-3.2.1/gradlew 2014-03-10 01:07:35.000000000 +0000 +++ disruptor-3.3.0/gradlew 2014-07-21 00:57:02.000000000 +0000 @@ -61,9 +61,9 @@ fi done SAVED="`pwd`" -cd "`dirname \"$PRG\"`/" +cd "`dirname \"$PRG\"`/" >&- APP_HOME="`pwd -P`" -cd "$SAVED" +cd "$SAVED" >&- CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar diff -Nru disruptor-3.2.1/README.md 
disruptor-3.3.0/README.md --- disruptor-3.2.1/README.md 2014-03-10 01:07:35.000000000 +0000 +++ disruptor-3.3.0/README.md 2014-07-21 00:57:02.000000000 +0000 @@ -1,15 +1,25 @@ -LMAX Disruptor -============== +## LMAX Disruptor A High Performance Inter-Thread Messaging Library -Maintainer -========== +## Maintainer [Michael Barker](https://github.com/mikeb01) -Changelog -========== +## Documentation + +* [Introduction](https://github.com/LMAX-Exchange/disruptor/wiki/Introduction) +* [Getting Started](https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started) + +## Changelog + +### 3.3.0 + +- Inheritance based Padding for RingBuffer and Sequencers. +- Better DSL support for adding custom EventProcessors. +- Remove deprecated methods (slightly breaking change) +- Experimental LiteBlockingWaitStrategy +- Experimental EventPoller for polling for data instead of waiting. ### 3.2.1 Released (10-Mar-2014) @@ -46,7 +56,7 @@ - Fix off by one bug in MultiProducerSequencer.publish(lo, hi). - Improve testing for Sequencers. -## 3.0.0 Released (10-Apr-2013) +### 3.0.0 Released (10-Apr-2013) - Add remaining capacity to RingBuffer - Add batch publish methods to Sequencer @@ -78,9 +88,9 @@ - Remove claim strategies and replace with Publishers/Sequences, remove pluggability of claim strategies. - Introduce new multi-producer publisher algorithm (faster and more scalable). -- Introduce more flexible EventPublisher interface that allow for static definition of translators +- Introduce more flexible EventPublisher interface that allow for static definition of translators that can handle local values. -- Allow for dynamic addition of gating sequences to ring buffer. Default it to empty, will allow +- Allow for dynamic addition of gating sequences to ring buffer. Default it to empty, will allow messages to be sent and the ring buffer to wrap if there are no gating sequences defined. - Remove batch writes to the ring buffer. - Remove timeout read methods. 
@@ -107,7 +117,7 @@ - Bug fix, correct OSGI metadata. - Remove unnecessary code in wait strategies. -## 2.10 (13-May-2012) +### 2.10 (13-May-2012) - Remove deprecated timeout methods. - Added OSGI metadata to jar file. @@ -116,13 +126,13 @@ - Change Sequence implementation to work around IBM JDK bug and improve performance by ~10%. - Add a remainingCapacity() call to the Sequencer class. -## 2.9 (8-Apr-2012) +### 2.9 (8-Apr-2012) - Deprecate timeout methods for publishing. - Add tryNext and tryPublishEvent for events that shouldn't block during delivery. - Small performance enhancement for MultithreadClaimStrategy. -## 2.8 (6-Feb-2012) +### 2.8 (6-Feb-2012) - Create new MultithreadClaimStrategy that works between when threads are highly contended. Previous implementation is now called MultithreadLowContentionClaimStrategy - Fix for bug where EventProcessors weren't being added as gating sequences to the ring buffer. @@ -132,7 +142,7 @@ - Artefacts made available via maven central repository. (groupId:com.googlecode.disruptor, artifactId:disruptor) See UsingDisruptorInYourProject for details. -## 2.7 (12-Nov-2011) +### 2.7 (12-Nov-2011) - Changed construction API to allow user supplied claim and wait strategies - Added AggregateEventHandler to support multiple EventHandlers from a single BatchEventProcessor @@ -142,7 +152,7 @@ - Reworked performance tests to better support profiling and use LinkedBlockingQueue for comparison because it performs better on the latest processors - Minor bugfixes -## 2.6 +### 2.6 - Introduced WorkerPool to allow the one time consumption of events by a worker in a pool of EventProcessors. - New internal implementation of SequenceGroup which is lock free at all times and garbage free for get and set operations. @@ -156,7 +166,7 @@ - Change SequenceBarrier to always check alert status before entering waitFor cycle. Previously this was only checked when the requested sequence was not available. 
- Change ClaimStrategy to not spin when the buffer has no available capacity, instead go straight to yielding to allow event processors to catch up. -## 2.5 +### 2.5 - Changed RingBuffer and publisher API so any mutable object can be placed in the RingBuffer without having to extend AbstractEvent - Added EventPublisher implementation to allow the publishing steps to be combined into one action @@ -177,7 +187,7 @@ - Fixed bug in YieldingStrategy that was busy spinning more than yielding and introduced SleepingStrategy - Removed code duplication in Unicast perf tests for expected result -## 2.0.0 +### 2.0.0 - New API to reflect naming changes - Producer -> Publisher @@ -200,12 +210,12 @@ - Bug fix for setting the sequence in the ForceFillProducerBarrier. - Code syntax tidy up. -## 1.2.0 +### 1.2.0 - Bug fix for regression introduced inlining multi-thread producer commit tracking code. This was a critical bug for the multi-threaded producer scenario. - Added new ProducerBarrier method for claiming a batch of sequences. This feature can give a significant throughput increase. -## 1.1.0 +### 1.1.0 - Off by one regression bug in ProducerBarrier introduced in 1.0.9. - Clarified the algorithm for initial cursor value in the ClaimStrategy. diff -Nru disruptor-3.2.1/src/main/java/com/lmax/disruptor/AbstractSequencer.java disruptor-3.3.0/src/main/java/com/lmax/disruptor/AbstractSequencer.java --- disruptor-3.2.1/src/main/java/com/lmax/disruptor/AbstractSequencer.java 2014-03-10 01:07:35.000000000 +0000 +++ disruptor-3.3.0/src/main/java/com/lmax/disruptor/AbstractSequencer.java 2014-07-21 00:57:02.000000000 +0000 @@ -108,4 +108,18 @@ { return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack); } + + /** + * Creates an event poller for this sequence that will use the supplied data provider and + * gating sequences. + * + * @param dataProvider The data source for users of this event poller + * @param gatingSequences Sequence to be gated on. 
+ * @return A poller that will gate on this ring buffer and the supplied sequences. + */ + @Override + public EventPoller newPoller(DataProvider dataProvider, Sequence... gatingSequences) + { + return EventPoller.newInstance(dataProvider, this, new Sequence(), cursor, gatingSequences); + } } \ No newline at end of file diff -Nru disruptor-3.2.1/src/main/java/com/lmax/disruptor/BatchEventProcessor.java disruptor-3.3.0/src/main/java/com/lmax/disruptor/BatchEventProcessor.java --- disruptor-3.2.1/src/main/java/com/lmax/disruptor/BatchEventProcessor.java 2014-03-10 01:07:35.000000000 +0000 +++ disruptor-3.3.0/src/main/java/com/lmax/disruptor/BatchEventProcessor.java 2014-07-21 00:57:02.000000000 +0000 @@ -34,7 +34,7 @@ private ExceptionHandler exceptionHandler = new FatalExceptionHandler(); private final DataProvider dataProvider; private final SequenceBarrier sequenceBarrier; - private final EventHandler eventHandler; + private final EventHandler eventHandler; private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); private final TimeoutHandler timeoutHandler; @@ -48,7 +48,7 @@ */ public BatchEventProcessor(final DataProvider dataProvider, final SequenceBarrier sequenceBarrier, - final EventHandler eventHandler) + final EventHandler eventHandler) { this.dataProvider = dataProvider; this.sequenceBarrier = sequenceBarrier; diff -Nru disruptor-3.2.1/src/main/java/com/lmax/disruptor/dsl/ConsumerRepository.java disruptor-3.3.0/src/main/java/com/lmax/disruptor/dsl/ConsumerRepository.java --- disruptor-3.2.1/src/main/java/com/lmax/disruptor/dsl/ConsumerRepository.java 2014-03-10 01:07:35.000000000 +0000 +++ disruptor-3.3.0/src/main/java/com/lmax/disruptor/dsl/ConsumerRepository.java 2014-07-21 00:57:02.000000000 +0000 @@ -31,7 +31,7 @@ private final Collection consumerInfos = new ArrayList(); public void add(final EventProcessor eventprocessor, - final EventHandler handler, + final EventHandler handler, final SequenceBarrier barrier) { final 
EventProcessorInfo consumerInfo = new EventProcessorInfo(eventprocessor, handler, barrier); @@ -96,6 +96,7 @@ } } + @Override public Iterator iterator() { return consumerInfos.iterator(); diff -Nru disruptor-3.2.1/src/main/java/com/lmax/disruptor/dsl/Disruptor.java disruptor-3.3.0/src/main/java/com/lmax/disruptor/dsl/Disruptor.java --- disruptor-3.2.1/src/main/java/com/lmax/disruptor/dsl/Disruptor.java 2014-03-10 01:07:35.000000000 +0000 +++ disruptor-3.3.0/src/main/java/com/lmax/disruptor/dsl/Disruptor.java 2014-07-21 00:57:02.000000000 +0000 @@ -15,10 +15,6 @@ */ package com.lmax.disruptor.dsl; -import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - import com.lmax.disruptor.BatchEventProcessor; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventHandler; @@ -35,6 +31,10 @@ import com.lmax.disruptor.WorkerPool; import com.lmax.disruptor.util.Util; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + /** * A DSL-style API for setting up the disruptor pattern around a ring buffer (aka the Builder pattern). * @@ -109,14 +109,40 @@ * @return a {@link EventHandlerGroup} that can be used to chain dependencies. */ @SuppressWarnings("varargs") - public EventHandlerGroup handleEventsWith(final EventHandler... handlers) + public EventHandlerGroup handleEventsWith(final EventHandler... handlers) { return createEventProcessors(new Sequence[0], handlers); } /** - * Set up custom event processors to handle events from the ring buffer. The Disruptor will - * automatically start this processors when {@link #start()} is called. + *

Set up custom event processors to handle events from the ring buffer. The Disruptor will + * automatically start these processors when {@link #start()} is called.

+ * + *

This method can be used as the start of a chain. For example if the handler A must + * process events before handler B:

+ *
dw.handleEventsWith(A).then(B);
+ * + *

Since this is the start of the chain, the processor factories will always be passed an empty Sequence + * array, so the factory isn't necessary in this case. This method is provided for consistency with + * {@link EventHandlerGroup#handleEventsWith(EventProcessorFactory...)} and {@link EventHandlerGroup#then(EventProcessorFactory...)} + * which do have barrier sequences to provide.

+ * + * @param eventProcessorFactories the event processor factories to use to create the event processors that will process events. + * @return a {@link EventHandlerGroup} that can be used to chain dependencies. + */ + public EventHandlerGroup handleEventsWith(final EventProcessorFactory... eventProcessorFactories) + { + final Sequence[] barrierSequences = new Sequence[0]; + return createEventProcessors(barrierSequences, eventProcessorFactories); + } + + /** + *

Set up custom event processors to handle events from the ring buffer. The Disruptor will + * automatically start these processors when {@link #start()} is called.

+ * + *

This method can be used as the start of a chain. For example if the processor A must + * process events before handler B:

+ *
dw.handleEventsWith(A).then(B);
* * @param processors the event processors that will process events. * @return a {@link EventHandlerGroup} that can be used to chain dependencies. @@ -393,7 +419,7 @@ } EventHandlerGroup createEventProcessors(final Sequence[] barrierSequences, - final EventHandler[] eventHandlers) + final EventHandler[] eventHandlers) { checkNotStarted(); @@ -402,7 +428,7 @@ for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) { - final EventHandler eventHandler = eventHandlers[i]; + final EventHandler eventHandler = eventHandlers[i]; final BatchEventProcessor batchEventProcessor = new BatchEventProcessor(ringBuffer, barrier, eventHandler); @@ -423,7 +449,17 @@ return new EventHandlerGroup(this, consumerRepository, processorSequences); } - EventHandlerGroup createWorkerPool(final Sequence[] barrierSequences, final WorkHandler[] workHandlers) + EventHandlerGroup createEventProcessors(final Sequence[] barrierSequences, final EventProcessorFactory[] processorFactories) + { + final EventProcessor[] eventProcessors = new EventProcessor[processorFactories.length]; + for (int i = 0; i < processorFactories.length; i++) + { + eventProcessors[i] = processorFactories[i].createEventProcessor(ringBuffer, barrierSequences); + } + return handleEventsWith(eventProcessors); + } + + EventHandlerGroup createWorkerPool(final Sequence[] barrierSequences, final WorkHandler[] workHandlers) { final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences); final WorkerPool workerPool = new WorkerPool(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers); diff -Nru disruptor-3.2.1/src/main/java/com/lmax/disruptor/dsl/EventHandlerGroup.java disruptor-3.3.0/src/main/java/com/lmax/disruptor/dsl/EventHandlerGroup.java --- disruptor-3.2.1/src/main/java/com/lmax/disruptor/dsl/EventHandlerGroup.java 2014-03-10 01:07:35.000000000 +0000 +++ disruptor-3.3.0/src/main/java/com/lmax/disruptor/dsl/EventHandlerGroup.java 2014-07-21 00:57:02.000000000 +0000 @@ 
-15,14 +15,14 @@ */ package com.lmax.disruptor.dsl; -import java.util.Arrays; - import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.EventProcessor; import com.lmax.disruptor.Sequence; import com.lmax.disruptor.SequenceBarrier; import com.lmax.disruptor.WorkHandler; +import java.util.Arrays; + /** * A group of {@link EventProcessor}s used as part of the {@link Disruptor}. * @@ -89,12 +89,27 @@ * @param handlers the batch handlers that will process events. * @return a {@link EventHandlerGroup} that can be used to set up a event processor barrier over the created event processors. */ - public EventHandlerGroup then(final EventHandler... handlers) + public EventHandlerGroup then(final EventHandler... handlers) { return handleEventsWith(handlers); } /** + *

Set up custom event processors to handle events from the ring buffer. The Disruptor will + * automatically start these processors when {@link Disruptor#start()} is called.

+ * + *

This method is generally used as part of a chain. For example if the handler A must + * process events before handler B: {@code dw.handleEventsWith(A).then(B);}

+ * + * @param eventProcessorFactories the event processor factories to use to create the event processors that will process events. + * @return a {@link EventHandlerGroup} that can be used to chain dependencies. + */ + public EventHandlerGroup then(final EventProcessorFactory... eventProcessorFactories) + { + return handleEventsWith(eventProcessorFactories); + } + + /** * Set up a worker pool to handle events from the ring buffer. The worker pool will only process events * after every {@link EventProcessor} in this group has processed the event. Each event will be processed * by one of the work handler instances. @@ -107,7 +122,7 @@ * @param handlers the work handlers that will process events. Each work handler instance will provide an extra thread in the worker pool. * @return a {@link EventHandlerGroup} that can be used to set up a event processor barrier over the created event processors. */ - public EventHandlerGroup thenHandleEventsWithWorkerPool(final WorkHandler... handlers) + public EventHandlerGroup thenHandleEventsWithWorkerPool(final WorkHandler... handlers) { return handleEventsWithWorkerPool(handlers); } @@ -116,20 +131,37 @@ * Set up batch handlers to handle events from the ring buffer. These handlers will only process events * after every {@link EventProcessor} in this group has processed the event. * - *

This method is generally used as part of a chain. For example if the handler A must - * process events before handler B:

+ *

This method is generally used as part of a chain. For example if A must + * process events before B:

* *
dw.after(A).handleEventsWith(B);
* * @param handlers the batch handlers that will process events. * @return a {@link EventHandlerGroup} that can be used to set up a event processor barrier over the created event processors. */ - public EventHandlerGroup handleEventsWith(final EventHandler... handlers) + public EventHandlerGroup handleEventsWith(final EventHandler... handlers) { return disruptor.createEventProcessors(sequences, handlers); } /** + *

Set up custom event processors to handle events from the ring buffer. The Disruptor will + * automatically start these processors when {@link Disruptor#start()} is called.

+ * + *

This method is generally used as part of a chain. For example if A must + * process events before B:

+ * + *
dw.after(A).handleEventsWith(B);
+ * + * @param eventProcessorFactories the event processor factories to use to create the event processors that will process events. + * @return a {@link EventHandlerGroup} that can be used to chain dependencies. + */ + public EventHandlerGroup handleEventsWith(final EventProcessorFactory... eventProcessorFactories) + { + return disruptor.createEventProcessors(sequences, eventProcessorFactories); + } + + /** * Set up a worker pool to handle events from the ring buffer. The worker pool will only process events * after every {@link EventProcessor} in this group has processed the event. Each event will be processed * by one of the work handler instances. @@ -142,7 +174,7 @@ * @param handlers the work handlers that will process events. Each work handler instance will provide an extra thread in the worker pool. * @return a {@link EventHandlerGroup} that can be used to set up a event processor barrier over the created event processors. */ - public EventHandlerGroup handleEventsWithWorkerPool(final WorkHandler... handlers) + public EventHandlerGroup handleEventsWithWorkerPool(final WorkHandler... handlers) { return disruptor.createWorkerPool(sequences, handlers); } diff -Nru disruptor-3.2.1/src/main/java/com/lmax/disruptor/dsl/EventProcessorFactory.java disruptor-3.3.0/src/main/java/com/lmax/disruptor/dsl/EventProcessorFactory.java --- disruptor-3.2.1/src/main/java/com/lmax/disruptor/dsl/EventProcessorFactory.java 1970-01-01 00:00:00.000000000 +0000 +++ disruptor-3.3.0/src/main/java/com/lmax/disruptor/dsl/EventProcessorFactory.java 2014-07-21 00:57:02.000000000 +0000 @@ -0,0 +1,23 @@ +package com.lmax.disruptor.dsl; + +import com.lmax.disruptor.EventProcessor; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.Sequence; + +/** + * A factory interface to make it possible to include custom event processors in a chain: + * + *

+ * disruptor.handleEventsWith(handler1).then((ringBuffer, barrierSequences) -> new CustomEventProcessor(ringBuffer, barrierSequences));
+ * 
+ */ +public interface EventProcessorFactory +{ + /** + * Create a new event processor that gates on barrierSequences. + * + * @param barrierSequences the sequences to gate on + * @return a new EventProcessor that gates on barrierSequences before processing events + */ + EventProcessor createEventProcessor(RingBuffer ringBuffer, Sequence[] barrierSequences); +} diff -Nru disruptor-3.2.1/src/main/java/com/lmax/disruptor/dsl/EventProcessorInfo.java disruptor-3.3.0/src/main/java/com/lmax/disruptor/dsl/EventProcessorInfo.java --- disruptor-3.2.1/src/main/java/com/lmax/disruptor/dsl/EventProcessorInfo.java 2014-03-10 01:07:35.000000000 +0000 +++ disruptor-3.3.0/src/main/java/com/lmax/disruptor/dsl/EventProcessorInfo.java 2014-07-21 00:57:02.000000000 +0000 @@ -32,11 +32,11 @@ class EventProcessorInfo implements ConsumerInfo { private final EventProcessor eventprocessor; - private final EventHandler handler; + private final EventHandler handler; private final SequenceBarrier barrier; private boolean endOfChain = true; - EventProcessorInfo(final EventProcessor eventprocessor, final EventHandler handler, final SequenceBarrier barrier) + EventProcessorInfo(final EventProcessor eventprocessor, final EventHandler handler, final SequenceBarrier barrier) { this.eventprocessor = eventprocessor; this.handler = handler; @@ -54,7 +54,7 @@ return new Sequence[] { eventprocessor.getSequence() }; } - public EventHandler getHandler() + public EventHandler getHandler() { return handler; } diff -Nru disruptor-3.2.1/src/main/java/com/lmax/disruptor/EventPoller.java disruptor-3.3.0/src/main/java/com/lmax/disruptor/EventPoller.java --- disruptor-3.2.1/src/main/java/com/lmax/disruptor/EventPoller.java 1970-01-01 00:00:00.000000000 +0000 +++ disruptor-3.3.0/src/main/java/com/lmax/disruptor/EventPoller.java 2014-07-21 00:57:02.000000000 +0000 @@ -0,0 +1,101 @@ +package com.lmax.disruptor; + +/** + * Experimental poll-based interface for the Disruptor. 
+ */ +public class EventPoller +{ + private final DataProvider dataProvider; + private final Sequencer sequencer; + private final Sequence sequence; + private final Sequence gatingSequence; + + public interface Handler + { + boolean onEvent(T event, long sequence, boolean endOfBatch) throws Exception; + } + + public enum PollState + { + PROCESSING, GATING, IDLE + } + + public EventPoller(final DataProvider dataProvider, + final Sequencer sequencer, + final Sequence sequence, + final Sequence gatingSequence) + { + this.dataProvider = dataProvider; + this.sequencer = sequencer; + this.sequence = sequence; + this.gatingSequence = gatingSequence; + } + + public PollState poll(final Handler eventHandler) throws Exception + { + final long currentSequence = sequence.get(); + long nextSequence = currentSequence + 1; + final long availableSequence = sequencer.getHighestPublishedSequence(nextSequence, gatingSequence.get()); + + if (nextSequence <= availableSequence) + { + boolean processNextEvent; + long processedSequence = currentSequence; + + try + { + do + { + final T event = dataProvider.get(nextSequence); + processNextEvent = eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); + processedSequence = nextSequence; + nextSequence++; + + } + while (nextSequence <= availableSequence & processNextEvent); + } + finally + { + sequence.set(processedSequence); + } + + return PollState.PROCESSING; + } + else if (sequencer.getCursor() >= nextSequence) + { + return PollState.GATING; + } + else + { + return PollState.IDLE; + } + } + + public static EventPoller newInstance(final DataProvider dataProvider, + final Sequencer sequencer, + final Sequence sequence, + final Sequence cursorSequence, + final Sequence...gatingSequences) + { + Sequence gatingSequence; + if (gatingSequences.length == 0) + { + gatingSequence = cursorSequence; + } + else if (gatingSequences.length == 1) + { + gatingSequence = gatingSequences[0]; + } + else + { + gatingSequence = new 
FixedSequenceGroup(gatingSequences); + } + + return new EventPoller(dataProvider, sequencer, sequence, gatingSequence); + } + + public Sequence getSequence() + { + return sequence; + } +} diff -Nru disruptor-3.2.1/src/main/java/com/lmax/disruptor/EventSequencer.java disruptor-3.3.0/src/main/java/com/lmax/disruptor/EventSequencer.java --- disruptor-3.2.1/src/main/java/com/lmax/disruptor/EventSequencer.java 1970-01-01 00:00:00.000000000 +0000 +++ disruptor-3.3.0/src/main/java/com/lmax/disruptor/EventSequencer.java 2014-07-21 00:57:02.000000000 +0000 @@ -0,0 +1,6 @@ +package com.lmax.disruptor; + +public interface EventSequencer extends DataProvider, Sequenced +{ + +} diff -Nru disruptor-3.2.1/src/main/java/com/lmax/disruptor/EventSink.java disruptor-3.3.0/src/main/java/com/lmax/disruptor/EventSink.java --- disruptor-3.2.1/src/main/java/com/lmax/disruptor/EventSink.java 1970-01-01 00:00:00.000000000 +0000 +++ disruptor-3.3.0/src/main/java/com/lmax/disruptor/EventSink.java 2014-07-21 00:57:02.000000000 +0000 @@ -0,0 +1,366 @@ +package com.lmax.disruptor; + +public interface EventSink +{ + /** + * Publishes an event to the ring buffer. It handles + * claiming the next sequence, getting the current (uninitialised) + * event from the ring buffer and publishing the claimed sequence + * after translation. + * + * @param translator The user specified translation for the event + */ + void publishEvent(EventTranslator translator); + + /** + * Attempts to publish an event to the ring buffer. It handles + * claiming the next sequence, getting the current (uninitialised) + * event from the ring buffer and publishing the claimed sequence + * after translation. Will return false if specified capacity + * was not available. + * + * @param translator The user specified translation for the event + * @return true if the value was published, false if there was insufficient + * capacity. 
+ */ + boolean tryPublishEvent(EventTranslator translator); + + /** + * Allows one user supplied argument. + * + * @see #publishEvent(EventTranslator) + * @param translator The user specified translation for the event + * @param arg0 A user supplied argument. + */ + void publishEvent(EventTranslatorOneArg translator, A arg0); + + /** + * Allows one user supplied argument. + * + * @see #tryPublishEvent(EventTranslator) + * @param translator The user specified translation for the event + * @param arg0 A user supplied argument. + * @return true if the value was published, false if there was insufficient + * capacity. + */ + boolean tryPublishEvent(EventTranslatorOneArg translator, A arg0); + + /** + * Allows two user supplied arguments. + * + * @see #publishEvent(EventTranslator) + * @param translator The user specified translation for the event + * @param arg0 A user supplied argument. + * @param arg1 A user supplied argument. + */ + void publishEvent(EventTranslatorTwoArg translator, A arg0, B arg1); + + /** + * Allows two user supplied arguments. + * + * @see #tryPublishEvent(EventTranslator) + * @param translator The user specified translation for the event + * @param arg0 A user supplied argument. + * @param arg1 A user supplied argument. + * @return true if the value was published, false if there was insufficient + * capacity. + */ + boolean tryPublishEvent(EventTranslatorTwoArg translator, A arg0, B arg1); + + /** + * Allows three user supplied arguments + * + * @see #publishEvent(EventTranslator) + * @param translator The user specified translation for the event + * @param arg0 A user supplied argument. + * @param arg1 A user supplied argument. + * @param arg2 A user supplied argument. 
+ */ + void publishEvent(EventTranslatorThreeArg translator, A arg0, B arg1, C arg2); + + /** + * Allows three user supplied arguments + * + * @see #publishEvent(EventTranslator) + * @param translator The user specified translation for the event + * @param arg0 A user supplied argument. + * @param arg1 A user supplied argument. + * @param arg2 A user supplied argument. + * @return true if the value was published, false if there was insufficient + * capacity. + */ + boolean tryPublishEvent(EventTranslatorThreeArg translator, A arg0, B arg1, C arg2); + + /** + * Allows a variable number of user supplied arguments + * + * @see #publishEvent(EventTranslator) + * @param translator The user specified translation for the event + * @param args User supplied arguments. + */ + void publishEvent(EventTranslatorVararg translator, Object... args); + + /** + * Allows a variable number of user supplied arguments + * + * @see #publishEvent(EventTranslator) + * @param translator The user specified translation for the event + * @param args User supplied arguments. + * @return true if the value was published, false if there was insufficient + * capacity. + */ + boolean tryPublishEvent(EventTranslatorVararg translator, Object... args); + + /** + * Publishes multiple events to the ring buffer. It handles + * claiming the next sequence, getting the current (uninitialised) + * event from the ring buffer and publishing the claimed sequence + * after translation. + *

+ * With this call the data that is to be inserted into the ring + * buffer will be a field (either explicitly or captured anonymously), + * therefore this call will require an instance of the translator + * for each value that is to be inserted into the ring buffer. + * + * @param translators The user specified translation for each event + */ + void publishEvents(EventTranslator[] translators); + + /** + * Publishes multiple events to the ring buffer. It handles + * claiming the next sequence, getting the current (uninitialised) + * event from the ring buffer and publishing the claimed sequence + * after translation. + *

+ * With this call the data that is to be inserted into the ring + * buffer will be a field (either explicitly or captured anonymously), + * therefore this call will require an instance of the translator + * for each value that is to be inserted into the ring buffer. + * + * @param translators The user specified translation for each event + * @param batchStartsAt The first element of the array which is within the batch. + * @param batchSize The actual size of the batch + */ + void publishEvents(EventTranslator[] translators, int batchStartsAt, int batchSize); + + /** + * Attempts to publish multiple events to the ring buffer. It handles + * claiming the next sequence, getting the current (uninitialised) + * event from the ring buffer and publishing the claimed sequence + * after translation. Will return false if specified capacity + * was not available. + * + * @param translators The user specified translation for the event + * @return true if the value was published, false if there was insufficient + * capacity. + */ + boolean tryPublishEvents(EventTranslator[] translators); + + /** + * Attempts to publish multiple events to the ring buffer. It handles + * claiming the next sequence, getting the current (uninitialised) + * event from the ring buffer and publishing the claimed sequence + * after translation. Will return false if specified capacity + * was not available. + * + * @param translators The user specified translation for the event + * @param batchStartsAt The first element of the array which is within the batch. + * @param batchSize The actual size of the batch + * @return true if all the values were published, false if there was insufficient + * capacity. + */ + boolean tryPublishEvents(EventTranslator[] translators, int batchStartsAt, int batchSize); + + /** + * Allows one user supplied argument per event. + * + * @param translator The user specified translation for the event + * @param arg0 A user supplied argument. 
+ * @see #publishEvents(com.lmax.disruptor.EventTranslator[]) + */ + void publishEvents(EventTranslatorOneArg translator, A[] arg0); + + /** + * Allows one user supplied argument per event. + * + * @param translator The user specified translation for each event + * @param batchStartsAt The first element of the array which is within the batch. + * @param batchSize The actual size of the batch + * @param arg0 An array of user supplied arguments, one element per event. + * @see #publishEvents(EventTranslator[]) + */ + void publishEvents(EventTranslatorOneArg translator, int batchStartsAt, int batchSize, A[] arg0); + + /** + * Allows one user supplied argument. + * + * @param translator The user specified translation for each event + * @param arg0 An array of user supplied arguments, one element per event. + * @return true if the value was published, false if there was insufficient + * capacity. + * @see #tryPublishEvents(com.lmax.disruptor.EventTranslator[]) + */ + boolean tryPublishEvents(EventTranslatorOneArg translator, A[] arg0); + + /** + * Allows one user supplied argument. + * + * @param translator The user specified translation for each event + * @param batchStartsAt The first element of the array which is within the batch. + * @param batchSize The actual size of the batch + * @param arg0 An array of user supplied arguments, one element per event. + * @return true if the value was published, false if there was insufficient + * capacity. + * @see #tryPublishEvents(EventTranslator[]) + */ + boolean tryPublishEvents(EventTranslatorOneArg translator, int batchStartsAt, int batchSize, A[] arg0); + + /** + * Allows two user supplied arguments per event. + * + * @param translator The user specified translation for the event + * @param arg0 An array of user supplied arguments, one element per event. + * @param arg1 An array of user supplied arguments, one element per event. 
+ * @see #publishEvents(com.lmax.disruptor.EventTranslator[]) + */ + void publishEvents(EventTranslatorTwoArg translator, A[] arg0, B[] arg1); + + /** + * Allows two user supplied arguments per event. + * + * @param translator The user specified translation for the event + * @param batchStartsAt The first element of the array which is within the batch. + * @param batchSize The actual size of the batch. + * @param arg0 An array of user supplied arguments, one element per event. + * @param arg1 An array of user supplied arguments, one element per event. + * @see #publishEvents(EventTranslator[]) + */ + void publishEvents(EventTranslatorTwoArg translator, int batchStartsAt, int batchSize, A[] arg0, + B[] arg1); + + /** + * Allows two user supplied arguments per event. + * + * @param translator The user specified translation for the event + * @param arg0 An array of user supplied arguments, one element per event. + * @param arg1 An array of user supplied arguments, one element per event. + * @return true if the value was published, false if there was insufficient + * capacity. + * @see #tryPublishEvents(com.lmax.disruptor.EventTranslator[]) + */ + boolean tryPublishEvents(EventTranslatorTwoArg translator, A[] arg0, B[] arg1); + + /** + * Allows two user supplied arguments per event. + * + * @param translator The user specified translation for the event + * @param batchStartsAt The first element of the array which is within the batch. + * @param batchSize The actual size of the batch. + * @param arg0 An array of user supplied arguments, one element per event. + * @param arg1 An array of user supplied arguments, one element per event. + * @return true if the value was published, false if there was insufficient + * capacity. + * @see #tryPublishEvents(EventTranslator[]) + */ + boolean tryPublishEvents(EventTranslatorTwoArg translator, int batchStartsAt, int batchSize, + A[] arg0, B[] arg1); + + /** + * Allows three user supplied arguments per event. 
+ * + * @param translator The user specified translation for the event + * @param arg0 An array of user supplied arguments, one element per event. + * @param arg1 An array of user supplied arguments, one element per event. + * @param arg2 An array of user supplied arguments, one element per event. + * @see #publishEvents(com.lmax.disruptor.EventTranslator[]) + */ + void publishEvents(EventTranslatorThreeArg translator, A[] arg0, B[] arg1, C[] arg2); + + /** + * Allows three user supplied arguments per event. + * + * @param translator The user specified translation for the event + * @param batchStartsAt The first element of the array which is within the batch. + * @param batchSize The number of elements in the batch. + * @param arg0 An array of user supplied arguments, one element per event. + * @param arg1 An array of user supplied arguments, one element per event. + * @param arg2 An array of user supplied arguments, one element per event. + * @see #publishEvents(EventTranslator[]) + */ + void publishEvents(EventTranslatorThreeArg translator, int batchStartsAt, int batchSize, + A[] arg0, B[] arg1, C[] arg2); + + /** + * Allows three user supplied arguments per event. + * + * @param translator The user specified translation for the event + * @param arg0 An array of user supplied arguments, one element per event. + * @param arg1 An array of user supplied arguments, one element per event. + * @param arg2 An array of user supplied arguments, one element per event. + * @return true if the value was published, false if there was insufficient + * capacity. + * @see #publishEvents(com.lmax.disruptor.EventTranslator[]) + */ + boolean tryPublishEvents(EventTranslatorThreeArg translator, A[] arg0, B[] arg1, C[] arg2); + + /** + * Allows three user supplied arguments per event. + * + * @param translator The user specified translation for the event + * @param batchStartsAt The first element of the array which is within the batch. 
+ * @param batchSize The actual size of the batch. + * @param arg0 An array of user supplied arguments, one element per event. + * @param arg1 An array of user supplied arguments, one element per event. + * @param arg2 An array of user supplied arguments, one element per event. + * @return true if the value was published, false if there was insufficient + * capacity. + * @see #publishEvents(EventTranslator[]) + */ + boolean tryPublishEvents(EventTranslatorThreeArg translator, int batchStartsAt, + int batchSize, A[] arg0, B[] arg1, C[] arg2); + + /** + * Allows a variable number of user supplied arguments per event. + * + * @param translator The user specified translation for the event + * @param args User supplied arguments, one Object[] per event. + * @see #publishEvents(com.lmax.disruptor.EventTranslator[]) + */ + void publishEvents(EventTranslatorVararg translator, Object[]... args); + + /** + * Allows a variable number of user supplied arguments per event. + * + * @param translator The user specified translation for the event + * @param batchStartsAt The first element of the array which is within the batch. + * @param batchSize The actual size of the batch + * @param args User supplied arguments, one Object[] per event. + * @see #publishEvents(EventTranslator[]) + */ + void publishEvents(EventTranslatorVararg translator, int batchStartsAt, int batchSize, Object[]... args); + + /** + * Allows a variable number of user supplied arguments per event. + * + * @param translator The user specified translation for the event + * @param args User supplied arguments, one Object[] per event. + * @return true if the value was published, false if there was insufficient + * capacity. + * @see #publishEvents(com.lmax.disruptor.EventTranslator[]) + */ + boolean tryPublishEvents(EventTranslatorVararg translator, Object[]... args); + + /** + * Allows a variable number of user supplied arguments per event. 
+ * + * @param translator The user specified translation for the event + * @param batchStartsAt The first element of the array which is within the batch. + * @param batchSize The actual size of the batch. + * @param args User supplied arguments, one Object[] per event. + * @return true if the value was published, false if there was insufficient + * capacity. + * @see #publishEvents(EventTranslator[]) + */ + boolean tryPublishEvents(EventTranslatorVararg translator, int batchStartsAt, int batchSize, Object[]... args); + +} \ No newline at end of file diff -Nru disruptor-3.2.1/src/main/java/com/lmax/disruptor/LiteBlockingWaitStrategy.java disruptor-3.3.0/src/main/java/com/lmax/disruptor/LiteBlockingWaitStrategy.java --- disruptor-3.2.1/src/main/java/com/lmax/disruptor/LiteBlockingWaitStrategy.java 1970-01-01 00:00:00.000000000 +0000 +++ disruptor-3.3.0/src/main/java/com/lmax/disruptor/LiteBlockingWaitStrategy.java 2014-07-21 00:57:02.000000000 +0000 @@ -0,0 +1,90 @@ +/* + * Copyright 2011 LMAX Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.lmax.disruptor; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Variation of the {@link BlockingWaitStrategy} that attempts to elide conditional wake-ups when + * the lock is uncontended. Shows performance improvements on microbenchmarks. 
However this + * wait strategy should be considered experimental as I have not fully proved the correctness of + * the lock elision code. + */ +public final class LiteBlockingWaitStrategy implements WaitStrategy +{ + private final Lock lock = new ReentrantLock(); + private final Condition processorNotifyCondition = lock.newCondition(); + private final AtomicBoolean signalNeeded = new AtomicBoolean(false); + + @Override + public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier) + throws AlertException, InterruptedException + { + long availableSequence; + if ((availableSequence = cursorSequence.get()) < sequence) + { + lock.lock(); + + try + { + do + { + signalNeeded.getAndSet(true); + + if ((availableSequence = cursorSequence.get()) >= sequence) + { + break; + } + + barrier.checkAlert(); + processorNotifyCondition.await(); + } + while ((availableSequence = cursorSequence.get()) < sequence); + } + finally + { + lock.unlock(); + } + } + + while ((availableSequence = dependentSequence.get()) < sequence) + { + barrier.checkAlert(); + } + + return availableSequence; + } + + @Override + public void signalAllWhenBlocking() + { + if (signalNeeded.getAndSet(false)) + { + lock.lock(); + try + { + processorNotifyCondition.signalAll(); + } + finally + { + lock.unlock(); + } + } + } +} diff -Nru disruptor-3.2.1/src/main/java/com/lmax/disruptor/MultiProducerSequencer.java disruptor-3.3.0/src/main/java/com/lmax/disruptor/MultiProducerSequencer.java --- disruptor-3.2.1/src/main/java/com/lmax/disruptor/MultiProducerSequencer.java 2014-03-10 01:07:35.000000000 +0000 +++ disruptor-3.3.0/src/main/java/com/lmax/disruptor/MultiProducerSequencer.java 2014-07-21 00:57:02.000000000 +0000 @@ -23,9 +23,12 @@ /** - * Coordinator for claiming sequences for access to a data structure while tracking dependent {@link Sequence}s + *

Coordinator for claiming sequences for access to a data structure while tracking dependent {@link Sequence}s. + * Suitable for use for sequencing across multiple publisher threads.

* - * Suitable for use for sequencing across multiple publisher threads. + *

*

Note on {@link Sequencer#getCursor()}: With this sequencer the cursor value is updated after the call + * to {@link Sequencer#next()}. To determine the highest available sequence that can be read, + * {@link Sequencer#getHighestPublishedSequence(long, long)} should be used. */ public final class MultiProducerSequencer extends AbstractSequencer { diff -Nru disruptor-3.2.1/src/main/java/com/lmax/disruptor/PhasedBackoffWaitStrategy.java disruptor-3.3.0/src/main/java/com/lmax/disruptor/PhasedBackoffWaitStrategy.java --- disruptor-3.2.1/src/main/java/com/lmax/disruptor/PhasedBackoffWaitStrategy.java 2014-03-10 01:07:35.000000000 +0000 +++ disruptor-3.3.0/src/main/java/com/lmax/disruptor/PhasedBackoffWaitStrategy.java 2014-07-21 00:57:02.000000000 +0000 @@ -52,6 +52,17 @@ } /** + * Block with wait/notifyAll semantics + */ + public static PhasedBackoffWaitStrategy withLiteLock(long spinTimeout, + long yieldTimeout, + TimeUnit units) + { + return new PhasedBackoffWaitStrategy(spinTimeout, yieldTimeout, + units, new LiteBlockingWaitStrategy()); + } + + /** * Block by sleeping in a loop */ public static PhasedBackoffWaitStrategy withSleep(long spinTimeout, diff -Nru disruptor-3.2.1/src/main/java/com/lmax/disruptor/RingBuffer.java disruptor-3.3.0/src/main/java/com/lmax/disruptor/RingBuffer.java --- disruptor-3.2.1/src/main/java/com/lmax/disruptor/RingBuffer.java 2014-03-10 01:07:35.000000000 +0000 +++ disruptor-3.3.0/src/main/java/com/lmax/disruptor/RingBuffer.java 2014-07-21 00:57:02.000000000 +0000 @@ -16,7 +16,81 @@ package com.lmax.disruptor; +import sun.misc.Unsafe; + import com.lmax.disruptor.dsl.ProducerType; +import com.lmax.disruptor.util.Util; + +abstract class RingBufferPad +{ + protected long p1, p2, p3, p4, p5, p6, p7; +} + +abstract class RingBufferFields extends RingBufferPad +{ + private static final int BUFFER_PAD; + private static final long REF_ARRAY_BASE; + private static final int REF_ELEMENT_SHIFT; + private static final Unsafe UNSAFE = 
Util.getUnsafe(); + static + { + final int scale = UNSAFE.arrayIndexScale(Object[].class); + if (4 == scale) + { + REF_ELEMENT_SHIFT = 2; + } + else if (8 == scale) + { + REF_ELEMENT_SHIFT = 3; + } + else + { + throw new IllegalStateException("Unknown pointer size"); + } + BUFFER_PAD = 128 / scale; + // Including the buffer pad in the array base offset + REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT); + } + + private final long indexMask; + private final Object[] entries; + protected final int bufferSize; + protected final Sequencer sequencer; + + RingBufferFields(EventFactory eventFactory, + Sequencer sequencer) + { + this.sequencer = sequencer; + this.bufferSize = sequencer.getBufferSize(); + + if (bufferSize < 1) + { + throw new IllegalArgumentException("bufferSize must not be less than 1"); + } + if (Integer.bitCount(bufferSize) != 1) + { + throw new IllegalArgumentException("bufferSize must be a power of 2"); + } + + this.indexMask = bufferSize - 1; + this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD]; + fill(eventFactory); + } + + private void fill(EventFactory eventFactory) + { + for (int i = 0; i < bufferSize; i++) + { + entries[BUFFER_PAD + i] = eventFactory.newInstance(); + } + } + + @SuppressWarnings("unchecked") + protected final E elementAt(long sequence) + { + return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT)); + } +} /** * Ring based store of reusable entries containing the data representing @@ -24,14 +98,10 @@ * * @param implementation storing the data for sharing during exchange or parallel coordination of an event. 
*/ -public final class RingBuffer implements Cursored, DataProvider +public final class RingBuffer extends RingBufferFields implements Cursored, EventSequencer, EventSink { public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE; - - private final int indexMask; - private final Object[] entries; - private final int bufferSize; - private final Sequencer sequencer; + protected long p1, p2, p3, p4, p5, p6, p7; /** * Construct a RingBuffer with the full option set. @@ -43,21 +113,7 @@ RingBuffer(EventFactory eventFactory, Sequencer sequencer) { - this.sequencer = sequencer; - this.bufferSize = sequencer.getBufferSize(); - - if (bufferSize < 1) - { - throw new IllegalArgumentException("bufferSize must not be less than 1"); - } - if (Integer.bitCount(bufferSize) != 1) - { - throw new IllegalArgumentException("bufferSize must be a power of 2"); - } - - this.indexMask = bufferSize - 1; - this.entries = new Object[sequencer.getBufferSize()]; - fill(eventFactory); + super(eventFactory, sequencer); } /** @@ -162,28 +218,10 @@ * @param sequence for the event * @return the event for the given sequence */ - @SuppressWarnings("unchecked") + @Override public E get(long sequence) { - return (E)entries[(int)sequence & indexMask]; - } - - /** - * @deprecated Use {@link RingBuffer#get(long)} - */ - @Deprecated - public E getPreallocated(long sequence) - { - return get(sequence); - } - - /** - * @deprecated Use {@link RingBuffer#get(long)} - */ - @Deprecated - public E getPublished(long sequence) - { - return get(sequence); + return elementAt(sequence); } /** @@ -202,6 +240,7 @@ * @see RingBuffer#get(long) * @return The next sequence to publish to. */ + @Override public long next() { return sequencer.next(); @@ -215,6 +254,7 @@ * @param n number of slots to claim * @return sequence number of the highest slot claimed */ + @Override public long next(int n) { return sequencer.next(n); @@ -241,6 +281,7 @@ * @return The next sequence to publish to. 
* @throws InsufficientCapacityException if the necessary space in the ring buffer is not available */ + @Override public long tryNext() throws InsufficientCapacityException { return sequencer.tryNext(); @@ -254,14 +295,15 @@ * @return sequence number of the highest slot claimed * @throws InsufficientCapacityException if the necessary space in the ring buffer is not available */ + @Override public long tryNext(int n) throws InsufficientCapacityException { return sequencer.tryNext(n); } /** - * Resets the cursor to a specific value. This can be applied at any time, but it is worth not - * that it is a racy thing to do and should only be used in controlled circumstances. E.g. during + * Resets the cursor to a specific value. This can be applied at any time, but it is worth noting + * that it can cause a data race and should only be used in controlled circumstances. E.g. during * initialisation. * * @param sequence The sequence to reset too. @@ -275,7 +317,7 @@ /** * Sets the cursor to a specific sequence and returns the preallocated entry that is stored there. This - * is another deliberately racy call, that should only be done in controlled circumstances, e.g. initialisation. + * can cause a data race and should only be done in controlled circumstances, e.g. during initialisation. * * @param sequence The sequence to claim. * @return The preallocated event. @@ -345,10 +387,24 @@ } /** - * Get the current cursor value for the ring buffer. The cursor value is - * the last value that was published, or the highest available sequence - * that can be consumed. + * Creates an event poller for this ring buffer gated on the supplied sequences. + * + * @param gatingSequences + * @return A poller that will gate on this ring buffer and the supplied sequences. + */ + public EventPoller newPoller(Sequence... gatingSequences) + { + return sequencer.newPoller(this, gatingSequences); + } + + /** + * Get the current cursor value for the ring buffer. 
The actual value received + * will depend on the type of {@link Sequencer} that is being used. + * + * @see MultiProducerSequencer + * @see SingleProducerSequencer */ + @Override public long getCursor() { return sequencer.getCursor(); @@ -379,13 +435,9 @@ /** - * Publishes an event to the ring buffer. It handles - * claiming the next sequence, getting the current (uninitialised) - * event from the ring buffer and publishing the claimed sequence - * after translation. - * - * @param translator The user specified translation for the event + * @see com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslator) */ + @Override public void publishEvent(EventTranslator translator) { final long sequence = sequencer.next(); @@ -393,16 +445,9 @@ } /** - * Attempts to publish an event to the ring buffer. It handles - * claiming the next sequence, getting the current (uninitialised) - * event from the ring buffer and publishing the claimed sequence - * after translation. Will return false if specified capacity - * was not available. - * - * @param translator The user specified translation for the event - * @return true if the value was published, false if there was insufficient - * capacity. + * @see com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslator) */ + @Override public boolean tryPublishEvent(EventTranslator translator) { try @@ -418,12 +463,10 @@ } /** - * Allows one user supplied argument. - * - * @see #publishEvent(EventTranslator) - * @param translator The user specified translation for the event - * @param arg0 A user supplied argument. + * @see com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorOneArg, Object) + * com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorOneArg, A) */ + @Override public void publishEvent(EventTranslatorOneArg translator, A arg0) { final long sequence = sequencer.next(); @@ -431,14 +474,10 @@ } /** - * Allows one user supplied argument. 
- * - * @see #tryPublishEvent(EventTranslator) - * @param translator The user specified translation for the event - * @param arg0 A user supplied argument. - * @return true if the value was published, false if there was insufficient - * capacity. + * @see com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorOneArg, Object) + * com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorOneArg, A) */ + @Override public boolean tryPublishEvent(EventTranslatorOneArg translator, A arg0) { try @@ -454,13 +493,10 @@ } /** - * Allows two user supplied arguments. - * - * @see #publishEvent(EventTranslator) - * @param translator The user specified translation for the event - * @param arg0 A user supplied argument. - * @param arg1 A user supplied argument. + * @see com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorTwoArg, Object, Object) + * com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorTwoArg, A, B) */ + @Override public void publishEvent(EventTranslatorTwoArg translator, A arg0, B arg1) { final long sequence = sequencer.next(); @@ -468,15 +504,10 @@ } /** - * Allows two user supplied arguments. - * - * @see #tryPublishEvent(EventTranslator) - * @param translator The user specified translation for the event - * @param arg0 A user supplied argument. - * @param arg1 A user supplied argument. - * @return true if the value was published, false if there was insufficient - * capacity. 
+ * @see com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorTwoArg, Object, Object) + * com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorTwoArg, A, B) */ + @Override public boolean tryPublishEvent(EventTranslatorTwoArg translator, A arg0, B arg1) { try @@ -492,14 +523,10 @@ } /** - * Allows three user supplied arguments - * - * @see #publishEvent(EventTranslator) - * @param translator The user specified translation for the event - * @param arg0 A user supplied argument. - * @param arg1 A user supplied argument. - * @param arg2 A user supplied argument. + * @see com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorThreeArg, Object, Object, Object) + * com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorThreeArg, A, B, C) */ + @Override public void publishEvent(EventTranslatorThreeArg translator, A arg0, B arg1, C arg2) { final long sequence = sequencer.next(); @@ -507,16 +534,10 @@ } /** - * Allows three user supplied arguments - * - * @see #publishEvent(EventTranslator) - * @param translator The user specified translation for the event - * @param arg0 A user supplied argument. - * @param arg1 A user supplied argument. - * @param arg2 A user supplied argument. - * @return true if the value was published, false if there was insufficient - * capacity. + * @see com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorThreeArg, Object, Object, Object) + * com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorThreeArg, A, B, C) */ + @Override public boolean tryPublishEvent(EventTranslatorThreeArg translator, A arg0, B arg1, C arg2) { try @@ -532,12 +553,9 @@ } /** - * Allows a variable number of user supplied arguments - * - * @see #publishEvent(EventTranslator) - * @param translator The user specified translation for the event - * @param args User supplied arguments. 
+ * @see com.lmax.disruptor.EventSink#publishEvent(com.lmax.disruptor.EventTranslatorVararg, java.lang.Object...) */ + @Override public void publishEvent(EventTranslatorVararg translator, Object...args) { final long sequence = sequencer.next(); @@ -545,14 +563,9 @@ } /** - * Allows a variable number of user supplied arguments - * - * @see #publishEvent(EventTranslator) - * @param translator The user specified translation for the event - * @param args User supplied arguments. - * @return true if the value was published, false if there was insufficient - * capacity. + * @see com.lmax.disruptor.EventSink#tryPublishEvent(com.lmax.disruptor.EventTranslatorVararg, java.lang.Object...) */ + @Override public boolean tryPublishEvent(EventTranslatorVararg translator, Object...args) { try @@ -569,28 +582,18 @@ /** - * Publishes multiple events to the ring buffer. It handles - * claiming the next sequence, getting the current (uninitialised) - * event from the ring buffer and publishing the claimed sequence - * after translation. - * - * @param translators The user specified translation for each event + * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslator[]) */ + @Override public void publishEvents(EventTranslator[] translators) { publishEvents(translators, 0, translators.length); } /** - * Publishes multiple events to the ring buffer. It handles - * claiming the next sequence, getting the current (uninitialised) - * event from the ring buffer and publishing the claimed sequence - * after translation. - * - * @param translators The user specified translation for each event - * @param batchStartsAt The first element of the array which is within the batch. 
- * @param batchSize The actual size of the batch + * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslator[], int, int) */ + @Override public void publishEvents(EventTranslator[] translators, int batchStartsAt, int batchSize) { checkBounds(translators, batchStartsAt, batchSize); @@ -599,34 +602,18 @@ } /** - * Attempts to publish multiple events to the ring buffer. It handles - * claiming the next sequence, getting the current (uninitialised) - * event from the ring buffer and publishing the claimed sequence - * after translation. Will return false if specified capacity - * was not available. - * - * @param translators The user specified translation for the event - * @return true if the value was published, false if there was insufficient - * capacity. + * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslator[]) */ + @Override public boolean tryPublishEvents(EventTranslator[] translators) { return tryPublishEvents(translators, 0, translators.length); } /** - * Attempts to publish multiple events to the ring buffer. It handles - * claiming the next sequence, getting the current (uninitialised) - * event from the ring buffer and publishing the claimed sequence - * after translation. Will return false if specified capacity - * was not available. - * - * @param translators The user specified translation for the event - * @param batchStartsAt The first element of the array which is within the batch. - * @param batchSize The actual size of the batch - * @return true if all the values were published, false if there was insufficient - * capacity. + * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslator[], int, int) */ + @Override public boolean tryPublishEvents(EventTranslator[] translators, int batchStartsAt, int batchSize) { checkBounds(translators, batchStartsAt, batchSize); @@ -643,26 +630,20 @@ } /** - * Allows one user supplied argument per event. 
- * - * @param translator The user specified translation for the event - * @param arg0 A user supplied argument. - * @see #publishEvents(com.lmax.disruptor.EventTranslator[]) + * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorOneArg, Object[]) + * com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorOneArg, A[]) */ + @Override public void publishEvents(EventTranslatorOneArg translator, A[] arg0) { publishEvents(translator, 0, arg0.length, arg0); } /** - * Allows one user supplied argument per event. - * - * @param translator The user specified translation for each event - * @param batchStartsAt The first element of the array which is within the batch. - * @param batchSize The actual size of the batch - * @param arg0 An array of user supplied arguments, one element per event. - * @see #publishEvents(EventTranslator[]) + * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorOneArg, int, int, Object[]) + * com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorOneArg, int, int, A[]) */ + @Override public void publishEvents(EventTranslatorOneArg translator, int batchStartsAt, int batchSize, A[] arg0) { checkBounds(arg0, batchStartsAt, batchSize); @@ -671,30 +652,20 @@ } /** - * Allows one user supplied argument. - * - * @param translator The user specified translation for each event - * @param arg0 An array of user supplied arguments, one element per event. - * @return true if the value was published, false if there was insufficient - * capacity. 
- * @see #tryPublishEvents(com.lmax.disruptor.EventTranslator[]) + * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorOneArg, Object[]) + * com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorOneArg, A[]) */ + @Override public boolean tryPublishEvents(EventTranslatorOneArg translator, A[] arg0) { return tryPublishEvents(translator, 0, arg0.length, arg0); } /** - * Allows one user supplied argument. - * - * @param translator The user specified translation for each event - * @param batchStartsAt The first element of the array which is within the batch. - * @param batchSize The actual size of the batch - * @param arg0 An array of user supplied arguments, one element per event. - * @return true if the value was published, false if there was insufficient - * capacity. - * @see #tryPublishEvents(EventTranslator[]) + * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorOneArg, int, int, Object[]) + * com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorOneArg, int, int, A[]) */ + @Override public boolean tryPublishEvents(EventTranslatorOneArg translator, int batchStartsAt, int batchSize, A[] arg0) { checkBounds(arg0, batchStartsAt, batchSize); @@ -711,28 +682,20 @@ } /** - * Allows two user supplied arguments per event. - * - * @param translator The user specified translation for the event - * @param arg0 An array of user supplied arguments, one element per event. - * @param arg1 An array of user supplied arguments, one element per event. 
- * @see #publishEvents(com.lmax.disruptor.EventTranslator[]) + * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorTwoArg, Object[], Object[]) + * com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorTwoArg, A[], B[]) */ + @Override public void publishEvents(EventTranslatorTwoArg translator, A[] arg0, B[] arg1) { publishEvents(translator, 0, arg0.length, arg0, arg1); } /** - * Allows two user supplied arguments per event. - * - * @param translator The user specified translation for the event - * @param batchStartsAt The first element of the array which is within the batch. - * @param batchSize The actual size of the batch. - * @param arg0 An array of user supplied arguments, one element per event. - * @param arg1 An array of user supplied arguments, one element per event. - * @see #publishEvents(EventTranslator[]) + * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorTwoArg, int, int, Object[], Object[]) + * com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorTwoArg, int, int, A[], B[]) */ + @Override public void publishEvents(EventTranslatorTwoArg translator, int batchStartsAt, int batchSize, A[] arg0, B[] arg1) { checkBounds(arg0, arg1, batchStartsAt, batchSize); @@ -741,32 +704,20 @@ } /** - * Allows two user supplied arguments per event. - * - * @param translator The user specified translation for the event - * @param arg0 An array of user supplied arguments, one element per event. - * @param arg1 An array of user supplied arguments, one element per event. - * @return true if the value was published, false if there was insufficient - * capacity. 
- * @see #tryPublishEvents(com.lmax.disruptor.EventTranslator[]) + * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorTwoArg, Object[], Object[]) + * com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorTwoArg, A[], B[]) */ + @Override public boolean tryPublishEvents(EventTranslatorTwoArg translator, A[] arg0, B[] arg1) { return tryPublishEvents(translator, 0, arg0.length, arg0, arg1); } /** - * Allows two user supplied arguments per event. - * - * @param translator The user specified translation for the event - * @param batchStartsAt The first element of the array which is within the batch. - * @param batchSize The actual size of the batch. - * @param arg0 An array of user supplied arguments, one element per event. - * @param arg1 An array of user supplied arguments, one element per event. - * @return true if the value was published, false if there was insufficient - * capacity. - * @see #tryPublishEvents(EventTranslator[]) + * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorTwoArg, int, int, Object[], Object[]) + * com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorTwoArg, int, int, A[], B[]) */ + @Override public boolean tryPublishEvents(EventTranslatorTwoArg translator, int batchStartsAt, int batchSize, A[] arg0, B[] arg1) { checkBounds(arg0, arg1, batchStartsAt, batchSize); @@ -783,30 +734,20 @@ } /** - * Allows three user supplied arguments per event. - * - * @param translator The user specified translation for the event - * @param arg0 An array of user supplied arguments, one element per event. - * @param arg1 An array of user supplied arguments, one element per event. - * @param arg2 An array of user supplied arguments, one element per event. 
- * @see #publishEvents(com.lmax.disruptor.EventTranslator[]) + * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorThreeArg, Object[], Object[], Object[]) + * com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorThreeArg, A[], B[], C[]) */ + @Override public void publishEvents(EventTranslatorThreeArg translator, A[] arg0, B[] arg1, C[] arg2) { publishEvents(translator, 0, arg0.length, arg0, arg1, arg2); } /** - * Allows three user supplied arguments per event. - * - * @param translator The user specified translation for the event - * @param batchStartsAt The first element of the array which is within the batch. - * @param batchSize The number of elements in the batch. - * @param arg0 An array of user supplied arguments, one element per event. - * @param arg1 An array of user supplied arguments, one element per event. - * @param arg2 An array of user supplied arguments, one element per event. - * @see #publishEvents(EventTranslator[]) + * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorThreeArg, int, int, Object[], Object[], Object[]) + * com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorThreeArg, int, int, A[], B[], C[]) */ + @Override public void publishEvents(EventTranslatorThreeArg translator, int batchStartsAt, int batchSize, A[] arg0, B[] arg1, C[] arg2) { checkBounds(arg0, arg1, arg2, batchStartsAt, batchSize); @@ -815,34 +756,20 @@ } /** - * Allows three user supplied arguments per event. - * - * @param translator The user specified translation for the event - * @param arg0 An array of user supplied arguments, one element per event. - * @param arg1 An array of user supplied arguments, one element per event. - * @param arg2 An array of user supplied arguments, one element per event. - * @return true if the value was published, false if there was insufficient - * capacity. 
- * @see #publishEvents(com.lmax.disruptor.EventTranslator[]) + * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorThreeArg, Object[], Object[], Object[]) + * com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorThreeArg, A[], B[], C[]) */ + @Override public boolean tryPublishEvents(EventTranslatorThreeArg translator, A[] arg0, B[] arg1, C[] arg2) { return tryPublishEvents(translator, 0, arg0.length, arg0, arg1, arg2); } /** - * Allows three user supplied arguments per event. - * - * @param translator The user specified translation for the event - * @param batchStartsAt The first element of the array which is within the batch. - * @param batchSize The actual size of the batch. - * @param arg0 An array of user supplied arguments, one element per event. - * @param arg1 An array of user supplied arguments, one element per event. - * @param arg2 An array of user supplied arguments, one element per event. - * @return true if the value was published, false if there was insufficient - * capacity. - * @see #publishEvents(EventTranslator[]) + * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorThreeArg, int, int, Object[], Object[], Object[]) + * com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorThreeArg, int, int, A[], B[], C[]) */ + @Override public boolean tryPublishEvents(EventTranslatorThreeArg translator, int batchStartsAt, int batchSize, A[] arg0, B[] arg1, C[] arg2) { checkBounds(arg0, arg1, arg2, batchStartsAt, batchSize); @@ -859,26 +786,18 @@ } /** - * Allows a variable number of user supplied arguments per event. - * - * @param translator The user specified translation for the event - * @param args User supplied arguments, one Object[] per event. 
- * @see #publishEvents(com.lmax.disruptor.EventTranslator[]) + * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorVararg, java.lang.Object[][]) */ + @Override public void publishEvents(EventTranslatorVararg translator, Object[]... args) { publishEvents(translator, 0, args.length, args); } /** - * Allows a variable number of user supplied arguments per event. - * - * @param translator The user specified translation for the event - * @param batchStartsAt The first element of the array which is within the batch. - * @param batchSize The actual size of the batch - * @param args User supplied arguments, one Object[] per event. - * @see #publishEvents(EventTranslator[]) + * @see com.lmax.disruptor.EventSink#publishEvents(com.lmax.disruptor.EventTranslatorVararg, int, int, java.lang.Object[][]) */ + @Override public void publishEvents(EventTranslatorVararg translator, int batchStartsAt, int batchSize, Object[]... args) { checkBounds(batchStartsAt, batchSize, args); @@ -887,30 +806,18 @@ } /** - * Allows a variable number of user supplied arguments per event. - * - * @param translator The user specified translation for the event - * @param args User supplied arguments, one Object[] per event. - * @return true if the value was published, false if there was insufficient - * capacity. - * @see #publishEvents(com.lmax.disruptor.EventTranslator[]) + * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorVararg, java.lang.Object[][]) */ + @Override public boolean tryPublishEvents(EventTranslatorVararg translator, Object[]... args) { return tryPublishEvents(translator, 0, args.length, args); } /** - * Allows a variable number of user supplied arguments per event. - * - * @param translator The user specified translation for the event - * @param batchStartsAt The first element of the array which is within the batch. - * @param batchSize The actual size of the batch. 
- * @param args User supplied arguments, one Object[] per event. - * @return true if the value was published, false if there was insufficient - * capacity. - * @see #publishEvents(EventTranslator[]) + * @see com.lmax.disruptor.EventSink#tryPublishEvents(com.lmax.disruptor.EventTranslatorVararg, int, int, java.lang.Object[][]) */ + @Override public boolean tryPublishEvents(EventTranslatorVararg translator, int batchStartsAt, int batchSize, Object[]... args) { checkBounds(args, batchStartsAt, batchSize); @@ -932,6 +839,7 @@ * * @param sequence the sequence to publish. */ + @Override public void publish(long sequence) { sequencer.publish(sequence); @@ -945,6 +853,7 @@ * @param lo the lowest sequence number to be published * @param hi the highest sequence number to be published */ + @Override public void publish(long lo, long hi) { sequencer.publish(lo, hi); @@ -1172,12 +1081,4 @@ sequencer.publish(initialSequence, finalSequence); } } - - private void fill(EventFactory eventFactory) - { - for (int i = 0; i < entries.length; i++) - { - entries[i] = eventFactory.newInstance(); - } - } } diff -Nru disruptor-3.2.1/src/main/java/com/lmax/disruptor/Sequenced.java disruptor-3.3.0/src/main/java/com/lmax/disruptor/Sequenced.java --- disruptor-3.2.1/src/main/java/com/lmax/disruptor/Sequenced.java 1970-01-01 00:00:00.000000000 +0000 +++ disruptor-3.3.0/src/main/java/com/lmax/disruptor/Sequenced.java 2014-07-21 00:57:02.000000000 +0000 @@ -0,0 +1,85 @@ +package com.lmax.disruptor; + +public interface Sequenced +{ + /** + * The capacity of the data structure to hold entries. + * + * @return the size of the RingBuffer. + */ + int getBufferSize(); + + /** + * Has the buffer got capacity to allocate another sequence. This is a concurrent + * method so the response should only be taken as an indication of available capacity. + * @param requiredCapacity in the buffer + * @return true if the buffer has the capacity to allocate the next sequence otherwise false. 
+ */ + boolean hasAvailableCapacity(final int requiredCapacity); + + /** + * Get the remaining capacity for this sequencer. + * @return The number of slots remaining. + */ + long remainingCapacity(); + + /** + * Claim the next event in sequence for publishing. + * @return the claimed sequence value + */ + long next(); + + /** + * Claim the next n events in sequence for publishing. This is for batch event producing. Using batch producing + * requires a little care and some math. + *

+     * int n = 10;
+     * long hi = sequencer.next(n);
+     * long lo = hi - (n - 1);
+     * for (long sequence = lo; sequence <= hi; sequence++) {
+     *     // Do work.
+     * }
+     * sequencer.publish(lo, hi);
+     * 
+ * + * @param n the number of sequences to claim + * @return the highest claimed sequence value + */ + long next(int n); + + /** + * Attempt to claim the next event in sequence for publishing. Will return the + * number of the slot if there is at least requiredCapacity slots + * available. + * @return the claimed sequence value + * @throws InsufficientCapacityException + */ + long tryNext() throws InsufficientCapacityException; + + /** + * Attempt to claim the next n events in sequence for publishing. Will return the + * highest numbered slot if there is at least requiredCapacity slots + * available. Have a look at {@link Sequencer#next()} for a description on how to + * use this method. + * + * @param n the number of sequences to claim + * @return the claimed sequence value + * @throws InsufficientCapacityException + */ + long tryNext(int n) throws InsufficientCapacityException; + + /** + * Publishes a sequence. Call when the event has been filled. + * + * @param sequence + */ + void publish(long sequence); + + /** + * Batch publish sequences. Called when all of the events have been filled. + * + * @param lo first sequence number to publish + * @param hi last sequence number to publish + */ + void publish(long lo, long hi); +} \ No newline at end of file diff -Nru disruptor-3.2.1/src/main/java/com/lmax/disruptor/Sequence.java disruptor-3.3.0/src/main/java/com/lmax/disruptor/Sequence.java --- disruptor-3.2.1/src/main/java/com/lmax/disruptor/Sequence.java 2014-03-10 01:07:35.000000000 +0000 +++ disruptor-3.3.0/src/main/java/com/lmax/disruptor/Sequence.java 2014-07-21 00:57:02.000000000 +0000 @@ -15,9 +15,25 @@ */ package com.lmax.disruptor; +import sun.misc.Unsafe; + import com.lmax.disruptor.util.Util; -import sun.misc.Unsafe; + +class LhsPadding +{ + protected long p1, p2, p3, p4, p5, p6, p7; +} + +class Value extends LhsPadding +{ + protected volatile long value; +} + +class RhsPadding extends Value +{ + protected long p9, p10, p11, p12, p13, p14, p15; +} /** *

Concurrent sequence class used for tracking the progress of @@ -27,7 +43,7 @@ *

Also attempts to be more efficient with regards to false * sharing by adding padding around the volatile field. */ -public class Sequence +public class Sequence extends RhsPadding { static final long INITIAL_VALUE = -1L; private static final Unsafe UNSAFE; @@ -36,13 +52,16 @@ static { UNSAFE = Util.getUnsafe(); - final int base = UNSAFE.arrayBaseOffset(long[].class); - final int scale = UNSAFE.arrayIndexScale(long[].class); - VALUE_OFFSET = base + (scale * 7); + try + { + VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value")); + } + catch (final Exception e) + { + throw new RuntimeException(e); + } } - private final long[] paddedValue = new long[15]; - /** * Create a sequence initialised to -1. */ @@ -58,7 +77,7 @@ */ public Sequence(final long initialValue) { - UNSAFE.putOrderedLong(paddedValue, VALUE_OFFSET, initialValue); + UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue); } /** @@ -68,7 +87,7 @@ */ public long get() { - return UNSAFE.getLongVolatile(paddedValue, VALUE_OFFSET); + return value; } /** @@ -80,7 +99,7 @@ */ public void set(final long value) { - UNSAFE.putOrderedLong(paddedValue, VALUE_OFFSET, value); + UNSAFE.putOrderedLong(this, VALUE_OFFSET, value); } /** @@ -93,7 +112,7 @@ */ public void setVolatile(final long value) { - UNSAFE.putLongVolatile(paddedValue, VALUE_OFFSET, value); + UNSAFE.putLongVolatile(this, VALUE_OFFSET, value); } /** @@ -105,7 +124,7 @@ */ public boolean compareAndSet(final long expectedValue, final long newValue) { - return UNSAFE.compareAndSwapLong(paddedValue, VALUE_OFFSET, expectedValue, newValue); + return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue); } /** @@ -139,6 +158,7 @@ return newValue; } + @Override public String toString() { return Long.toString(get()); diff -Nru disruptor-3.2.1/src/main/java/com/lmax/disruptor/Sequencer.java disruptor-3.3.0/src/main/java/com/lmax/disruptor/Sequencer.java --- disruptor-3.2.1/src/main/java/com/lmax/disruptor/Sequencer.java 
2014-03-10 01:07:35.000000000 +0000 +++ disruptor-3.3.0/src/main/java/com/lmax/disruptor/Sequencer.java 2014-07-21 00:57:02.000000000 +0000 @@ -18,78 +18,12 @@ /** * Coordinates claiming sequences for access to a data structure while tracking dependent {@link Sequence}s */ -public interface Sequencer extends Cursored +public interface Sequencer extends Cursored, Sequenced { /** Set to -1 as sequence starting point */ long INITIAL_CURSOR_VALUE = -1L; /** - * The capacity of the data structure to hold entries. - * - * @return the size of the RingBuffer. - */ - int getBufferSize(); - - /** - * Has the buffer got capacity to allocate another sequence. This is a concurrent - * method so the response should only be taken as an indication of available capacity. - * @param requiredCapacity in the buffer - * @return true if the buffer has the capacity to allocate the next sequence otherwise false. - */ - boolean hasAvailableCapacity(final int requiredCapacity); - - /** - * Claim the next event in sequence for publishing. - * @return the claimed sequence value - */ - long next(); - - /** - * Claim the next n events in sequence for publishing. This is for batch event producing. Using batch producing - * requires a little care and some math. - *

-     * int n = 10;
-     * long hi = sequencer.next(n);
-     * long lo = hi - (n - 1);
-     * for (long sequence = lo; sequence <= hi; sequence++) {
-     *     // Do work.
-     * }
-     * sequencer.publish(lo, hi);
-     * 
- * - * @param n the number of sequences to claim - * @return the highest claimed sequence value - */ - long next(int n); - - /** - * Attempt to claim the next event in sequence for publishing. Will return the - * number of the slot if there is at least requiredCapacity slots - * available. - * @return the claimed sequence value - * @throws InsufficientCapacityException - */ - long tryNext() throws InsufficientCapacityException; - - /** - * Attempt to claim the next n events in sequence for publishing. Will return the - * highest numbered slot if there is at least requiredCapacity slots - * available. Have a look at {@link Sequencer#next()} for a description on how to - * use this method. - * - * @param n the number of sequences to claim - * @return the claimed sequence value - * @throws InsufficientCapacityException - */ - long tryNext(int n) throws InsufficientCapacityException; - - /** - * Get the remaining capacity for this sequencer. - * @return The number of slots remaining. - */ - long remainingCapacity(); - - /** * Claim a specific sequence. Only used if initialising the ring buffer to * a specific value. * @@ -98,21 +32,6 @@ void claim(long sequence); /** - * Publishes a sequence. Call when the event has been filled. - * - * @param sequence - */ - void publish(long sequence); - - /** - * Batch publish sequences. Called when all of the events have been filled. - * - * @param lo first sequence number to publish - * @param hi last sequence number to publish - */ - void publish(long lo, long hi); - - /** * Confirms if a sequence is published and the event is available for use; non-blocking. * * @param sequence of the buffer to check @@ -155,5 +74,19 @@ */ long getMinimumSequence(); - long getHighestPublishedSequence(long sequence, long availableSequence); + /** + * Get the highest sequence number that can be safely read from the ring buffer. Depending + * on the implementation of the Sequencer this call may need to scan a number of values + * in the Sequencer. 
The scan will range from nextSequence to availableSequence. If + * there are no available values >= nextSequence the return value will be + * nextSequence - 1. To work correctly a consumer should pass a value that + * it 1 higher than the last sequence that was successfully processed. + * + * @param nextSequence The sequence to start scanning from. + * @param availableSequence The sequence to scan to. + * @return The highest value that can be safely read, will be at least nextSequence - 1. + */ + long getHighestPublishedSequence(long nextSequence, long availableSequence); + + EventPoller newPoller(DataProvider provider, Sequence...gatingSequences); } \ No newline at end of file diff -Nru disruptor-3.2.1/src/main/java/com/lmax/disruptor/SingleProducerSequencer.java disruptor-3.3.0/src/main/java/com/lmax/disruptor/SingleProducerSequencer.java --- disruptor-3.2.1/src/main/java/com/lmax/disruptor/SingleProducerSequencer.java 2014-03-10 01:07:35.000000000 +0000 +++ disruptor-3.3.0/src/main/java/com/lmax/disruptor/SingleProducerSequencer.java 2014-07-21 00:57:02.000000000 +0000 @@ -19,22 +19,38 @@ import com.lmax.disruptor.util.Util; +abstract class SingleProducerSequencerPad extends AbstractSequencer +{ + protected long p1, p2, p3, p4, p5, p6, p7; + public SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy) + { + super(bufferSize, waitStrategy); + } +} -/** - *

Coordinator for claiming sequences for access to a data structure while tracking dependent {@link Sequence}s.

- * - *

Generally not safe for use from multiple threads as it does not implement any barriers.

- */ -public final class SingleProducerSequencer extends AbstractSequencer +abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad { - @SuppressWarnings("unused") - private static class Padding + public SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy) { - /** Set to -1 as sequence starting point */ - public long nextValue = Sequence.INITIAL_VALUE, cachedValue = Sequence.INITIAL_VALUE, p2, p3, p4, p5, p6, p7; + super(bufferSize, waitStrategy); } - private final Padding pad = new Padding(); + /** Set to -1 as sequence starting point */ + protected long nextValue = Sequence.INITIAL_VALUE; + protected long cachedValue = Sequence.INITIAL_VALUE; +} + +/** + *

Coordinator for claiming sequences for access to a data structure while tracking dependent {@link Sequence}s. + * Not safe for use from multiple threads as it does not implement any barriers.

+ * + *

Note on {@link Sequencer#getCursor()}: With this sequencer the cursor value is updated after the call + * to {@link Sequencer#publish(long)} is made. + */ + +public final class SingleProducerSequencer extends SingleProducerSequencerFields +{ + protected long p1, p2, p3, p4, p5, p6, p7; /** * Construct a Sequencer with the selected wait strategy and buffer size. @@ -53,15 +69,15 @@ @Override public boolean hasAvailableCapacity(final int requiredCapacity) { - long nextValue = pad.nextValue; + long nextValue = this.nextValue; long wrapPoint = (nextValue + requiredCapacity) - bufferSize; - long cachedGatingSequence = pad.cachedValue; + long cachedGatingSequence = this.cachedValue; if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) { long minSequence = Util.getMinimumSequence(gatingSequences, nextValue); - pad.cachedValue = minSequence; + this.cachedValue = minSequence; if (wrapPoint > minSequence) { @@ -92,11 +108,11 @@ throw new IllegalArgumentException("n must be > 0"); } - long nextValue = pad.nextValue; + long nextValue = this.nextValue; long nextSequence = nextValue + n; long wrapPoint = nextSequence - bufferSize; - long cachedGatingSequence = pad.cachedValue; + long cachedGatingSequence = this.cachedValue; if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) { @@ -106,10 +122,10 @@ LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin? 
} - pad.cachedValue = minSequence; + this.cachedValue = minSequence; } - pad.nextValue = nextSequence; + this.nextValue = nextSequence; return nextSequence; } @@ -139,7 +155,7 @@ throw InsufficientCapacityException.INSTANCE; } - long nextSequence = pad.nextValue += n; + long nextSequence = this.nextValue += n; return nextSequence; } @@ -150,7 +166,7 @@ @Override public long remainingCapacity() { - long nextValue = pad.nextValue; + long nextValue = this.nextValue; long consumed = Util.getMinimumSequence(gatingSequences, nextValue); long produced = nextValue; @@ -163,7 +179,7 @@ @Override public void claim(long sequence) { - pad.nextValue = sequence; + this.nextValue = sequence; } /** diff -Nru disruptor-3.2.1/src/main/java/com/lmax/disruptor/WorkerPool.java disruptor-3.3.0/src/main/java/com/lmax/disruptor/WorkerPool.java --- disruptor-3.2.1/src/main/java/com/lmax/disruptor/WorkerPool.java 2014-03-10 01:07:35.000000000 +0000 +++ disruptor-3.3.0/src/main/java/com/lmax/disruptor/WorkerPool.java 2014-07-21 00:57:02.000000000 +0000 @@ -48,7 +48,7 @@ public WorkerPool(final RingBuffer ringBuffer, final SequenceBarrier sequenceBarrier, final ExceptionHandler exceptionHandler, - final WorkHandler... workHandlers) + final WorkHandler... workHandlers) { this.ringBuffer = ringBuffer; final int numWorkers = workHandlers.length; @@ -75,7 +75,7 @@ */ public WorkerPool(final EventFactory eventFactory, final ExceptionHandler exceptionHandler, - final WorkHandler... workHandlers) + final WorkHandler... 
workHandlers) { ringBuffer = RingBuffer.createMultiProducer(eventFactory, 1024, new BlockingWaitStrategy()); final SequenceBarrier barrier = ringBuffer.newBarrier(); diff -Nru disruptor-3.2.1/src/main/java/com/lmax/disruptor/WorkProcessor.java disruptor-3.3.0/src/main/java/com/lmax/disruptor/WorkProcessor.java --- disruptor-3.2.1/src/main/java/com/lmax/disruptor/WorkProcessor.java 2014-03-10 01:07:35.000000000 +0000 +++ disruptor-3.3.0/src/main/java/com/lmax/disruptor/WorkProcessor.java 2014-07-21 00:57:02.000000000 +0000 @@ -32,7 +32,7 @@ private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); private final RingBuffer ringBuffer; private final SequenceBarrier sequenceBarrier; - private final WorkHandler workHandler; + private final WorkHandler workHandler; private final ExceptionHandler exceptionHandler; private final Sequence workSequence; @@ -57,7 +57,7 @@ */ public WorkProcessor(final RingBuffer ringBuffer, final SequenceBarrier sequenceBarrier, - final WorkHandler workHandler, + final WorkHandler workHandler, final ExceptionHandler exceptionHandler, final Sequence workSequence) { diff -Nru disruptor-3.2.1/src/perftest/java/com/lmax/disruptor/queue/PingPongQueueLatencyTest.java disruptor-3.3.0/src/perftest/java/com/lmax/disruptor/queue/PingPongQueueLatencyTest.java --- disruptor-3.2.1/src/perftest/java/com/lmax/disruptor/queue/PingPongQueueLatencyTest.java 2014-03-10 01:07:35.000000000 +0000 +++ disruptor-3.3.0/src/perftest/java/com/lmax/disruptor/queue/PingPongQueueLatencyTest.java 2014-07-21 00:57:02.000000000 +0000 @@ -93,20 +93,20 @@ } } - private static void dumpHistogram(Histogram histogram, final PrintStream out) + private static void dumpHistogram(final Histogram histogram, final PrintStream out) { - histogram.getHistogramData().outputPercentileDistribution(out, 1, 1000.0); + histogram.outputPercentileDistribution(out, 1, 1000.0); } private void runQueuePass() throws Exception { - CountDownLatch latch = new CountDownLatch(1); - 
CyclicBarrier barrier = new CyclicBarrier(3); + final CountDownLatch latch = new CountDownLatch(1); + final CyclicBarrier barrier = new CyclicBarrier(3); qPinger.reset(barrier, latch, histogram); qPonger.reset(barrier); - Future pingFuture = executor.submit(qPinger); - Future pongFuture = executor.submit(qPonger); + final Future pingFuture = executor.submit(qPinger); + final Future pongFuture = executor.submit(qPonger); barrier.await(); latch.await(); @@ -115,9 +115,9 @@ pongFuture.cancel(true); } - public static void main(String[] args) throws Exception + public static void main(final String[] args) throws Exception { - PingPongQueueLatencyTest test = new PingPongQueueLatencyTest(); + final PingPongQueueLatencyTest test = new PingPongQueueLatencyTest(); test.testImplementation(); } @@ -133,7 +133,7 @@ private long counter; private final long maxEvents; - public QueuePinger(BlockingQueue pingQueue, BlockingQueue pongQueue, long maxEvents, long pauseTimeNs) + public QueuePinger(final BlockingQueue pingQueue, final BlockingQueue pongQueue, final long maxEvents, final long pauseTimeNs) { this.pingQueue = pingQueue; this.pongQueue = pongQueue; @@ -154,12 +154,12 @@ while (response < maxEvents) { - long t0 = System.nanoTime(); + final long t0 = System.nanoTime(); pingQueue.put(counter++); response = pongQueue.take(); - long t1 = System.nanoTime(); + final long t1 = System.nanoTime(); - histogram.recordValue(t1 - t0, pauseTimeNs); + histogram.recordValueWithExpectedInterval(t1 - t0, pauseTimeNs); while (pauseTimeNs > (System.nanoTime() - t1)) { @@ -169,14 +169,14 @@ latch.countDown(); } - catch (Exception e) + catch (final Exception e) { e.printStackTrace(); return; } } - public void reset(CyclicBarrier barrier, CountDownLatch latch, Histogram histogram) + public void reset(final CyclicBarrier barrier, final CountDownLatch latch, final Histogram histogram) { this.histogram = histogram; this.barrier = barrier; @@ -192,7 +192,7 @@ private final BlockingQueue pongQueue; 
private CyclicBarrier barrier; - public QueuePonger(BlockingQueue pingQueue, BlockingQueue pongQueue) + public QueuePonger(final BlockingQueue pingQueue, final BlockingQueue pongQueue) { this.pingQueue = pingQueue; this.pongQueue = pongQueue; @@ -201,28 +201,28 @@ @Override public void run() { - Thread thread = Thread.currentThread(); + final Thread thread = Thread.currentThread(); try { barrier.await(); while (!thread.isInterrupted()) { - Long value = pingQueue.take(); + final Long value = pingQueue.take(); pongQueue.put(value); } } - catch (InterruptedException e) + catch (final InterruptedException e) { // do-nothing. } - catch (Exception e) + catch (final Exception e) { e.printStackTrace(); } } - public void reset(CyclicBarrier barrier) + public void reset(final CyclicBarrier barrier) { this.barrier = barrier; } diff -Nru disruptor-3.2.1/src/perftest/java/com/lmax/disruptor/raw/OneToOneRawBatchThroughputTest.java disruptor-3.3.0/src/perftest/java/com/lmax/disruptor/raw/OneToOneRawBatchThroughputTest.java --- disruptor-3.2.1/src/perftest/java/com/lmax/disruptor/raw/OneToOneRawBatchThroughputTest.java 2014-03-10 01:07:35.000000000 +0000 +++ disruptor-3.3.0/src/perftest/java/com/lmax/disruptor/raw/OneToOneRawBatchThroughputTest.java 2014-07-21 00:57:02.000000000 +0000 @@ -22,6 +22,7 @@ import com.lmax.disruptor.AbstractPerfTestDisruptor; import com.lmax.disruptor.Sequence; import com.lmax.disruptor.SequenceBarrier; +import com.lmax.disruptor.Sequenced; import com.lmax.disruptor.Sequencer; import com.lmax.disruptor.SingleProducerSequencer; import com.lmax.disruptor.YieldingWaitStrategy; @@ -102,7 +103,7 @@ executor.submit(myRunnable); long start = System.currentTimeMillis(); - final Sequencer sequencer = this.sequencer; + final Sequenced sequencer = this.sequencer; for (long i = 0; i < ITERATIONS; i++) { diff -Nru disruptor-3.2.1/src/perftest/java/com/lmax/disruptor/raw/OneToOneRawThroughputTest.java 
disruptor-3.3.0/src/perftest/java/com/lmax/disruptor/raw/OneToOneRawThroughputTest.java --- disruptor-3.2.1/src/perftest/java/com/lmax/disruptor/raw/OneToOneRawThroughputTest.java 2014-03-10 01:07:35.000000000 +0000 +++ disruptor-3.3.0/src/perftest/java/com/lmax/disruptor/raw/OneToOneRawThroughputTest.java 2014-07-21 00:57:02.000000000 +0000 @@ -22,6 +22,7 @@ import com.lmax.disruptor.AbstractPerfTestDisruptor; import com.lmax.disruptor.Sequence; import com.lmax.disruptor.SequenceBarrier; +import com.lmax.disruptor.Sequenced; import com.lmax.disruptor.Sequencer; import com.lmax.disruptor.SingleProducerSequencer; import com.lmax.disruptor.YieldingWaitStrategy; @@ -101,7 +102,7 @@ executor.submit(myRunnable); long start = System.currentTimeMillis(); - final Sequencer sequencer = this.sequencer; + final Sequenced sequencer = this.sequencer; for (long i = 0; i < ITERATIONS; i++) { diff -Nru disruptor-3.2.1/src/perftest/java/com/lmax/disruptor/sequenced/OneToOneSequencedBatchThroughputTest.java disruptor-3.3.0/src/perftest/java/com/lmax/disruptor/sequenced/OneToOneSequencedBatchThroughputTest.java --- disruptor-3.2.1/src/perftest/java/com/lmax/disruptor/sequenced/OneToOneSequencedBatchThroughputTest.java 2014-03-10 01:07:35.000000000 +0000 +++ disruptor-3.3.0/src/perftest/java/com/lmax/disruptor/sequenced/OneToOneSequencedBatchThroughputTest.java 2014-07-21 00:57:02.000000000 +0000 @@ -63,8 +63,8 @@ */ public final class OneToOneSequencedBatchThroughputTest extends AbstractPerfTestDisruptor { - public static final int BATCH_SIZE = 2048; - private static final int BUFFER_SIZE = 1024 * 1024; + public static final int BATCH_SIZE = 10; + private static final int BUFFER_SIZE = 1024 * 64; private static final long ITERATIONS = 1000L * 1000L * 100L; private final ExecutorService executor = Executors.newSingleThreadExecutor(DaemonThreadFactory.INSTANCE); private final long expectedResult = PerfTestUtil.accumulatedAddition(ITERATIONS) * BATCH_SIZE; diff -Nru 
disruptor-3.2.1/src/perftest/java/com/lmax/disruptor/sequenced/OneToOneSequencedPollerThroughputTest.java disruptor-3.3.0/src/perftest/java/com/lmax/disruptor/sequenced/OneToOneSequencedPollerThroughputTest.java --- disruptor-3.2.1/src/perftest/java/com/lmax/disruptor/sequenced/OneToOneSequencedPollerThroughputTest.java 1970-01-01 00:00:00.000000000 +0000 +++ disruptor-3.3.0/src/perftest/java/com/lmax/disruptor/sequenced/OneToOneSequencedPollerThroughputTest.java 2014-07-21 00:57:02.000000000 +0000 @@ -0,0 +1,195 @@ +/* + * Copyright 2011 LMAX Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.lmax.disruptor.sequenced; + +import static com.lmax.disruptor.RingBuffer.createSingleProducer; +import static com.lmax.disruptor.support.PerfTestUtil.failIfNot; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import com.lmax.disruptor.AbstractPerfTestDisruptor; +import com.lmax.disruptor.EventPoller; +import com.lmax.disruptor.EventPoller.PollState; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.YieldingWaitStrategy; +import com.lmax.disruptor.support.PerfTestUtil; +import com.lmax.disruptor.support.ValueEvent; +import com.lmax.disruptor.util.DaemonThreadFactory; +import com.lmax.disruptor.util.PaddedLong; + +/** + *

+ * UniCast a series of items between 1 publisher and 1 event processor.
+ *
+ * +----+    +-----+
+ * | P1 |--->| EP1 |
+ * +----+    +-----+
+ *
+ * Disruptor:
+ * ==========
+ *              track to prevent wrap
+ *              +------------------+
+ *              |                  |
+ *              |                  v
+ * +----+    +====+    +====+   +-----+
+ * | P1 |--->| RB |<---| SB |   | EP1 |
+ * +----+    +====+    +====+   +-----+
+ *      claim      get    ^        |
+ *                        |        |
+ *                        +--------+
+ *                          waitFor
+ *
+ * P1  - Publisher 1
+ * RB  - RingBuffer
+ * SB  - SequenceBarrier
+ * EP1 - EventProcessor 1
+ *
+ * 
+ */ +public final class OneToOneSequencedPollerThroughputTest extends AbstractPerfTestDisruptor +{ + private static final int BUFFER_SIZE = 1024 * 64; + private static final long ITERATIONS = 1000L * 1000L * 100L; + private final ExecutorService executor = Executors.newSingleThreadExecutor(DaemonThreadFactory.INSTANCE); + private final long expectedResult = PerfTestUtil.accumulatedAddition(ITERATIONS); + + /////////////////////////////////////////////////////////////////////////////////////////////// + + private final RingBuffer ringBuffer = + createSingleProducer(ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new YieldingWaitStrategy()); + + private final EventPoller poller = ringBuffer.newPoller(); + private final PollRunnable pollRunnable = new PollRunnable(poller); + { + ringBuffer.addGatingSequences(poller.getSequence()); + } + + /////////////////////////////////////////////////////////////////////////////////////////////// + + @Override + protected int getRequiredProcessorCount() + { + return 2; + } + + private static class PollRunnable implements Runnable, EventPoller.Handler + { + private final EventPoller poller; + private volatile boolean running = true; + private final PaddedLong value = new PaddedLong(); + private CountDownLatch latch; + private long count; + + public PollRunnable(EventPoller poller) + { + this.poller = poller; + } + + @Override + public void run() + { + try + { + while (running) + { + if (PollState.PROCESSING != poller.poll(this)) + { + Thread.yield(); + } + } + } + catch (Exception e) + { + e.printStackTrace(); + } + } + + @Override + public boolean onEvent(ValueEvent event, long sequence, boolean endOfBatch) + { + value.set(value.get() + event.getValue()); + + if (count == sequence) + { + latch.countDown(); + } + + return true; + } + + public void halt() + { + running = false; + } + + public void reset(final CountDownLatch latch, final long expectedCount) + { + value.set(0L); + this.latch = latch; + count = expectedCount; + running = true; 
+ } + + public long getValue() + { + return value.get(); + } + } + + @Override + protected long runDisruptorPass() throws InterruptedException + { + final CountDownLatch latch = new CountDownLatch(1); + long expectedCount = poller.getSequence().get() + ITERATIONS; + pollRunnable.reset(latch, expectedCount); + executor.submit(pollRunnable); + long start = System.currentTimeMillis(); + + final RingBuffer rb = ringBuffer; + + for (long i = 0; i < ITERATIONS; i++) + { + long next = rb.next(); + rb.get(next).setValue(i); + rb.publish(next); + } + + latch.await(); + long opsPerSecond = (ITERATIONS * 1000L) / (System.currentTimeMillis() - start); + waitForEventProcessorSequence(expectedCount); + pollRunnable.halt(); + + failIfNot(expectedResult, pollRunnable.getValue()); + + return opsPerSecond; + } + + private void waitForEventProcessorSequence(long expectedCount) throws InterruptedException + { + while (poller.getSequence().get() != expectedCount) + { + Thread.sleep(1); + } + } + + public static void main(String[] args) throws Exception + { + OneToOneSequencedPollerThroughputTest test = new OneToOneSequencedPollerThroughputTest(); + test.testImplementations(); + } +} diff -Nru disruptor-3.2.1/src/perftest/java/com/lmax/disruptor/sequenced/PingPongSequencedLatencyTest.java disruptor-3.3.0/src/perftest/java/com/lmax/disruptor/sequenced/PingPongSequencedLatencyTest.java --- disruptor-3.2.1/src/perftest/java/com/lmax/disruptor/sequenced/PingPongSequencedLatencyTest.java 2014-03-10 01:07:35.000000000 +0000 +++ disruptor-3.3.0/src/perftest/java/com/lmax/disruptor/sequenced/PingPongSequencedLatencyTest.java 2014-07-21 00:57:02.000000000 +0000 @@ -116,15 +116,15 @@ } } - private static void dumpHistogram(Histogram histogram, final PrintStream out) + private static void dumpHistogram(final Histogram histogram, final PrintStream out) { - histogram.getHistogramData().outputPercentileDistribution(out, 1, 1000.0); + histogram.outputPercentileDistribution(out, 1, 1000.0); } private 
void runDisruptorPass() throws InterruptedException, BrokenBarrierException { - CountDownLatch latch = new CountDownLatch(1); - CyclicBarrier barrier = new CyclicBarrier(3); + final CountDownLatch latch = new CountDownLatch(1); + final CyclicBarrier barrier = new CyclicBarrier(3); pinger.reset(barrier, latch, histogram); ponger.reset(barrier); @@ -138,9 +138,9 @@ pongProcessor.halt(); } - public static void main(String[] args) throws Exception + public static void main(final String[] args) throws Exception { - PingPongSequencedLatencyTest test = new PingPongSequencedLatencyTest(); + final PingPongSequencedLatencyTest test = new PingPongSequencedLatencyTest(); test.shouldCompareDisruptorVsQueues(); } @@ -156,7 +156,7 @@ private Histogram histogram; private long t0; - public Pinger(RingBuffer buffer, long maxEvents, long pauseTimeNs) + public Pinger(final RingBuffer buffer, final long maxEvents, final long pauseTimeNs) { this.buffer = buffer; this.maxEvents = maxEvents; @@ -164,11 +164,11 @@ } @Override - public void onEvent(ValueEvent event, long sequence, boolean endOfBatch) throws Exception + public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception { - long t1 = System.nanoTime(); + final long t1 = System.nanoTime(); - histogram.recordValue(t1 - t0, pauseTimeNs); + histogram.recordValueWithExpectedInterval(t1 - t0, pauseTimeNs); if (event.getValue() < maxEvents) { @@ -188,7 +188,7 @@ private void send() { t0 = System.nanoTime(); - long next = buffer.next(); + final long next = buffer.next(); buffer.get(next).setValue(counter); buffer.publish(next); @@ -205,7 +205,7 @@ Thread.sleep(1000); send(); } - catch (Exception e) + catch (final Exception e) { throw new RuntimeException(e); } @@ -216,7 +216,7 @@ { } - public void reset(CyclicBarrier barrier, CountDownLatch latch, Histogram histogram) + public void reset(final CyclicBarrier barrier, final CountDownLatch latch, final Histogram histogram) { this.histogram = 
histogram; this.barrier = barrier; @@ -232,15 +232,15 @@ private CyclicBarrier barrier; - public Ponger(RingBuffer buffer) + public Ponger(final RingBuffer buffer) { this.buffer = buffer; } @Override - public void onEvent(ValueEvent event, long sequence, boolean endOfBatch) throws Exception + public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception { - long next = buffer.next(); + final long next = buffer.next(); buffer.get(next).setValue(event.getValue()); buffer.publish(next); } @@ -252,7 +252,7 @@ { barrier.await(); } - catch (Exception e) + catch (final Exception e) { throw new RuntimeException(e); } @@ -263,7 +263,7 @@ { } - public void reset(CyclicBarrier barrier) + public void reset(final CyclicBarrier barrier) { this.barrier = barrier; } diff -Nru disruptor-3.2.1/src/perftest/java/com/lmax/disruptor/support/ValueAdditionEventHandler.java disruptor-3.3.0/src/perftest/java/com/lmax/disruptor/support/ValueAdditionEventHandler.java --- disruptor-3.2.1/src/perftest/java/com/lmax/disruptor/support/ValueAdditionEventHandler.java 2014-03-10 01:07:35.000000000 +0000 +++ disruptor-3.3.0/src/perftest/java/com/lmax/disruptor/support/ValueAdditionEventHandler.java 2014-07-21 00:57:02.000000000 +0000 @@ -15,17 +15,16 @@ */ package com.lmax.disruptor.support; +import java.util.concurrent.CountDownLatch; + import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.util.PaddedLong; -import java.util.concurrent.CountDownLatch; - public final class ValueAdditionEventHandler implements EventHandler { private final PaddedLong value = new PaddedLong(); private long count; private CountDownLatch latch; - private long localSequence = -1; public long getValue() { @@ -44,15 +43,6 @@ { value.set(value.get() + event.getValue()); - if (localSequence + 1 == sequence) - { - localSequence = sequence; - } - else - { - System.err.println("Expected: " + (localSequence + 1) + "found: " + sequence); - } - if (count == sequence) { 
latch.countDown(); diff -Nru disruptor-3.2.1/src/test/java/com/lmax/disruptor/BatchingTest.java disruptor-3.3.0/src/test/java/com/lmax/disruptor/BatchingTest.java --- disruptor-3.2.1/src/test/java/com/lmax/disruptor/BatchingTest.java 1970-01-01 00:00:00.000000000 +0000 +++ disruptor-3.3.0/src/test/java/com/lmax/disruptor/BatchingTest.java 2014-07-21 00:57:02.000000000 +0000 @@ -0,0 +1,117 @@ +package com.lmax.disruptor; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.Executors; +import java.util.concurrent.locks.LockSupport; + +import org.hamcrest.CoreMatchers; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; +import com.lmax.disruptor.support.LongEvent; + +@RunWith(Parameterized.class) +public class BatchingTest +{ + private final ProducerType producerType; + + public BatchingTest(ProducerType producerType) + { + this.producerType = producerType; + } + + @Parameters + public static Collection generateData() + { + Object[][] producerTypes = { { ProducerType.MULTI }, { ProducerType.SINGLE } }; + return Arrays.asList(producerTypes); + } + + private static class ParallelEventHandler implements EventHandler + { + private final long mask; + private final long ordinal; + private final int batchSize = 10; + + private long eventCount; + private long batchCount; + private long publishedValue; + private long tempValue; + private volatile long processed; + + public ParallelEventHandler(long mask, long ordinal) + { + this.mask = mask; + this.ordinal = ordinal; + } + + @Override + public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception + { + if ((sequence & mask) == ordinal) + { + eventCount++; + tempValue = event.get(); + } + + if (endOfBatch || ++batchCount >= batchSize) + { + publishedValue 
= tempValue; + batchCount = 0; + } + else + { + LockSupport.parkNanos(1); + } + + processed = sequence; + } + } + + @SuppressWarnings("unchecked") + @Test + public void shouldBatch() throws Exception + { + Disruptor d = new Disruptor(LongEvent.FACTORY, 2048, Executors.newCachedThreadPool(), + producerType, new SleepingWaitStrategy()); + + ParallelEventHandler handler1 = new ParallelEventHandler(1, 0); + ParallelEventHandler handler2 = new ParallelEventHandler(1, 1); + + d.handleEventsWith(handler1, handler2); + + RingBuffer buffer = d.start(); + + EventTranslator translator = new EventTranslator() + { + @Override + public void translateTo(LongEvent event, long sequence) + { + event.set(sequence); + } + }; + + int eventCount = 10000; + for (int i = 0; i < eventCount; i++) + { + buffer.publishEvent(translator); + } + + while (handler1.processed != eventCount - 1 || + handler2.processed != eventCount - 1) + { + Thread.sleep(1); + } + + Assert.assertThat(handler1.publishedValue, CoreMatchers.is((long) eventCount - 2)); + Assert.assertThat(handler1.eventCount, CoreMatchers.is((long) eventCount / 2)); + Assert.assertThat(handler2.publishedValue, CoreMatchers.is((long) eventCount - 1)); + Assert.assertThat(handler2.eventCount, CoreMatchers.is((long) eventCount / 2)); + } +} diff -Nru disruptor-3.2.1/src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java disruptor-3.3.0/src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java --- disruptor-3.2.1/src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java 2014-03-10 01:07:35.000000000 +0000 +++ disruptor-3.3.0/src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java 2014-07-21 00:57:02.000000000 +0000 @@ -15,19 +15,14 @@ */ package com.lmax.disruptor.dsl; -import java.util.ArrayList; -import java.util.Collection; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicReference; - import 
com.lmax.disruptor.BatchEventProcessor; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.EventProcessor; import com.lmax.disruptor.EventTranslator; import com.lmax.disruptor.ExceptionHandler; import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.Sequence; import com.lmax.disruptor.SequenceBarrier; import com.lmax.disruptor.TimeoutException; import com.lmax.disruptor.dsl.stubs.DelayedEventHandler; @@ -40,22 +35,27 @@ import com.lmax.disruptor.dsl.stubs.StubPublisher; import com.lmax.disruptor.dsl.stubs.TestWorkHandler; import com.lmax.disruptor.support.TestEvent; - import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; + +import static java.lang.Thread.yield; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; - -import static java.lang.Thread.yield; -import static java.util.concurrent.TimeUnit.SECONDS; - @SuppressWarnings(value = {"unchecked"}) public class DisruptorTest { @@ -343,13 +343,6 @@ workHandler2.processEvent(); } - private TestWorkHandler createTestWorkHandler() - { - final TestWorkHandler testWorkHandler = new TestWorkHandler(); - testWorkHandlers.add(testWorkHandler); - return testWorkHandler; - } - @Test public void shouldSupportUsingWorkerPoolAsDependency() throws Exception { @@ -362,7 +355,7 @@ publishEvent(); assertThat(disruptor.getBarrierFor(delayedEventHandler).getCursor(), equalTo(-1L)); - + 
workHandler2.processEvent(); workHandler1.processEvent(); @@ -427,7 +420,7 @@ } @Test(expected = TimeoutException.class, timeout = 2000) - public void shouldThrowTimeoutExceptionIfShutdownDoesntCompleteNormally() throws Exception + public void shouldThrowTimeoutExceptionIfShutdownDoesNotCompleteNormally() throws Exception { //Given final DelayedEventHandler delayedEventHandler = createDelayedEventHandler(); @@ -468,6 +461,75 @@ assertThat(disruptor.getRingBuffer().remainingCapacity(), is(ringBuffer.getBufferSize() - 0L)); } + @Test + public void shouldAllowEventHandlerWithSuperType() throws Exception + { + final CountDownLatch latch = new CountDownLatch(2); + final EventHandler objectHandler = new EventHandlerStub(latch); + + disruptor.handleEventsWith(objectHandler); + + ensureTwoEventsProcessedAccordingToDependencies(latch); + } + + @Test + public void shouldAllowChainingEventHandlersWithSuperType() throws Exception + { + final CountDownLatch latch = new CountDownLatch(2); + final DelayedEventHandler delayedEventHandler = createDelayedEventHandler(); + final EventHandler objectHandler = new EventHandlerStub(latch); + + disruptor.handleEventsWith(delayedEventHandler).then(objectHandler); + + ensureTwoEventsProcessedAccordingToDependencies(latch, delayedEventHandler); + } + + @Test + public void shouldMakeEntriesAvailableToFirstCustomProcessorsImmediately() throws Exception + { + final CountDownLatch countDownLatch = new CountDownLatch(2); + final EventHandler eventHandler = new EventHandlerStub(countDownLatch); + + disruptor.handleEventsWith(new EventProcessorFactory() + { + @Override + public EventProcessor createEventProcessor(final RingBuffer ringBuffer, final Sequence[] barrierSequences) + { + assertEquals("Should not have had any barrier sequences", 0, barrierSequences.length); + return new BatchEventProcessor(disruptor.getRingBuffer(), ringBuffer.newBarrier(barrierSequences), eventHandler); + } + }); + + 
ensureTwoEventsProcessedAccordingToDependencies(countDownLatch); + } + + @Test + public void shouldHonourDependenciesForCustomProcessors() throws Exception + { + final CountDownLatch countDownLatch = new CountDownLatch(2); + final EventHandler eventHandler = new EventHandlerStub(countDownLatch); + final DelayedEventHandler delayedEventHandler = createDelayedEventHandler(); + + disruptor.handleEventsWith(delayedEventHandler).then(new EventProcessorFactory() + { + @Override + public EventProcessor createEventProcessor(final RingBuffer ringBuffer, final Sequence[] barrierSequences) + { + assertSame("Should have had a barrier sequence", 1, barrierSequences.length); + return new BatchEventProcessor(disruptor.getRingBuffer(), ringBuffer.newBarrier(barrierSequences), eventHandler); + } + }); + + ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler); + } + + private TestWorkHandler createTestWorkHandler() + { + final TestWorkHandler testWorkHandler = new TestWorkHandler(); + testWorkHandlers.add(testWorkHandler); + return testWorkHandler; + } + private void ensureTwoEventsProcessedAccordingToDependencies(final CountDownLatch countDownLatch, final DelayedEventHandler... 
dependencies) throws InterruptedException, BrokenBarrierException diff -Nru disruptor-3.2.1/src/test/java/com/lmax/disruptor/dsl/stubs/EventHandlerStub.java disruptor-3.3.0/src/test/java/com/lmax/disruptor/dsl/stubs/EventHandlerStub.java --- disruptor-3.2.1/src/test/java/com/lmax/disruptor/dsl/stubs/EventHandlerStub.java 2014-03-10 01:07:35.000000000 +0000 +++ disruptor-3.3.0/src/test/java/com/lmax/disruptor/dsl/stubs/EventHandlerStub.java 2014-07-21 00:57:02.000000000 +0000 @@ -16,11 +16,10 @@ package com.lmax.disruptor.dsl.stubs; import com.lmax.disruptor.EventHandler; -import com.lmax.disruptor.support.TestEvent; import java.util.concurrent.CountDownLatch; -public class EventHandlerStub implements EventHandler +public class EventHandlerStub implements EventHandler { private final CountDownLatch countDownLatch; @@ -30,7 +29,7 @@ } @Override - public void onEvent(final TestEvent entry, final long sequence, final boolean endOfBatch) throws Exception + public void onEvent(final T entry, final long sequence, final boolean endOfBatch) throws Exception { countDownLatch.countDown(); } diff -Nru disruptor-3.2.1/src/test/java/com/lmax/disruptor/EventPollerTest.java disruptor-3.3.0/src/test/java/com/lmax/disruptor/EventPollerTest.java --- disruptor-3.2.1/src/test/java/com/lmax/disruptor/EventPollerTest.java 1970-01-01 00:00:00.000000000 +0000 +++ disruptor-3.3.0/src/test/java/com/lmax/disruptor/EventPollerTest.java 2014-07-21 00:57:02.000000000 +0000 @@ -0,0 +1,120 @@ +package com.lmax.disruptor; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +import org.jmock.Expectations; +import org.jmock.Mockery; +import org.jmock.States; +import org.jmock.integration.junit4.JMock; +import org.junit.Test; +import org.junit.runner.RunWith; + +import com.lmax.disruptor.EventPoller.PollState; + +@RunWith(JMock.class) +public class EventPollerTest +{ + private final Mockery mockery = new Mockery(); + + @Test + @SuppressWarnings("unchecked") + public 
void shouldPollForEvents() throws Exception + { + final Sequence pollSequence = new Sequence(); + final Sequence bufferSequence = new Sequence(); + final Sequence gatingSequence = new Sequence(); + final Sequencer sequencer = mockery.mock(Sequencer.class); + final EventPoller.Handler handler = mockery.mock(EventPoller.Handler.class); + final DataProvider provider = mockery.mock(DataProvider.class); + final EventPoller poller = EventPoller.newInstance(provider, sequencer, pollSequence, bufferSequence, + gatingSequence); + final Object event = new Object(); + + final States states = mockery.states("polling"); + + mockery.checking(new Expectations() + { + { + allowing(sequencer).getCursor(); + will(returnValue(-1L)); + when(states.is("idle")); + + allowing(sequencer).getCursor(); + will(returnValue(0L)); + when(states.is("gating")); + + allowing(sequencer).getCursor(); + will(returnValue(0L)); + when(states.is("processing")); + + allowing(sequencer).getHighestPublishedSequence(0L, -1L); + will(returnValue(-1L)); + + allowing(sequencer).getHighestPublishedSequence(0L, 0L); + will(returnValue(0L)); + + allowing(provider).get(0); + will(returnValue(event)); + when(states.is("processing")); + + one(handler).onEvent(event, 0, true); + when(states.is("processing")); + } + }); + + // Initial State - nothing published. + states.become("idle"); + assertThat(poller.poll(handler), is(PollState.IDLE)); + + // Publish Event. 
+ states.become("gating"); + bufferSequence.incrementAndGet(); + assertThat(poller.poll(handler), is(PollState.GATING)); + + states.become("processing"); + gatingSequence.incrementAndGet(); + assertThat(poller.poll(handler), is(PollState.PROCESSING)); + } + + @Test + public void shouldSuccessfullyPollWhenBufferIsFull() throws Exception + { + @SuppressWarnings("unchecked") + final EventPoller.Handler handler = mockery.mock(EventPoller.Handler.class); + + EventFactory factory = new EventFactory() + { + @Override + public byte[] newInstance() + { + return new byte[1]; + } + }; + + final RingBuffer ringBuffer = RingBuffer.createMultiProducer(factory, 0x4, new SleepingWaitStrategy()); + + final EventPoller poller = ringBuffer.newPoller(); + ringBuffer.addGatingSequences(poller.getSequence()); + + int count = 4; + + for (byte i = 1; i <= count; ++i) + { + long next = ringBuffer.next(); + ringBuffer.get(next)[0] = i; + ringBuffer.publish(next); + } + + mockery.checking(new Expectations() + { + { + exactly(4).of(handler).onEvent(with(any(byte[].class)), with(any(Long.TYPE)), with(any(Boolean.TYPE))); + will(returnValue(true)); + } + }); + + // think of another thread + poller.poll(handler); + } +} diff -Nru disruptor-3.2.1/src/test/java/com/lmax/disruptor/example/NamedEventHandler.java disruptor-3.3.0/src/test/java/com/lmax/disruptor/example/NamedEventHandler.java --- disruptor-3.2.1/src/test/java/com/lmax/disruptor/example/NamedEventHandler.java 1970-01-01 00:00:00.000000000 +0000 +++ disruptor-3.3.0/src/test/java/com/lmax/disruptor/example/NamedEventHandler.java 2014-07-21 00:57:02.000000000 +0000 @@ -0,0 +1,34 @@ +package com.lmax.disruptor.example; + +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.LifecycleAware; + +public class NamedEventHandler implements EventHandler, LifecycleAware +{ + private String oldName; + private final String name; + + public NamedEventHandler(final String name) + { + this.name = name; + } + + @Override + public void 
onEvent(final T event, final long sequence, final boolean endOfBatch) throws Exception + { + } + + @Override + public void onStart() + { + final Thread currentThread = Thread.currentThread(); + oldName = currentThread.getName(); + currentThread.setName(name); + } + + @Override + public void onShutdown() + { + Thread.currentThread().setName(oldName); + } +} diff -Nru disruptor-3.2.1/src/test/java/com/lmax/disruptor/example/TwoDisruptors.java disruptor-3.3.0/src/test/java/com/lmax/disruptor/example/TwoDisruptors.java --- disruptor-3.2.1/src/test/java/com/lmax/disruptor/example/TwoDisruptors.java 1970-01-01 00:00:00.000000000 +0000 +++ disruptor-3.3.0/src/test/java/com/lmax/disruptor/example/TwoDisruptors.java 2014-07-21 00:57:02.000000000 +0000 @@ -0,0 +1,100 @@ +package com.lmax.disruptor.example; + +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +import com.lmax.disruptor.BlockingWaitStrategy; +import com.lmax.disruptor.EventFactory; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.EventTranslatorOneArg; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; + +public class TwoDisruptors +{ + private static class ValueEvent + { + private T t; + + public T get() + { + return t; + } + + public void set(final T t) + { + this.t = t; + } + } + + private static class Translator implements EventTranslatorOneArg, ValueEvent> + { + @Override + public void translateTo(final ValueEvent event, final long sequence, final ValueEvent arg0) + { + event.set(arg0.get()); + } + } + + private static class ValueEventHandler implements EventHandler> + { + private final RingBuffer> ringBuffer; + private final Translator translator = new Translator(); + + public ValueEventHandler(final RingBuffer> ringBuffer) + { + this.ringBuffer = ringBuffer; + } + + @Override + public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws 
Exception + { + ringBuffer.publishEvent(translator, event); + } + + public static EventFactory> factory() + { + return new EventFactory>() + { + @Override + public ValueEvent newInstance() + { + return new ValueEvent(); + } + }; + } + } + + @SuppressWarnings("unchecked") + public static void main(final String[] args) + { + final Executor executor = Executors.newFixedThreadPool(2); + final EventFactory> factory = ValueEventHandler.factory(); + + final Disruptor> disruptorA = + new Disruptor>( + factory, + 1024, + executor, + ProducerType.MULTI, + new BlockingWaitStrategy()); + + final Disruptor> disruptorB = + new Disruptor>( + factory, + 1024, + executor, + ProducerType.SINGLE, + new BlockingWaitStrategy()); + + final ValueEventHandler handlerA = new ValueEventHandler(disruptorB.getRingBuffer()); + disruptorA.handleEventsWith(handlerA); + + final ValueEventHandler handlerB = new ValueEventHandler(disruptorA.getRingBuffer()); + disruptorB.handleEventsWith(handlerB); + + disruptorA.start(); + disruptorB.start(); + } +} diff -Nru disruptor-3.2.1/src/test/java/com/lmax/disruptor/SequencerTest.java disruptor-3.3.0/src/test/java/com/lmax/disruptor/SequencerTest.java --- disruptor-3.2.1/src/test/java/com/lmax/disruptor/SequencerTest.java 2014-03-10 01:07:35.000000000 +0000 +++ disruptor-3.3.0/src/test/java/com/lmax/disruptor/SequencerTest.java 2014-07-21 00:57:02.000000000 +0000 @@ -169,7 +169,7 @@ public void shouldNotifyWaitStrategyOnPublish() throws Exception { final WaitStrategy waitStrategy = mockery.mock(WaitStrategy.class); - final Sequencer sequencer = newProducer(producerType, BUFFER_SIZE, waitStrategy); + final Sequenced sequencer = newProducer(producerType, BUFFER_SIZE, waitStrategy); mockery.checking(new Expectations() { @@ -187,7 +187,7 @@ public void shouldNotifyWaitStrategyOnPublishBatch() throws Exception { final WaitStrategy waitStrategy = mockery.mock(WaitStrategy.class); - final Sequencer sequencer = newProducer(producerType, BUFFER_SIZE, waitStrategy); 
+ final Sequenced sequencer = newProducer(producerType, BUFFER_SIZE, waitStrategy); mockery.checking(new Expectations() {