+ * Set up custom event processors to handle events from the ring buffer. The Disruptor will
+ * automatically start these processors when {@link #start()} is called.
+ *
+ * This method can be used as the start of a chain. For example if the handler A must
+ * process events before handler B:
+ * dw.handleEventsWith(A).then(B);
+ *
+ * Since this is the start of the chain, the processor factories will always be passed an empty Sequence
+ * array, so the factory isn't necessary in this case. This method is provided for consistency with
+ * {@link EventHandlerGroup#handleEventsWith(EventProcessorFactory...)} and {@link EventHandlerGroup#then(EventProcessorFactory...)}
+ * which do have barrier sequences to provide.
+ * Set up custom event processors to handle events from the ring buffer. The Disruptor will
+ * automatically start these processors when {@link #start()} is called.
+ *
+ * This method can be used as the start of a chain. For example if the processor A must
+ * process events before handler B:
dw.handleEventsWith(A).then(B);
*
* @param processors the event processors that will process events.
* @return a {@link EventHandlerGroup} that can be used to chain dependencies.
@@ -393,7 +419,7 @@
}
EventHandlerGroup
+ * Set up custom event processors to handle events from the ring buffer. The Disruptor will
+ * automatically start these processors when {@link Disruptor#start()} is called.
+ *
+ * This method is generally used as part of a chain. For example if the handler A must
+ * process events before handler B:
- * This method is generally used as part of a chain. For example if the handler A must
- * process events before handler B:
+ * This method is generally used as part of a chain. For example if A must
+ * process events before B:
dw.after(A).handleEventsWith(B);
*
* @param handlers the batch handlers that will process events.
* @return a {@link EventHandlerGroup} that can be used to set up an event processor barrier over the created event processors.
*/
- public EventHandlerGroup
+ * Set up custom event processors to handle events from the ring buffer. The Disruptor will
+ * automatically start these processors when {@link Disruptor#start()} is called.
+ *
+ * This method is generally used as part of a chain. For example if A must
+ * process events before B:
+ * dw.after(A).handleEventsWith(B);
+ *
+ * @param eventProcessorFactories the event processor factories to use to create the event processors that will process events.
+ * @return a {@link EventHandlerGroup} that can be used to chain dependencies.
+ */
+ public EventHandlerGroup
+ * disruptor.handleEventsWith(handler1).then((ringBuffer, barrierSequences) -> new CustomEventProcessor(ringBuffer, barrierSequences));
+ *
+ */
+public interface EventProcessorFactorybarrierSequences
.
+ *
+ * @param barrierSequences the sequences to gate on
+ * @return a new EventProcessor that gates on barrierSequences before processing events
+ */
+ EventProcessor createEventProcessor(RingBuffer
+ * With this call the data that is to be inserted into the ring
+ * buffer will be a field (either explicitly or captured anonymously),
+ * therefore this call will require an instance of the translator
+ * for each value that is to be inserted into the ring buffer.
+ *
+ * @param translators The user specified translation for each event
+ */
+ void publishEvents(EventTranslator
+ * With this call the data that is to be inserted into the ring
+ * buffer will be a field (either explicitly or captured anonymously),
+ * therefore this call will require an instance of the translator
+ * for each value that is to be inserted into the ring buffer.
+ *
+ * @param translators The user specified translation for each event
+ * @param batchStartsAt The first element of the array which is within the batch.
+ * @param batchSize The actual size of the batch
+ */
+ void publishEvents(EventTranslator
+ * Coordinator for claiming sequences for access to a data structure while tracking dependent {@link Sequence}s.
+ * Suitable for use for sequencing across multiple publisher threads.
+ *
+ * Note on {@link Sequencer#getCursor()}: With this sequencer the cursor value is updated after the call
+ * to {@link Sequencer#next()}. To determine the highest available sequence that can be read,
+ * use {@link Sequencer#getHighestPublishedSequence(long, long)}.
*/
public final class MultiProducerSequencer extends AbstractSequencer
{
diff -Nru disruptor-3.2.1/src/main/java/com/lmax/disruptor/PhasedBackoffWaitStrategy.java disruptor-3.3.0/src/main/java/com/lmax/disruptor/PhasedBackoffWaitStrategy.java
--- disruptor-3.2.1/src/main/java/com/lmax/disruptor/PhasedBackoffWaitStrategy.java 2014-03-10 01:07:35.000000000 +0000
+++ disruptor-3.3.0/src/main/java/com/lmax/disruptor/PhasedBackoffWaitStrategy.java 2014-07-21 00:57:02.000000000 +0000
@@ -52,6 +52,17 @@
}
/**
+ * Block with wait/notifyAll semantics
+ */
+ public static PhasedBackoffWaitStrategy withLiteLock(long spinTimeout,
+ long yieldTimeout,
+ TimeUnit units)
+ {
+ return new PhasedBackoffWaitStrategy(spinTimeout, yieldTimeout,
+ units, new LiteBlockingWaitStrategy());
+ }
+
+ /**
* Block by sleeping in a loop
*/
public static PhasedBackoffWaitStrategy withSleep(long spinTimeout,
diff -Nru disruptor-3.2.1/src/main/java/com/lmax/disruptor/RingBuffer.java disruptor-3.3.0/src/main/java/com/lmax/disruptor/RingBuffer.java
--- disruptor-3.2.1/src/main/java/com/lmax/disruptor/RingBuffer.java 2014-03-10 01:07:35.000000000 +0000
+++ disruptor-3.3.0/src/main/java/com/lmax/disruptor/RingBuffer.java 2014-07-21 00:57:02.000000000 +0000
@@ -16,7 +16,81 @@
package com.lmax.disruptor;
+import sun.misc.Unsafe;
+
import com.lmax.disruptor.dsl.ProducerType;
+import com.lmax.disruptor.util.Util;
+
+abstract class RingBufferPad
+{
+ protected long p1, p2, p3, p4, p5, p6, p7;
+}
+
+abstract class RingBufferFields
* Concurrent sequence class used for tracking the progress of
@@ -27,7 +43,7 @@
* Also attempts to be more efficient with regards to false
* sharing by adding padding around the volatile field.
*/
-public class Sequence
+public class Sequence extends RhsPadding
{
static final long INITIAL_VALUE = -1L;
private static final Unsafe UNSAFE;
@@ -36,13 +52,16 @@
static
{
UNSAFE = Util.getUnsafe();
- final int base = UNSAFE.arrayBaseOffset(long[].class);
- final int scale = UNSAFE.arrayIndexScale(long[].class);
- VALUE_OFFSET = base + (scale * 7);
+ try
+ {
+ VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
+ }
+ catch (final Exception e)
+ {
+ throw new RuntimeException(e);
+ }
}
- private final long[] paddedValue = new long[15];
-
/**
* Create a sequence initialised to -1.
*/
@@ -58,7 +77,7 @@
*/
public Sequence(final long initialValue)
{
- UNSAFE.putOrderedLong(paddedValue, VALUE_OFFSET, initialValue);
+ UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
}
/**
@@ -68,7 +87,7 @@
*/
public long get()
{
- return UNSAFE.getLongVolatile(paddedValue, VALUE_OFFSET);
+ return value;
}
/**
@@ -80,7 +99,7 @@
*/
public void set(final long value)
{
- UNSAFE.putOrderedLong(paddedValue, VALUE_OFFSET, value);
+ UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
}
/**
@@ -93,7 +112,7 @@
*/
public void setVolatile(final long value)
{
- UNSAFE.putLongVolatile(paddedValue, VALUE_OFFSET, value);
+ UNSAFE.putLongVolatile(this, VALUE_OFFSET, value);
}
/**
@@ -105,7 +124,7 @@
*/
public boolean compareAndSet(final long expectedValue, final long newValue)
{
- return UNSAFE.compareAndSwapLong(paddedValue, VALUE_OFFSET, expectedValue, newValue);
+ return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
}
/**
@@ -139,6 +158,7 @@
return newValue;
}
+ @Override
public String toString()
{
return Long.toString(get());
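
The Sequence change above replaces the padded long[15] array with a three-level class hierarchy, so that the volatile value sits between two runs of seven unused longs. A minimal, self-contained sketch of that layout follows. All class names here are hypothetical, and AtomicLongFieldUpdater is swapped in for sun.misc.Unsafe so the example runs on a stock JDK; it also assumes the JVM lays out superclass fields before subclass fields, which is what makes the padding effective.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Hypothetical class names; this is not the Disruptor source.
// Seven longs (56 bytes) on each side keep `value` away from neighbouring
// hot fields, assuming superclass fields are laid out before subclass fields.
class LeftPad
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class PaddedValue extends LeftPad
{
    volatile long value;
}

class RightPad extends PaddedValue
{
    protected long p9, p10, p11, p12, p13, p14, p15;
}

final class PaddedCounter extends RightPad
{
    // Assumption: a field updater stands in for sun.misc.Unsafe here.
    private static final AtomicLongFieldUpdater<PaddedValue> UPDATER =
        AtomicLongFieldUpdater.newUpdater(PaddedValue.class, "value");

    PaddedCounter(final long initialValue)
    {
        // Ordered write, comparable in intent to putOrderedLong in the diff.
        UPDATER.lazySet(this, initialValue);
    }

    long get()
    {
        // Plain volatile read of the field, as in the new Sequence.get().
        return value;
    }

    void set(final long newValue)
    {
        UPDATER.lazySet(this, newValue);
    }

    boolean compareAndSet(final long expected, final long newValue)
    {
        return UPDATER.compareAndSet(this, expected, newValue);
    }

    public static void main(final String[] args)
    {
        PaddedCounter counter = new PaddedCounter(-1L);
        counter.set(41L);
        counter.compareAndSet(41L, 42L);
        System.out.println(counter.get());
    }
}
```

Reading the field directly in get() is possible only because the value is now a real volatile field rather than an array slot, which is the main simplification this part of the diff makes.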
diff -Nru disruptor-3.2.1/src/main/java/com/lmax/disruptor/Sequencer.java disruptor-3.3.0/src/main/java/com/lmax/disruptor/Sequencer.java
--- disruptor-3.2.1/src/main/java/com/lmax/disruptor/Sequencer.java 2014-03-10 01:07:35.000000000 +0000
+++ disruptor-3.3.0/src/main/java/com/lmax/disruptor/Sequencer.java 2014-07-21 00:57:02.000000000 +0000
@@ -18,78 +18,12 @@
/**
* Coordinates claiming sequences for access to a data structure while tracking dependent {@link Sequence}s
*/
-public interface Sequencer extends Cursored
+public interface Sequencer extends Cursored, Sequenced
{
/** Set to -1 as sequence starting point */
long INITIAL_CURSOR_VALUE = -1L;
/**
- * The capacity of the data structure to hold entries.
- *
- * @return the size of the RingBuffer.
- */
- int getBufferSize();
-
- /**
- * Has the buffer got capacity to allocate another sequence. This is a concurrent
- * method so the response should only be taken as an indication of available capacity.
- * @param requiredCapacity in the buffer
- * @return true if the buffer has the capacity to allocate the next sequence otherwise false.
- */
- boolean hasAvailableCapacity(final int requiredCapacity);
-
- /**
- * Claim the next event in sequence for publishing.
- * @return the claimed sequence value
- */
- long next();
-
- /**
- * Claim the next n events in sequence for publishing. This is for batch event producing. Using batch producing
- * requires a little care and some math.
- * Coordinator for claiming sequences for access to a data structure while tracking dependent {@link Sequence}s.
- *
- * Generally not safe for use from multiple threads as it does not implement any barriers.
+ * Coordinator for claiming sequences for access to a data structure while tracking dependent {@link Sequence}s.
+ *
+ * Not safe for use from multiple threads as it does not implement any barriers.
+ *
+ * Note on {@link Sequencer#getCursor()}: With this sequencer the cursor value is updated after the call
+ * to {@link Sequencer#publish(long)} is made.
+ */
+
+public final class SingleProducerSequencer extends SingleProducerSequencerFields
+{
+ protected long p1, p2, p3, p4, p5, p6, p7;
/**
* Construct a Sequencer with the selected wait strategy and buffer size.
@@ -53,15 +69,15 @@
@Override
public boolean hasAvailableCapacity(final int requiredCapacity)
{
- long nextValue = pad.nextValue;
+ long nextValue = this.nextValue;
long wrapPoint = (nextValue + requiredCapacity) - bufferSize;
- long cachedGatingSequence = pad.cachedValue;
+ long cachedGatingSequence = this.cachedValue;
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
long minSequence = Util.getMinimumSequence(gatingSequences, nextValue);
- pad.cachedValue = minSequence;
+ this.cachedValue = minSequence;
if (wrapPoint > minSequence)
{
@@ -92,11 +108,11 @@
throw new IllegalArgumentException("n must be > 0");
}
- long nextValue = pad.nextValue;
+ long nextValue = this.nextValue;
long nextSequence = nextValue + n;
long wrapPoint = nextSequence - bufferSize;
- long cachedGatingSequence = pad.cachedValue;
+ long cachedGatingSequence = this.cachedValue;
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
@@ -106,10 +122,10 @@
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
- pad.cachedValue = minSequence;
+ this.cachedValue = minSequence;
}
- pad.nextValue = nextSequence;
+ this.nextValue = nextSequence;
return nextSequence;
}
@@ -139,7 +155,7 @@
throw InsufficientCapacityException.INSTANCE;
}
- long nextSequence = pad.nextValue += n;
+ long nextSequence = this.nextValue += n;
return nextSequence;
}
@@ -150,7 +166,7 @@
@Override
public long remainingCapacity()
{
- long nextValue = pad.nextValue;
+ long nextValue = this.nextValue;
long consumed = Util.getMinimumSequence(gatingSequences, nextValue);
long produced = nextValue;
@@ -163,7 +179,7 @@
@Override
public void claim(long sequence)
{
- pad.nextValue = sequence;
+ this.nextValue = sequence;
}
/**
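
The capacity check in the SingleProducerSequencer hunks above turns on the wrap point: the sequence that the producer would overwrite if it claimed the requested slots. A rough sketch of that arithmetic follows, with the cached gating sequence left out for brevity. The class and method names are hypothetical, and minGatingSequence stands in for the result of Util.getMinimumSequence(gatingSequences, nextValue).

```java
// Hypothetical helper, not part of the Disruptor API.
final class CapacityCheckSketch
{
    private CapacityCheckSketch()
    {
    }

    /**
     * @param nextValue         last sequence claimed by the producer (-1 if none)
     * @param requiredCapacity  number of slots the producer wants to claim
     * @param bufferSize        size of the ring buffer
     * @param minGatingSequence lowest sequence reached by any gating consumer
     * @return true if claiming would not overwrite an unconsumed slot
     */
    static boolean hasAvailableCapacity(
        final long nextValue,
        final int requiredCapacity,
        final int bufferSize,
        final long minGatingSequence)
    {
        // Same expression as in the diff: the slot that would be reused.
        final long wrapPoint = (nextValue + requiredCapacity) - bufferSize;
        return wrapPoint <= minGatingSequence;
    }

    public static void main(final String[] args)
    {
        // Buffer of 8, sequences 0..7 claimed, nothing consumed yet.
        System.out.println(hasAvailableCapacity(7L, 1, 8, -1L));
        // The consumer has processed sequence 0, so one slot is free.
        System.out.println(hasAvailableCapacity(7L, 1, 8, 0L));
    }
}
```

The real method also caches the minimum gating sequence and only rescans the gating sequences when the cached value is not enough to decide, which this sketch deliberately omits.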
diff -Nru disruptor-3.2.1/src/main/java/com/lmax/disruptor/WorkerPool.java disruptor-3.3.0/src/main/java/com/lmax/disruptor/WorkerPool.java
--- disruptor-3.2.1/src/main/java/com/lmax/disruptor/WorkerPool.java 2014-03-10 01:07:35.000000000 +0000
+++ disruptor-3.3.0/src/main/java/com/lmax/disruptor/WorkerPool.java 2014-07-21 00:57:02.000000000 +0000
@@ -48,7 +48,7 @@
public WorkerPool(final RingBuffer
+ * int n = 10;
+ * long hi = sequencer.next(n);
+ * long lo = hi - (n - 1);
+ * for (long sequence = lo; sequence <= hi; sequence++) {
+ * // Do work.
+ * }
+ * sequencer.publish(lo, hi);
+ *
+ *
+ * @param n the number of sequences to claim
+ * @return the highest claimed sequence value
+ */
+ long next(int n);
+
+ /**
+ * Attempt to claim the next event in sequence for publishing. Will return the
+ * number of the slot if there is at least requiredCapacity slots
+ * available.
+ * @return the claimed sequence value
+ * @throws InsufficientCapacityException
+ */
+ long tryNext() throws InsufficientCapacityException;
+
+ /**
+ * Attempt to claim the next n events in sequence for publishing. Will return the
+ * highest numbered slot if there is at least requiredCapacity slots
+ * available. Have a look at {@link Sequencer#next()} for a description on how to
+ * use this method.
+ *
+ * @param n the number of sequences to claim
+ * @return the claimed sequence value
+ * @throws InsufficientCapacityException
+ */
+ long tryNext(int n) throws InsufficientCapacityException;
+
+ /**
+ * Publishes a sequence. Call when the event has been filled.
+ *
+ * @param sequence
+ */
+ void publish(long sequence);
+
+ /**
+ * Batch publish sequences. Called when all of the events have been filled.
+ *
+ * @param lo first sequence number to publish
+ * @param hi last sequence number to publish
+ */
+ void publish(long lo, long hi);
+}
\ No newline at end of file
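
The batch-claim Javadoc above relies on next(n) returning the highest claimed sequence, from which the lowest is derived as hi - (n - 1). A small single-threaded sketch of that arithmetic follows. The class is hypothetical and does no gating or publishing; it only tracks a cursor that starts at -1, matching INITIAL_CURSOR_VALUE.

```java
// Hypothetical toy, not a Sequencer implementation: no gating, no publishing.
final class BatchClaimSketch
{
    private long cursor = -1L;

    /** Claims n sequences and returns the highest one, as next(int) is documented to do. */
    long next(final int n)
    {
        if (n < 1)
        {
            throw new IllegalArgumentException("n must be > 0");
        }
        cursor += n;
        return cursor;
    }

    /** Lowest sequence of a batch of n that ends at hi. */
    static long lowestOf(final long hi, final int n)
    {
        return hi - (n - 1);
    }

    public static void main(final String[] args)
    {
        final BatchClaimSketch sequencer = new BatchClaimSketch();
        final int n = 10;
        final long hi = sequencer.next(n);
        final long lo = lowestOf(hi, n);
        for (long sequence = lo; sequence <= hi; sequence++)
        {
            // Fill the event at this sequence here.
        }
        System.out.println("claimed " + lo + ".." + hi);
    }
}
```

The subtraction uses n - 1 because the range is inclusive at both ends: claiming 10 sequences from an empty buffer yields 0 through 9.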
diff -Nru disruptor-3.2.1/src/main/java/com/lmax/disruptor/Sequence.java disruptor-3.3.0/src/main/java/com/lmax/disruptor/Sequence.java
--- disruptor-3.2.1/src/main/java/com/lmax/disruptor/Sequence.java 2014-03-10 01:07:35.000000000 +0000
+++ disruptor-3.3.0/src/main/java/com/lmax/disruptor/Sequence.java 2014-07-21 00:57:02.000000000 +0000
@@ -15,9 +15,25 @@
*/
package com.lmax.disruptor;
+import sun.misc.Unsafe;
+
import com.lmax.disruptor.util.Util;
-import sun.misc.Unsafe;
+
+class LhsPadding
+{
+ protected long p1, p2, p3, p4, p5, p6, p7;
+}
+
+class Value extends LhsPadding
+{
+ protected volatile long value;
+}
+
+class RhsPadding extends Value
+{
+ protected long p9, p10, p11, p12, p13, p14, p15;
+}
/**
*
- * int n = 10;
- * long hi = sequencer.next(n);
- * long lo = hi - (n - 1);
- * for (long sequence = lo; sequence <= hi; sequence++) {
- * // Do work.
- * }
- * sequencer.publish(lo, hi);
- *
- *
- * @param n the number of sequences to claim
- * @return the highest claimed sequence value
- */
- long next(int n);
-
- /**
- * Attempt to claim the next event in sequence for publishing. Will return the
- * number of the slot if there is at least requiredCapacity slots
- * available.
- * @return the claimed sequence value
- * @throws InsufficientCapacityException
- */
- long tryNext() throws InsufficientCapacityException;
-
- /**
- * Attempt to claim the next n events in sequence for publishing. Will return the
- * highest numbered slot if there is at least requiredCapacity slots
- * available. Have a look at {@link Sequencer#next()} for a description on how to
- * use this method.
- *
- * @param n the number of sequences to claim
- * @return the claimed sequence value
- * @throws InsufficientCapacityException
- */
- long tryNext(int n) throws InsufficientCapacityException;
-
- /**
- * Get the remaining capacity for this sequencer.
- * @return The number of slots remaining.
- */
- long remainingCapacity();
-
- /**
* Claim a specific sequence. Only used if initialising the ring buffer to
* a specific value.
*
@@ -98,21 +32,6 @@
void claim(long sequence);
/**
- * Publishes a sequence. Call when the event has been filled.
- *
- * @param sequence
- */
- void publish(long sequence);
-
- /**
- * Batch publish sequences. Called when all of the events have been filled.
- *
- * @param lo first sequence number to publish
- * @param hi last sequence number to publish
- */
- void publish(long lo, long hi);
-
- /**
* Confirms if a sequence is published and the event is available for use; non-blocking.
*
* @param sequence of the buffer to check
@@ -155,5 +74,19 @@
*/
long getMinimumSequence();
- long getHighestPublishedSequence(long sequence, long availableSequence);
+ /**
+ * Get the highest sequence number that can be safely read from the ring buffer. Depending
+ * on the implementation of the Sequencer this call may need to scan a number of values
+ * in the Sequencer. The scan will range from nextSequence to availableSequence. If
+ * there are no available values >= nextSequence the return value will be
+ * nextSequence - 1. To work correctly a consumer should pass a value that
+ * is 1 higher than the last sequence that was successfully processed.
+ *
+ * @param nextSequence The sequence to start scanning from.
+ * @param availableSequence The sequence to scan to.
+ * @return The highest value that can be safely read, will be at least nextSequence - 1.
+ */
+ long getHighestPublishedSequence(long nextSequence, long availableSequence);
+
+
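
The scan described in the getHighestPublishedSequence Javadoc above can be sketched as a loop that stops at the first unpublished sequence. The class below is hypothetical, and the Set of published sequences is an assumption standing in for whatever availability tracking a multi-producer sequencer keeps internally.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch; the Set replaces the sequencer's own availability tracking.
final class HighestPublishedSketch
{
    private HighestPublishedSketch()
    {
    }

    /**
     * Scans from nextSequence to availableSequence and returns the last sequence
     * before the first gap, or nextSequence - 1 if nextSequence itself is unpublished.
     */
    static long highestPublished(
        final Set<Long> published,
        final long nextSequence,
        final long availableSequence)
    {
        for (long sequence = nextSequence; sequence <= availableSequence; sequence++)
        {
            if (!published.contains(sequence))
            {
                return sequence - 1;
            }
        }
        return availableSequence;
    }

    public static void main(final String[] args)
    {
        // Sequence 3 has been claimed but not yet published.
        final Set<Long> published = new HashSet<>(Arrays.asList(0L, 1L, 2L, 4L));
        System.out.println(highestPublished(published, 0L, 4L));
        System.out.println(highestPublished(published, 3L, 4L));
    }
}
```

A consumer that last processed sequence 2 would pass 3 as nextSequence and get 2 back, meaning there is nothing new that is safe to read even though sequence 4 has already been published.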
+ * UniCast a series of items between 1 publisher and 1 event processor.
+ *
+ * +----+ +-----+
+ * | P1 |--->| EP1 |
+ * +----+ +-----+
+ *
+ * Disruptor:
+ * ==========
+ * track to prevent wrap
+ * +------------------+
+ * | |
+ * | v
+ * +----+ +====+ +====+ +-----+
+ * | P1 |--->| RB |<---| SB | | EP1 |
+ * +----+ +====+ +====+ +-----+
+ * claim get ^ |
+ * | |
+ * +--------+
+ * waitFor
+ *
+ * P1 - Publisher 1
+ * RB - RingBuffer
+ * SB - SequenceBarrier
+ * EP1 - EventProcessor 1
+ *
+ *
+ */
+public final class OneToOneSequencedPollerThroughputTest extends AbstractPerfTestDisruptor
+{
+ private static final int BUFFER_SIZE = 1024 * 64;
+ private static final long ITERATIONS = 1000L * 1000L * 100L;
+ private final ExecutorService executor = Executors.newSingleThreadExecutor(DaemonThreadFactory.INSTANCE);
+ private final long expectedResult = PerfTestUtil.accumulatedAddition(ITERATIONS);
+
+ ///////////////////////////////////////////////////////////////////////////////////////////////
+
+ private final RingBuffer