This is an automated email from the git hooks/post-receive script. tmancill pushed a commit to branch master in repository disruptor.
commit d3e725b50ab91f48dbe8cc88b03fd551e9e1ffb0 Author: tony mancill <[email protected]> Date: Sun Oct 9 19:44:01 2016 -0700 New upstream version 3.3.6 --- README.md | 12 ++ build.gradle | 2 +- .../lmax/disruptor/SingleProducerSequencer.java | 4 + .../java/com/lmax/disruptor/dsl/Disruptor.java | 53 +++++- .../com/lmax/disruptor/dsl/EventHandlerGroup.java | 5 +- .../OneToOneTranslatorThroughputTest.java | 13 +- .../com/lmax/disruptor/DisruptorStressTest.java | 11 +- .../disruptor/ShutdownOnFatalExceptionTest.java | 4 +- ...ruptorStressTest.java => WorkerStressTest.java} | 46 ++--- .../java/com/lmax/disruptor/dsl/DisruptorTest.java | 211 ++++++++++++++++++--- .../{StubExecutor.java => StubThreadFactory.java} | 27 ++- .../example/MultiProducerWithTranslator.java | 11 +- .../lmax/disruptor/example/WaitForShutdown.java | 65 +++++++ 13 files changed, 372 insertions(+), 92 deletions(-) diff --git a/README.md b/README.md index 330435b..ce0118e 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,18 @@ A High Performance Inter-Thread Messaging Library ## Changelog +### 3.3.6 + +- Support adding gating sequences before calling Disruptor.start() +- Fix minor concurrency race when dynamically adding sequences +- Fix wrapping problem when adding work handlers to the Disruptor + +### 3.3.5 + +- Fix NPE in TimeoutBlockingWaitStrategy when used with WorkProcessor +- Add LiteTimeoutBlockingWaitStrategy +- Resignal any waiting threads when trying to publish to a full ring buffer + ### 3.3.4 - Small build fixes and refactorings diff --git a/build.gradle b/build.gradle index 0a12475..840b791 100644 --- a/build.gradle +++ b/build.gradle @@ -24,7 +24,7 @@ apply plugin: 'idea' defaultTasks 'build' group = 'com.lmax' -version = new Version(major: 3, minor: 3, revision: 5) +version = new Version(major: 3, minor: 3, revision: 6) ext { fullName = 'Disruptor Framework' diff --git a/src/main/java/com/lmax/disruptor/SingleProducerSequencer.java b/src/main/java/com/lmax/disruptor/SingleProducerSequencer.java index 50d2bea..35b1b15 100644 --- a/src/main/java/com/lmax/disruptor/SingleProducerSequencer.java +++ b/src/main/java/com/lmax/disruptor/SingleProducerSequencer.java @@ -79,6 +79,8 @@ public final class SingleProducerSequencer extends SingleProducerSequencerFields if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) { + cursor.setVolatile(nextValue); // StoreLoad fence + long minSequence = Util.getMinimumSequence(gatingSequences, nextValue); this.cachedValue = minSequence; @@ -119,6 +121,8 @@ public final class SingleProducerSequencer extends SingleProducerSequencerFields if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) { + cursor.setVolatile(nextValue); // StoreLoad fence + long minSequence; while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) { diff --git a/src/main/java/com/lmax/disruptor/dsl/Disruptor.java b/src/main/java/com/lmax/disruptor/dsl/Disruptor.java index eaaca72..c9d8f8f 100644 --- a/src/main/java/com/lmax/disruptor/dsl/Disruptor.java +++ b/src/main/java/com/lmax/disruptor/dsl/Disruptor.java @@ -134,9 +134,9 @@ public class Disruptor<T> final ProducerType producerType, final WaitStrategy waitStrategy) { - this(RingBuffer.create( - producerType, eventFactory, ringBufferSize, waitStrategy), - new BasicExecutor(threadFactory)); + this( + RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), + new BasicExecutor(threadFactory)); } /** @@ -204,6 +204,15 @@ public class Disruptor<T> { consumerRepository.add(processor); } + + Sequence[] sequences = new Sequence[processors.length]; + for (int i = 0; i < processors.length; i++) + { + sequences[i] = processors[i].getSequence(); + } + + ringBuffer.addGatingSequences(sequences); + return new EventHandlerGroup<T>(this, consumerRepository, Util.getSequencesFor(processors)); } @@ -349,9 +358,6 @@ public class Disruptor<T> */ public RingBuffer<T> start() { - final Sequence[] gatingSequences = consumerRepository.getLastSequenceInChain(true); - ringBuffer.addGatingSequences(gatingSequences); - checkOnlyStartedOnce(); for (final ConsumerInfo consumerInfo : consumerRepository) { @@ -473,6 +479,17 @@ public class Disruptor<T> } /** + * Gets the sequence value for the specified event handlers. + * + * @param b1 + * @return + */ + public long getSequenceValueFor(EventHandler<T> b1) + { + return consumerRepository.getSequenceFor(b1).get(); + } + + /** * Confirms if all messages have been consumed by all event processors */ private boolean hasBacklog() @@ -513,12 +530,22 @@ public class Disruptor<T> processorSequences[i] = batchEventProcessor.getSequence(); } + updateGatingSequencesForNextInChain(barrierSequences, processorSequences); + + return new EventHandlerGroup<T>(this, consumerRepository, processorSequences); + } + + private void updateGatingSequencesForNextInChain(Sequence[] barrierSequences, Sequence[] processorSequences) + { if (processorSequences.length > 0) { + ringBuffer.addGatingSequences(processorSequences); + for (final Sequence barrierSequence : barrierSequences) + { + ringBuffer.removeGatingSequence(barrierSequence); + } consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences); } - - return new EventHandlerGroup<T>(this, consumerRepository, processorSequences); } EventHandlerGroup<T> createEventProcessors( @@ -529,6 +556,7 @@ public class Disruptor<T> { eventProcessors[i] = processorFactories[i].createEventProcessor(ringBuffer, barrierSequences); } + return handleEventsWith(eventProcessors); } @@ -537,8 +565,15 @@ public class Disruptor<T> { final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences); final WorkerPool<T> workerPool = new WorkerPool<T>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers); + + consumerRepository.add(workerPool, sequenceBarrier); - return new EventHandlerGroup<T>(this, consumerRepository, workerPool.getWorkerSequences()); + + Sequence[] workerSequences = workerPool.getWorkerSequences(); + + updateGatingSequencesForNextInChain(barrierSequences, workerSequences); + + return new EventHandlerGroup<T>(this, consumerRepository, workerSequences); } private void checkNotStarted() diff --git a/src/main/java/com/lmax/disruptor/dsl/EventHandlerGroup.java b/src/main/java/com/lmax/disruptor/dsl/EventHandlerGroup.java index 0acfc92..c8fe91d 100644 --- a/src/main/java/com/lmax/disruptor/dsl/EventHandlerGroup.java +++ b/src/main/java/com/lmax/disruptor/dsl/EventHandlerGroup.java @@ -54,8 +54,9 @@ public class EventHandlerGroup<T> { final Sequence[] combinedSequences = new Sequence[this.sequences.length + otherHandlerGroup.sequences.length]; System.arraycopy(this.sequences, 0, combinedSequences, 0, this.sequences.length); - System - .arraycopy(otherHandlerGroup.sequences, 0, combinedSequences, this.sequences.length, otherHandlerGroup.sequences.length); + System.arraycopy( + otherHandlerGroup.sequences, 0, + combinedSequences, this.sequences.length, otherHandlerGroup.sequences.length); return new EventHandlerGroup<T>(disruptor, consumerRepository, combinedSequences); } diff --git a/src/perftest/java/com/lmax/disruptor/translator/OneToOneTranslatorThroughputTest.java b/src/perftest/java/com/lmax/disruptor/translator/OneToOneTranslatorThroughputTest.java index 94863c8..3c483a8 100644 --- a/src/perftest/java/com/lmax/disruptor/translator/OneToOneTranslatorThroughputTest.java +++ b/src/perftest/java/com/lmax/disruptor/translator/OneToOneTranslatorThroughputTest.java @@ -15,12 +15,6 @@ */ package com.lmax.disruptor.translator; -import static com.lmax.disruptor.support.PerfTestUtil.failIfNot; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - import com.lmax.disruptor.AbstractPerfTestDisruptor; import com.lmax.disruptor.EventTranslatorOneArg; import com.lmax.disruptor.RingBuffer; @@ -33,6 +27,10 @@ import com.lmax.disruptor.support.ValueEvent; import com.lmax.disruptor.util.DaemonThreadFactory; import com.lmax.disruptor.util.MutableLong; +import java.util.concurrent.CountDownLatch; + +import static com.lmax.disruptor.support.PerfTestUtil.failIfNot; + /** * <pre> * UniCast a series of items between 1 publisher and 1 event processor using the EventTranslator API @@ -66,7 +64,6 @@ public final class OneToOneTranslatorThroughputTest extends AbstractPerfTestDisr { private static final int BUFFER_SIZE = 1024 * 64; private static final long ITERATIONS = 1000L * 1000L * 100L; - private final ExecutorService executor = Executors.newSingleThreadExecutor(DaemonThreadFactory.INSTANCE); private final long expectedResult = PerfTestUtil.accumulatedAddition(ITERATIONS); private final ValueAdditionEventHandler handler = new ValueAdditionEventHandler(); private final RingBuffer<ValueEvent> ringBuffer; @@ -80,7 +77,7 @@ public final class OneToOneTranslatorThroughputTest extends AbstractPerfTestDisr Disruptor<ValueEvent> disruptor = new Disruptor<ValueEvent>( ValueEvent.EVENT_FACTORY, - BUFFER_SIZE, executor, + BUFFER_SIZE, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new YieldingWaitStrategy()); disruptor.handleEventsWith(handler); diff --git a/src/test/java/com/lmax/disruptor/DisruptorStressTest.java b/src/test/java/com/lmax/disruptor/DisruptorStressTest.java index 0d885fb..a066b3c 100644 --- a/src/test/java/com/lmax/disruptor/DisruptorStressTest.java +++ b/src/test/java/com/lmax/disruptor/DisruptorStressTest.java @@ -2,6 +2,7 @@ package com.lmax.disruptor; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; +import com.lmax.disruptor.util.DaemonThreadFactory; import org.junit.Test; import java.util.concurrent.CountDownLatch; @@ -23,14 +24,14 @@ public class DisruptorStressTest public void shouldHandleLotsOfThreads() throws Exception { Disruptor<TestEvent> disruptor = new Disruptor<TestEvent>( - TestEvent.FACTORY, 1 << 16, executor, - ProducerType.MULTI, new BusySpinWaitStrategy()); + TestEvent.FACTORY, 1 << 16, DaemonThreadFactory.INSTANCE, + ProducerType.MULTI, new BusySpinWaitStrategy()); RingBuffer<TestEvent> ringBuffer = disruptor.getRingBuffer(); disruptor.setDefaultExceptionHandler(new FatalExceptionHandler()); int threads = max(1, Runtime.getRuntime().availableProcessors() / 2); - int iterations = 20000000; + int iterations = 200000; int publisherCount = threads; int handlerCount = threads; @@ -177,9 +178,9 @@ public class DisruptorStressTest public static final EventFactory<TestEvent> FACTORY = new EventFactory<DisruptorStressTest.TestEvent>() { @Override - public TestEvent newInstance() + public DisruptorStressTest.TestEvent newInstance() { - return new TestEvent(); + return new DisruptorStressTest.TestEvent(); } }; } diff --git a/src/test/java/com/lmax/disruptor/ShutdownOnFatalExceptionTest.java b/src/test/java/com/lmax/disruptor/ShutdownOnFatalExceptionTest.java index 327c9e2..7a26e6e 100644 --- a/src/test/java/com/lmax/disruptor/ShutdownOnFatalExceptionTest.java +++ b/src/test/java/com/lmax/disruptor/ShutdownOnFatalExceptionTest.java @@ -2,12 +2,12 @@ package com.lmax.disruptor; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; +import com.lmax.disruptor.util.DaemonThreadFactory; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.Random; -import java.util.concurrent.Executors; public class ShutdownOnFatalExceptionTest { @@ -23,7 +23,7 @@ public class ShutdownOnFatalExceptionTest public void setUp() { disruptor = new Disruptor<byte[]>( - new ByteArrayFactory(256), 1024, Executors.newCachedThreadPool(), ProducerType.SINGLE, + new ByteArrayFactory(256), 1024, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy()); disruptor.handleEventsWith(eventHandler); disruptor.setDefaultExceptionHandler(new FatalExceptionHandler()); diff --git a/src/test/java/com/lmax/disruptor/DisruptorStressTest.java b/src/test/java/com/lmax/disruptor/WorkerStressTest.java similarity index 76% copy from src/test/java/com/lmax/disruptor/DisruptorStressTest.java copy to src/test/java/com/lmax/disruptor/WorkerStressTest.java index 0d885fb..ca5e4c3 100644 --- a/src/test/java/com/lmax/disruptor/DisruptorStressTest.java +++ b/src/test/java/com/lmax/disruptor/WorkerStressTest.java @@ -2,6 +2,7 @@ package com.lmax.disruptor; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; +import com.lmax.disruptor.util.DaemonThreadFactory; import org.junit.Test; import java.util.concurrent.CountDownLatch; @@ -15,7 +16,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.not; import static org.junit.Assert.assertThat; -public class DisruptorStressTest +public class WorkerStressTest { private final ExecutorService executor = Executors.newCachedThreadPool(); @@ -23,23 +24,25 @@ public class DisruptorStressTest public void shouldHandleLotsOfThreads() throws Exception { Disruptor<TestEvent> disruptor = new Disruptor<TestEvent>( - TestEvent.FACTORY, 1 << 16, executor, - ProducerType.MULTI, new BusySpinWaitStrategy()); + TestEvent.FACTORY, 1 << 16, DaemonThreadFactory.INSTANCE, + ProducerType.MULTI, new SleepingWaitStrategy()); RingBuffer<TestEvent> ringBuffer = disruptor.getRingBuffer(); disruptor.setDefaultExceptionHandler(new FatalExceptionHandler()); int threads = max(1, Runtime.getRuntime().availableProcessors() / 2); - int iterations = 20000000; + int iterations = 200000; int publisherCount = threads; int handlerCount = threads; CyclicBarrier barrier = new CyclicBarrier(publisherCount); CountDownLatch latch = new CountDownLatch(publisherCount); - TestEventHandler[] handlers = initialise(disruptor, new TestEventHandler[handlerCount]); + TestWorkHandler[] handlers = initialise(new TestWorkHandler[handlerCount]); Publisher[] publishers = initialise(new Publisher[publisherCount], ringBuffer, iterations, barrier, latch); + disruptor.handleEventsWithWorkerPool(handlers); + disruptor.start(); for (Publisher publisher : publishers) @@ -60,10 +63,9 @@ public class DisruptorStressTest assertThat(publisher.failed, is(false)); } - for (TestEventHandler handler : handlers) + for (TestWorkHandler handler : handlers) { - assertThat(handler.messagesSeen, is(not(0))); - assertThat(handler.failureCount, is(0)); + assertThat(handler.seen, is(not(0))); } } @@ -80,39 +82,25 @@ public class DisruptorStressTest } @SuppressWarnings("unchecked") - private TestEventHandler[] initialise(Disruptor<TestEvent> disruptor, TestEventHandler[] testEventHandlers) + private TestWorkHandler[] initialise(TestWorkHandler[] testEventHandlers) { for (int i = 0; i < testEventHandlers.length; i++) { - TestEventHandler handler = new TestEventHandler(); - disruptor.handleEventsWith(handler); + TestWorkHandler handler = new TestWorkHandler(); testEventHandlers[i] = handler; } return testEventHandlers; } - private static class TestEventHandler implements EventHandler<TestEvent> + private static class TestWorkHandler implements WorkHandler<TestEvent> { - public int failureCount = 0; - public int messagesSeen = 0; - - public TestEventHandler() - { - } + private int seen; @Override - public void onEvent(TestEvent event, long sequence, boolean endOfBatch) throws Exception + public void onEvent(TestEvent event) throws Exception { - if (event.sequence != sequence || - event.a != sequence + 13 || - event.b != sequence - 7 || - !("wibble-" + sequence).equals(event.s)) - { - failureCount++; - } - - messagesSeen++; + seen++; } } @@ -174,7 +162,7 @@ public class DisruptorStressTest public long b; public String s; - public static final EventFactory<TestEvent> FACTORY = new EventFactory<DisruptorStressTest.TestEvent>() + public static final EventFactory<TestEvent> FACTORY = new EventFactory<WorkerStressTest.TestEvent>() { @Override public TestEvent newInstance() diff --git a/src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java b/src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java index ea008d8..42511fa 100644 --- a/src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java +++ b/src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java @@ -32,8 +32,8 @@ import com.lmax.disruptor.dsl.stubs.EvilEqualsEventHandler; import com.lmax.disruptor.dsl.stubs.ExceptionThrowingEventHandler; import com.lmax.disruptor.dsl.stubs.SleepingEventHandler; import com.lmax.disruptor.dsl.stubs.StubExceptionHandler; -import com.lmax.disruptor.dsl.stubs.StubExecutor; import com.lmax.disruptor.dsl.stubs.StubPublisher; +import com.lmax.disruptor.dsl.stubs.StubThreadFactory; import com.lmax.disruptor.dsl.stubs.TestWorkHandler; import com.lmax.disruptor.support.TestEvent; import org.junit.After; @@ -44,7 +44,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static java.lang.Thread.yield; @@ -56,15 +57,17 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; @SuppressWarnings(value = {"unchecked"}) public class DisruptorTest { private static final int TIMEOUT_IN_SECONDS = 2; - private Disruptor<TestEvent> disruptor; - private StubExecutor executor; + private final Collection<DelayedEventHandler> delayedEventHandlers = new ArrayList<DelayedEventHandler>(); private final Collection<TestWorkHandler> testWorkHandlers = new ArrayList<TestWorkHandler>(); + private Disruptor<TestEvent> disruptor; + private StubThreadFactory executor; private RingBuffer<TestEvent> ringBuffer; private TestEvent lastPublishedEvent; @@ -91,6 +94,119 @@ public class DisruptorTest } @Test + public void shouldProcessMessagesPublishedBeforeStartIsCalled() throws Exception + { + final CountDownLatch eventCounter = new CountDownLatch(0); + disruptor.handleEventsWith(new EventHandler<TestEvent>() + { + @Override + public void onEvent(final TestEvent event, final long sequence, final boolean endOfBatch) throws Exception + { + eventCounter.countDown(); + } + }); + + disruptor.publishEvent( + new EventTranslator<TestEvent>() + { + @Override + public void translateTo(final TestEvent event, final long sequence) + { + lastPublishedEvent = event; + } + }); + + disruptor.start(); + + disruptor.publishEvent( + new EventTranslator<TestEvent>() + { + @Override + public void translateTo(final TestEvent event, final long sequence) + { + lastPublishedEvent = event; + } + }); + + if (!eventCounter.await(5, TimeUnit.SECONDS)) + { + fail("Did not process event published before start was called. Missed events: " + eventCounter.getCount()); + } + } + + @Test + public void shouldAddEventProcessorsAfterPublishing() throws Exception + { + RingBuffer<TestEvent> rb = disruptor.getRingBuffer(); + BatchEventProcessor<TestEvent> b1 = new BatchEventProcessor<TestEvent>( + rb, rb.newBarrier(), new SleepingEventHandler()); + BatchEventProcessor<TestEvent> b2 = new BatchEventProcessor<TestEvent>( + rb, rb.newBarrier(b1.getSequence()), new SleepingEventHandler()); + BatchEventProcessor<TestEvent> b3 = new BatchEventProcessor<TestEvent>( + rb, rb.newBarrier(b2.getSequence()), new SleepingEventHandler()); + + assertThat(b1.getSequence().get(), is(-1L)); + assertThat(b2.getSequence().get(), is(-1L)); + assertThat(b3.getSequence().get(), is(-1L)); + + rb.publish(rb.next()); + rb.publish(rb.next()); + rb.publish(rb.next()); + rb.publish(rb.next()); + rb.publish(rb.next()); + rb.publish(rb.next()); + + disruptor.handleEventsWith(b1, b2, b3); + + assertThat(b1.getSequence().get(), is(5L)); + assertThat(b2.getSequence().get(), is(5L)); + assertThat(b3.getSequence().get(), is(5L)); + } + + @Test + public void shouldSetSequenceForHandlerIfAddedAfterPublish() throws Exception + { + RingBuffer<TestEvent> rb = disruptor.getRingBuffer(); + EventHandler<TestEvent> b1 = new SleepingEventHandler(); + EventHandler<TestEvent> b2 = new SleepingEventHandler(); + EventHandler<TestEvent> b3 = new SleepingEventHandler(); + + rb.publish(rb.next()); + rb.publish(rb.next()); + rb.publish(rb.next()); + rb.publish(rb.next()); + rb.publish(rb.next()); + rb.publish(rb.next()); + + disruptor.handleEventsWith(b1, b2, b3); + + assertThat(disruptor.getSequenceValueFor(b1), is(5L)); + assertThat(disruptor.getSequenceValueFor(b2), is(5L)); + assertThat(disruptor.getSequenceValueFor(b3), is(5L)); + } + + @Test + public void shouldSetSequenceForWorkProcessorIfAddedAfterPublish() throws Exception + { + RingBuffer<TestEvent> rb = disruptor.getRingBuffer(); + TestWorkHandler wh1 = createTestWorkHandler(); + TestWorkHandler wh2 = createTestWorkHandler(); + TestWorkHandler wh3 = createTestWorkHandler(); + + rb.publish(rb.next()); + rb.publish(rb.next()); + rb.publish(rb.next()); + rb.publish(rb.next()); + rb.publish(rb.next()); + rb.publish(rb.next()); + + disruptor.handleEventsWithWorkerPool(wh1, wh2, wh3); + + assertThat(disruptor.getRingBuffer().getMinimumGatingSequence(), is(5L)); + } + + + @Test public void shouldCreateEventProcessorGroupForFirstEventProcessors() throws Exception { @@ -132,6 +248,29 @@ public class DisruptorTest } @Test + public void should() + throws Exception + { + RingBuffer<TestEvent> rb = disruptor.getRingBuffer(); + BatchEventProcessor<TestEvent> b1 = new BatchEventProcessor<TestEvent>( + rb, rb.newBarrier(), new SleepingEventHandler()); + EventProcessorFactory<TestEvent> b2 = new EventProcessorFactory<TestEvent>() + { + @Override + public EventProcessor createEventProcessor( + RingBuffer<TestEvent> ringBuffer, Sequence[] barrierSequences) + { + return new BatchEventProcessor<TestEvent>( + ringBuffer, ringBuffer.newBarrier(barrierSequences), new SleepingEventHandler()); + } + }; + + disruptor.handleEventsWith(b1).then(b2); + + disruptor.start(); + } + + @Test public void shouldAllowSpecifyingSpecificEventProcessorsToWaitFor() throws Exception { @@ -270,7 +409,7 @@ public class DisruptorTest final StubPublisher stubPublisher = new StubPublisher(ringBuffer); try { - executor.execute(stubPublisher); + executor.newThread(stubPublisher).start(); assertProducerReaches(stubPublisher, 4, true); @@ -338,8 +477,8 @@ public class DisruptorTest final BatchEventProcessor<TestEvent> processor = new BatchEventProcessor<TestEvent>(ringBuffer, ringBuffer.newBarrier(), delayedEventHandler); - disruptor.handleEventsWith(processor); - disruptor.after(processor).handleEventsWith(handlerWithBarrier); + + disruptor.handleEventsWith(processor).then(handlerWithBarrier); ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler); } @@ -399,6 +538,28 @@ public class DisruptorTest workHandler2.processEvent(); } + + @Test + public void shouldProvideEventsMultipleWorkHandlers() throws Exception + { + final TestWorkHandler workHandler1 = createTestWorkHandler(); + final TestWorkHandler workHandler2 = createTestWorkHandler(); + final TestWorkHandler workHandler3 = createTestWorkHandler(); + final TestWorkHandler workHandler4 = createTestWorkHandler(); + final TestWorkHandler workHandler5 = createTestWorkHandler(); + final TestWorkHandler workHandler6 = createTestWorkHandler(); + final TestWorkHandler workHandler7 = createTestWorkHandler(); + final TestWorkHandler workHandler8 = createTestWorkHandler(); + + disruptor + .handleEventsWithWorkerPool(workHandler1, workHandler2) + .thenHandleEventsWithWorkerPool(workHandler3, workHandler4); + disruptor + .handleEventsWithWorkerPool(workHandler5, workHandler6) + .thenHandleEventsWithWorkerPool(workHandler7, workHandler8); + } + + @Test public void shouldSupportUsingWorkerPoolAsDependency() throws Exception { @@ -461,8 +622,10 @@ public class DisruptorTest final TestWorkHandler workHandler1 = createTestWorkHandler(); final DelayedEventHandler delayedEventHandler1 = createDelayedEventHandler(); final DelayedEventHandler delayedEventHandler2 = createDelayedEventHandler(); - disruptor.handleEventsWith(delayedEventHandler1).and(disruptor.handleEventsWithWorkerPool(workHandler1)).then( - delayedEventHandler2); + disruptor + .handleEventsWith(delayedEventHandler1) + .and(disruptor.handleEventsWithWorkerPool(workHandler1)) + .then(delayedEventHandler2); publishEvent(); publishEvent(); @@ -549,18 +712,18 @@ public class DisruptorTest final EventHandler<TestEvent> eventHandler = new EventHandlerStub<TestEvent>(countDownLatch); disruptor.handleEventsWith( - new EventProcessorFactory<TestEvent>() - { - @Override - public EventProcessor createEventProcessor( - final RingBuffer<TestEvent> ringBuffer, final Sequence[] barrierSequences) - { - assertEquals("Should not have had any barrier sequences", 0, barrierSequences.length); - return new BatchEventProcessor<TestEvent>( - disruptor.getRingBuffer(), ringBuffer.newBarrier( - barrierSequences), eventHandler); - } - }); + new EventProcessorFactory<TestEvent>() + { + @Override + public EventProcessor createEventProcessor( + final RingBuffer<TestEvent> ringBuffer, final Sequence[] barrierSequences) + { + assertEquals("Should not have had any barrier sequences", 0, barrierSequences.length); + return new BatchEventProcessor<TestEvent>( + disruptor.getRingBuffer(), ringBuffer.newBarrier( + barrierSequences), eventHandler); + } + }); ensureTwoEventsProcessedAccordingToDependencies(countDownLatch); } @@ -641,14 +804,14 @@ public class DisruptorTest private void createDisruptor() { - executor = new StubExecutor(); + executor = new StubThreadFactory(); createDisruptor(executor); } - private void createDisruptor(final Executor executor) + private void createDisruptor(final ThreadFactory threadFactory) { disruptor = new Disruptor<TestEvent>( - TestEvent.EVENT_FACTORY, 4, executor, + TestEvent.EVENT_FACTORY, 4, threadFactory, ProducerType.SINGLE, new BlockingWaitStrategy()); } diff --git a/src/test/java/com/lmax/disruptor/dsl/stubs/StubExecutor.java b/src/test/java/com/lmax/disruptor/dsl/stubs/StubThreadFactory.java similarity index 77% rename from src/test/java/com/lmax/disruptor/dsl/stubs/StubExecutor.java rename to src/test/java/com/lmax/disruptor/dsl/stubs/StubThreadFactory.java index 1167340..0e160e2 100644 --- a/src/test/java/com/lmax/disruptor/dsl/stubs/StubExecutor.java +++ b/src/test/java/com/lmax/disruptor/dsl/stubs/StubThreadFactory.java @@ -20,27 +20,30 @@ import org.junit.Assert; import java.util.Collection; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executor; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -public class StubExecutor implements Executor +public final class StubThreadFactory implements ThreadFactory { private final DaemonThreadFactory threadFactory = DaemonThreadFactory.INSTANCE; private final Collection<Thread> threads = new CopyOnWriteArrayList<Thread>(); private final AtomicBoolean ignoreExecutions = new AtomicBoolean(false); private final AtomicInteger executionCount = new AtomicInteger(0); - public void execute(final Runnable command) + @Override + public Thread newThread(final Runnable command) { executionCount.getAndIncrement(); - if (!ignoreExecutions.get()) + Runnable toExecute = command; + if(ignoreExecutions.get()) { - Thread t = threadFactory.newThread(command); - t.setName(command.toString()); - threads.add(t); - t.start(); + toExecute = new NoOpRunnable(); } + final Thread thread = threadFactory.newThread(toExecute); + thread.setName(command.toString()); + threads.add(thread); + return thread; } public void joinAllThreads() @@ -75,4 +78,12 @@ public class StubExecutor implements Executor { return executionCount.get(); } + + private static final class NoOpRunnable implements Runnable + { + @Override + public void run() + { + } + } } diff --git a/src/test/java/com/lmax/disruptor/example/MultiProducerWithTranslator.java b/src/test/java/com/lmax/disruptor/example/MultiProducerWithTranslator.java index 71727aa..7102523 100644 --- a/src/test/java/com/lmax/disruptor/example/MultiProducerWithTranslator.java +++ b/src/test/java/com/lmax/disruptor/example/MultiProducerWithTranslator.java @@ -1,10 +1,13 @@ package com.lmax.disruptor.example; -import com.lmax.disruptor.*; +import com.lmax.disruptor.BlockingWaitStrategy; +import com.lmax.disruptor.EventFactory; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.EventTranslatorThreeArg; +import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; - -import java.util.concurrent.Executors; +import com.lmax.disruptor.util.DaemonThreadFactory; public class MultiProducerWithTranslator { @@ -72,7 +75,7 @@ public class MultiProducerWithTranslator public static void main(String[] args) throws InterruptedException { Disruptor<ObjectBox> disruptor = new Disruptor<ObjectBox>( - ObjectBox.FACTORY, RING_SIZE, Executors.newCachedThreadPool(), ProducerType.MULTI, + ObjectBox.FACTORY, RING_SIZE, DaemonThreadFactory.INSTANCE, ProducerType.MULTI, new BlockingWaitStrategy()); disruptor.handleEventsWith(new Consumer()).then(new Consumer()); final RingBuffer<ObjectBox> ringBuffer = disruptor.getRingBuffer(); diff --git a/src/test/java/com/lmax/disruptor/example/WaitForShutdown.java b/src/test/java/com/lmax/disruptor/example/WaitForShutdown.java new file mode 100644 index 0000000..ab2ffe0 --- /dev/null +++ b/src/test/java/com/lmax/disruptor/example/WaitForShutdown.java @@ -0,0 +1,65 @@ +package com.lmax.disruptor.example; + +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.LifecycleAware; +import com.lmax.disruptor.TimeoutException; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.support.LongEvent; +import com.lmax.disruptor.util.DaemonThreadFactory; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class WaitForShutdown +{ + private static volatile int value = 0; + + private static class Handler implements EventHandler<LongEvent>, LifecycleAware + { + private final CountDownLatch latch; + + public Handler(CountDownLatch latch) + { + this.latch = latch; + } + + @Override + public void onStart() + { + } + + @Override + public void onShutdown() + { + latch.countDown(); + } + + @Override + public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception + { + value = 1; + } + } + + public static void main(String[] args) throws TimeoutException, InterruptedException + { + Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>( + LongEvent.FACTORY, 16, DaemonThreadFactory.INSTANCE + ); + + CountDownLatch shutdownLatch = new CountDownLatch(2); + + disruptor.handleEventsWith(new Handler(shutdownLatch)).then(new Handler(shutdownLatch)); + disruptor.start(); + + long next = disruptor.getRingBuffer().next(); + disruptor.getRingBuffer().get(next).set(next); + disruptor.getRingBuffer().publish(next); + + disruptor.shutdown(10, TimeUnit.SECONDS); + + shutdownLatch.await(); + + System.out.println(value); + } +} -- Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-java/disruptor.git _______________________________________________ pkg-java-commits mailing list [email protected] http://lists.alioth.debian.org/cgi-bin/mailman/listinfo/pkg-java-commits

