Hi All,

I’m observing an intermittent UnsupportedOperationException in the Splitter
EIP with parallel processing when pooled exchanges are enabled. The scenario
is to flatten a nested list using a recursive route. The route has both
streaming and parallel processing enabled.


The following exception is raised in roughly 7 out of 10 runs:


11:32:17.179 [Camel (camel-1) thread #2 - Split] ERROR org.apache.camel.processor.errorhandler.DefaultErrorHandler -- Failed delivery for (MessageId: 6B26CA6266F8B1A-000000000000016E on ExchangeId: 6B26CA6266F8B1A-000000000000016E). Exhausted after delivery attempt: 1 caught: java.lang.UnsupportedOperationException: Is this really correct ?

Message History (source location and message history is disabled)
---------------------------------------------------------------------------------------------------------------------------------------
Source                                   ID                             Processor                                          Elapsed (ms)
                                         route1/route1                  from[direct://processList]                                    8
...
                                         route1/split1                  split[simple{${body}}]                                        0

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.UnsupportedOperationException: Is this really correct ?
    at org.apache.camel.processor.MulticastProcessor.wrapInErrorHandler(MulticastProcessor.java:1063)
    at org.apache.camel.processor.MulticastProcessor.createProcessorExchangePair(MulticastProcessor.java:1045)
    at org.apache.camel.processor.Splitter$SplitterIterable$1.next(Splitter.java:243)
    at org.apache.camel.processor.Splitter$SplitterIterable$1.next(Splitter.java:184)
    at org.apache.camel.processor.MulticastProcessor$MulticastReactiveTask.getNextProcessorExchangePair(MulticastProcessor.java:640)
    at org.apache.camel.processor.MulticastProcessor$MulticastReactiveTask.run(MulticastProcessor.java:557)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.doRun(DefaultReactiveExecutor.java:199)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.executeReactiveWork(DefaultReactiveExecutor.java:189)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.tryExecuteReactiveWork(DefaultReactiveExecutor.java:166)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleSync(DefaultReactiveExecutor.java:64)
    at org.apache.camel.processor.MulticastProcessor.lambda$doProcess$0(MulticastProcessor.java:362)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)


Test Code


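// Imports below are assumed for Camel 4.x with camel-test-junit5.
import java.util.ArrayList;
import java.util.List;

import org.apache.camel.CamelContext;
import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.engine.PooledExchangeFactory;
import org.apache.camel.impl.engine.PooledProcessorExchangeFactory;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.junit.jupiter.api.Test;

public class FlattenListTest extends CamelTestSupport {

    // Sizes of the outer, middle, and inner list levels
    private static final int L1 = 20;
    private static final int L2 = 5;
    private static final int L3 = 5;

    @Override
    protected RoutesBuilder createRouteBuilder() {
        return new RouteBuilder() {
            @Override
            public void configure() {
                // Lists are split and fed back into the same route;
                // non-list elements are logged and sent to the mock endpoint.
                from("direct:processList")
                    .choice()
                        .when(exchange -> (exchange.getIn().getBody() instanceof List))
                            .split(body()).streaming().parallelProcessing(true)
                            .to("direct:processList")
                        .endChoice()
                        .otherwise()
                            .log("${body}")
                            .to("mock:result")
                    .end();
            }
        };
    }

    @Override
    protected CamelContext createCamelContext() throws Exception {
        CamelContext camelContext = super.createCamelContext();
        ExtendedCamelContext ecc = camelContext.getCamelContextExtension();
        // Enable pooled exchanges for both exchange factories
        ecc.setExchangeFactory(new PooledExchangeFactory());
        ecc.setProcessorExchangeFactory(new PooledProcessorExchangeFactory());
        ecc.getExchangeFactory().setStatisticsEnabled(true);
        ecc.getProcessorExchangeFactory().setStatisticsEnabled(true);
        return camelContext;
    }

    @Test
    public void testSplitter() throws Exception {
        List<List<List<Integer>>> data = new ArrayList<>();
        int num = 0;
        for (int i = 0; i < L1; i++) {  // Outer level
            List<List<Integer>> level2 = new ArrayList<>();
            for (int j = 0; j < L2; j++) {  // Middle level
                List<Integer> level3 = new ArrayList<>();
                for (int k = 0; k < L3; k++) {  // Inner level
                    level3.add(num++);
                }
                level2.add(level3);
            }
            data.add(level2);
        }

        // One message is expected per leaf element
        getMockEndpoint("mock:result").expectedMessageCount(num);
        template.sendBody("direct:processList", data);
        MockEndpoint.assertIsSatisfied(context);
    }
}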


The route should flatten the nested list and deliver all individual
elements to the final endpoint without exceptions or data loss. Please let
me know if this is a known limitation or if I’m misusing pooled exchanges
in this scenario.
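
To make the expected outcome concrete, here is a minimal plain-Java sketch of the same recursive flattening, without Camel. It is only an illustration of what the route is meant to produce, not a reproduction of the bug; the names FlattenSketch, flatten and buildData are hypothetical and exist only in this sketch. With the sizes from the test (20 x 5 x 5), 500 leaf elements are expected.

```java
import java.util.ArrayList;
import java.util.List;

public class FlattenSketch {

    // Recursively flattens nested lists into a single list of leaf elements,
    // mirroring the choice/split recursion of the direct:processList route.
    public static List<Object> flatten(Object body) {
        List<Object> out = new ArrayList<>();
        collect(body, out);
        return out;
    }

    private static void collect(Object body, List<Object> out) {
        if (body instanceof List<?>) {
            // "when" branch: split the list and process each item again
            for (Object item : (List<?>) body) {
                collect(item, out);
            }
        } else {
            // "otherwise" branch: a leaf element reaches the final endpoint
            out.add(body);
        }
    }

    // Builds the same three-level nested structure as the test above.
    public static List<List<List<Integer>>> buildData(int l1, int l2, int l3) {
        List<List<List<Integer>>> data = new ArrayList<>();
        int num = 0;
        for (int i = 0; i < l1; i++) {
            List<List<Integer>> level2 = new ArrayList<>();
            for (int j = 0; j < l2; j++) {
                List<Integer> level3 = new ArrayList<>();
                for (int k = 0; k < l3; k++) {
                    level3.add(num++);
                }
                level2.add(level3);
            }
            data.add(level2);
        }
        return data;
    }

    public static void main(String[] args) {
        List<Object> flat = flatten(buildData(20, 5, 5));
        System.out.println(flat.size()); // prints 500
    }
}
```

Unlike this sequential sketch, the route uses parallel processing, so the order in which elements arrive at mock:result is not guaranteed; only the total count is asserted in the test.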


Camel Version: 4.18.0

Java Version: 17
