Hello all, I am trying to implement a fork/join process where work is read by a single thread, distributed to several parallel threads then joined back to another single thread. I am using Camel SEDA components for interfacing between threads. The single thread at the end must process the messages in the correct order which is why I add a stream resequencer in the route.
I have built a small prototype that works correctly with a small number of messages (1000 messages, 10 threads in parallel). However if I increase the number of messages (10000 messages, 10 threads) I start getting null pointer exceptions: 01:35:49.719 [Camel (camel-1) thread #0 - Resequencer Delivery] WARN o.a.c.processor.StreamResequencer - Caused by: [java.lang.NullPointerException - null] java.lang.NullPointerException: null at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:42) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:290) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:185) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:65) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.resequencer.ResequencerEngine.deliverNext(ResequencerEngine.java:261) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.resequencer.ResequencerEngine.deliver(ResequencerEngine.java:225) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.StreamResequencer$Delivery.run(StreamResequencer.java:242) ~[camel-core-2.10.3.jar:2.10.3] I have tried adjusting the resequencer time-out and capacity without any result. My CPU is only lightly loaded as there is plenty of sleep in the dummy processor. If anybody could tell me what I am doing wrong I would be very grateful. Here is the complete code of my prototype class: package camelTest; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.DefaultCamelContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * Simulation of a fork / join parallel processing. * * Work is created by the main method, distributed (seda:fork) to several * parallel workers, joined back to a single queue (seda:join) and resequenced. * */ public class MainApp { private static final Logger LOG = LoggerFactory.getLogger(MainApp.class); private static final int NUMBER_OF_MESSAGES = 10000; public static void main(String... args) throws Exception { CamelContext context = new DefaultCamelContext(); context.addRoutes(new MyRouteBuilder()); context.start(); LOG.info("Started"); ProducerTemplate template = context.createProducerTemplate(); for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { template.sendBodyAndHeader("seda:fork", "Test Message: " + i, "seqnum", new Long(i)); } long expectedTime = NUMBER_OF_MESSAGES * (RandomSleepProcessor.MAX_PROCESS_TIME + RandomSleepProcessor.MIN_PROCESS_TIME) / 2 / MyRouteBuilder.CONCURRENCY + MyRouteBuilder.TIMEOUT; LOG.info("Expected time: {}", expectedTime); Thread.sleep(expectedTime); LOG.info("Stopping"); context.stop(); LOG.info("Stopped"); } public static class MyRouteBuilder extends RouteBuilder { // Number of concurrent processing threads public static final int CONCURRENCY = 10; // Additional resequencer time-out above theoretical time-out public static final long SAFETY_TIMEOUT = 100; // Additional resequencer capacity above theoretical capacity public static final int SAFETY_CAPACITY = 10; // Resequencer time-out public static final long TIMEOUT = SAFETY_TIMEOUT + (RandomSleepProcessor.MAX_PROCESS_TIME - RandomSleepProcessor.MIN_PROCESS_TIME); // Resequencer capacity public static final int CAPACITY = SAFETY_CAPACITY + (int) (CONCURRENCY * TIMEOUT / RandomSleepProcessor.MIN_PROCESS_TIME); public void configure() { LOG.info("Number of processor threads: {}", CONCURRENCY); LOG.info("Resequencer time-out: {}", TIMEOUT); LOG.info("Resequencer capacity: {}", CAPACITY); Processor myProcessor = new RandomSleepProcessor(); from("seda:fork?concurrentConsumers=" + CONCURRENCY).process( myProcessor).to("seda:join"); from("seda:join").resequence(header("seqnum")).stream() .capacity(CAPACITY).timeout(TIMEOUT).to("mock:result"); } } /** * Simulation processor that sleeps a random time between MIN_PROCESS_TIME * and MAX_PROCESS_TIME milliseconds. * */ public static class RandomSleepProcessor implements Processor { public static final long MIN_PROCESS_TIME = 5; public static final long MAX_PROCESS_TIME = 50; @Override public void process(Exchange arg0) throws Exception { long processTime = (long) (MIN_PROCESS_TIME + Math.random() * (MAX_PROCESS_TIME - MIN_PROCESS_TIME)); LOG.debug("Process time: {}", processTime); Thread.sleep(processTime); } } } And here is the full log of a run: 01:35:45.632 [main] INFO camelTest.MainApp - Number of processor threads: 10 01:35:45.635 [main] INFO camelTest.MainApp - Resequencer time-out: 145 01:35:45.635 [main] INFO camelTest.MainApp - Resequencer capacity: 300 01:35:45.674 [main] INFO o.a.camel.impl.DefaultCamelContext - Apache Camel 2.10.3 (CamelContext: camel-1) is starting 01:35:45.733 [main] INFO o.a.c.m.ManagementStrategyFactory - JMX enabled. 01:35:45.938 [main] INFO o.a.c.i.c.DefaultTypeConverter - Loaded 172 type converters 01:35:46.333 [main] INFO o.a.camel.impl.DefaultCamelContext - Route: route1 started and consuming from: Endpoint[seda://fork?concurrentConsumers=10] 01:35:46.348 [main] INFO o.a.camel.impl.DefaultCamelContext - Route: route2 started and consuming from: Endpoint[seda://join] 01:35:46.348 [main] INFO o.a.c.m.DefaultManagementLifecycleStrategy - StatisticsLevel at All so enabling load performance statistics 01:35:46.360 [main] INFO o.a.camel.impl.DefaultCamelContext - Total 2 routes, of which 2 is started. 01:35:46.360 [main] INFO o.a.camel.impl.DefaultCamelContext - Apache Camel 2.10.3 (CamelContext: camel-1) started in 0.688 seconds 01:35:46.360 [main] INFO camelTest.MainApp - Started 01:35:48.061 [main] INFO camelTest.MainApp - Expected time: 27645 01:35:49.719 [Camel (camel-1) thread #0 - Resequencer Delivery] WARN o.a.c.processor.StreamResequencer - Caused by: [java.lang.NullPointerException - null] java.lang.NullPointerException: null at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:42) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:290) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:185) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:65) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.resequencer.ResequencerEngine.deliverNext(ResequencerEngine.java:261) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.resequencer.ResequencerEngine.deliver(ResequencerEngine.java:225) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.StreamResequencer$Delivery.run(StreamResequencer.java:242) ~[camel-core-2.10.3.jar:2.10.3] 01:36:02.994 [Camel (camel-1) thread #0 - Resequencer Delivery] WARN o.a.c.processor.StreamResequencer - Caused by: [java.lang.NullPointerException - null] java.lang.NullPointerException: null at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:42) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:290) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:185) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:65) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.resequencer.ResequencerEngine.deliverNext(ResequencerEngine.java:261) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.resequencer.ResequencerEngine.deliver(ResequencerEngine.java:225) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.StreamResequencer$Delivery.run(StreamResequencer.java:242) ~[camel-core-2.10.3.jar:2.10.3] 01:36:10.913 [Camel (camel-1) thread #0 - Resequencer Delivery] WARN o.a.c.processor.StreamResequencer - Caused by: [java.lang.NullPointerException - null] java.lang.NullPointerException: null at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:42) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:290) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:185) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:65) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.resequencer.ResequencerEngine.deliverNext(ResequencerEngine.java:261) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.resequencer.ResequencerEngine.deliver(ResequencerEngine.java:225) ~[camel-core-2.10.3.jar:2.10.3] at org.apache.camel.processor.StreamResequencer$Delivery.run(StreamResequencer.java:242) ~[camel-core-2.10.3.jar:2.10.3] 01:36:15.706 [main] INFO camelTest.MainApp - Stopping 01:36:15.706 [main] INFO o.a.camel.impl.DefaultCamelContext - Apache Camel 2.10.3 (CamelContext: camel-1) is shutting down 01:36:15.707 [main] INFO o.a.c.impl.DefaultShutdownStrategy - Starting to graceful shutdown 2 routes (timeout 300 seconds) 01:36:16.778 [Camel (camel-1) thread #14 - ShutdownTask] INFO o.a.c.impl.DefaultShutdownStrategy - Route: route2 shutdown complete, was consuming from: Endpoint[seda://join] 01:36:16.779 [Camel (camel-1) thread #14 - ShutdownTask] INFO o.a.c.impl.DefaultShutdownStrategy - Route: route1 shutdown complete, was consuming from: Endpoint[seda://fork?concurrentConsumers=10] 01:36:16.779 [main] INFO o.a.c.impl.DefaultShutdownStrategy - Graceful shutdown of 2 routes completed in 1 seconds 01:36:16.784 [main] INFO o.a.c.i.c.DefaultTypeConverter - TypeConverterRegistry utilization[attempts=264380, hits=264380, misses=0, failures=0] mappings[total=172, misses=0] 01:36:16.786 [main] INFO o.a.camel.impl.DefaultCamelContext - Apache Camel 2.10.3 (CamelContext: camel-1) is shutdown in 1.079 seconds. Uptime 31.114 seconds. 01:36:16.786 [main] INFO camelTest.MainApp - Stopped Vincent -- View this message in context: http://camel.465427.n5.nabble.com/Fork-join-with-resequencing-tp5724727.html Sent from the Camel - Users mailing list archive at Nabble.com.