Hi,

As a relative newbie to Apache Camel I was experimenting with shutting down
and restarting a route when certain conditions are met in my program (using
camel-core 2.2.0).  While doing this I encountered an
IllegalThreadStateException in the BatchProcessor class of a resequencer
that my route was using.  I put together a short unit test to demonstrate
this:

package org.apache.camel.test;

import org.apache.camel.EndpointInject;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.camel.util.ServiceHelper;
import org.junit.Ignore;
import org.junit.Test;

public class BatchProcessorThreadStateTest extends CamelTestSupport{    
    @EndpointInject(uri = "mock:result")
    protected MockEndpoint resultEndpoint;

    @Produce(uri = "direct:start")
    protected ProducerTemplate template;

    @Test
    public void testColdRestartOfBatchProcessor() throws Exception {
        context.stop();
        context.start(); //Causes the resequencer to try and reuse a stopped
thread.
    }
    
    @Override
    protected RouteBuilder createRouteBuilder() {        
        return new RouteBuilder(){
            @Override
            public void configure() throws Exception {
                from("direct:start").resequence(body()).to("mock:result");
            }            
        };        
    }
}

Running this test results in:

java.lang.IllegalThreadStateException
        at java.lang.Thread.start(Thread.java:638)
        at
org.apache.camel.processor.BatchProcessor.doStart(BatchProcessor.java:207)
        at org.apache.camel.impl.ServiceSupport.start(ServiceSupport.java:53)
        at 
org.apache.camel.util.ServiceHelper.startService(ServiceHelper.java:49)
        at 
org.apache.camel.util.ServiceHelper.startServices(ServiceHelper.java:60)
        at
org.apache.camel.processor.DelegateProcessor.doStart(DelegateProcessor.java:71)
        at org.apache.camel.impl.ServiceSupport.start(ServiceSupport.java:53)
        at 
org.apache.camel.util.ServiceHelper.startService(ServiceHelper.java:49)
        at 
org.apache.camel.util.ServiceHelper.startServices(ServiceHelper.java:60)
        at
org.apache.camel.processor.DelegateProcessor.doStart(DelegateProcessor.java:71)
        at
org.apache.camel.processor.interceptor.TraceInterceptor.doStart(TraceInterceptor.java:375)
        at org.apache.camel.impl.ServiceSupport.start(ServiceSupport.java:53)
        at 
org.apache.camel.util.ServiceHelper.startService(ServiceHelper.java:49)
        at 
org.apache.camel.util.ServiceHelper.startServices(ServiceHelper.java:60)
        at
org.apache.camel.processor.DelegateProcessor.doStart(DelegateProcessor.java:71)
        at org.apache.camel.impl.ServiceSupport.start(ServiceSupport.java:53)
        at 
org.apache.camel.util.ServiceHelper.startService(ServiceHelper.java:49)
        at 
org.apache.camel.util.ServiceHelper.startServices(ServiceHelper.java:60)
        at
org.apache.camel.processor.RedeliveryErrorHandler.doStart(RedeliveryErrorHandler.java:477)
        at org.apache.camel.impl.ServiceSupport.start(ServiceSupport.java:53)
        at 
org.apache.camel.util.ServiceHelper.startService(ServiceHelper.java:49)
        at 
org.apache.camel.util.ServiceHelper.startServices(ServiceHelper.java:60)
        at
org.apache.camel.processor.DefaultChannel.doStart(DefaultChannel.java:139)
        at org.apache.camel.impl.ServiceSupport.start(ServiceSupport.java:53)
        at 
org.apache.camel.util.ServiceHelper.startService(ServiceHelper.java:49)
        at 
org.apache.camel.util.ServiceHelper.startServices(ServiceHelper.java:60)
        at
org.apache.camel.processor.DelegateProcessor.doStart(DelegateProcessor.java:71)
        at org.apache.camel.impl.ServiceSupport.start(ServiceSupport.java:53)
        at 
org.apache.camel.util.ServiceHelper.startService(ServiceHelper.java:49)
        at 
org.apache.camel.util.ServiceHelper.startServices(ServiceHelper.java:60)
        at
org.apache.camel.processor.DelegateProcessor.doStart(DelegateProcessor.java:71)
        at org.apache.camel.impl.ServiceSupport.start(ServiceSupport.java:53)
        at 
org.apache.camel.util.ServiceHelper.startService(ServiceHelper.java:49)
        at
org.apache.camel.impl.RouteService.startChildService(RouteService.java:202)
        at org.apache.camel.impl.RouteService.doStart(RouteService.java:141)
        at org.apache.camel.impl.ServiceSupport.start(ServiceSupport.java:53)
        at
org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:957)
        at
org.apache.camel.test.BatchProcessorThreadStateTest.testColdRestartOfBatchProcessor(BatchProcessorThreadStateTest.java:23)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.junit.internal.runners.TestMethod.invoke(TestMethod.java:59)
        at
org.junit.internal.runners.MethodRoadie.runTestMethod(MethodRoadie.java:98)
        at org.junit.internal.runners.MethodRoadie$2.run(MethodRoadie.java:79)
        at
org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:87)
        at org.junit.internal.runners.MethodRoadie.runTest(MethodRoadie.java:77)
        at org.junit.internal.runners.MethodRoadie.run(MethodRoadie.java:42)
        at
org.junit.internal.runners.JUnit4ClassRunner.invokeTestMethod(JUnit4ClassRunner.java:88)
        at
org.junit.internal.runners.JUnit4ClassRunner.runMethods(JUnit4ClassRunner.java:51)
        at
org.junit.internal.runners.JUnit4ClassRunner$1.run(JUnit4ClassRunner.java:44)
        at
org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:27)
        at 
org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:37)
        at
org.junit.internal.runners.JUnit4ClassRunner.run(JUnit4ClassRunner.java:42)
        at
org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:46)
        at
org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
        at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
        at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
        at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
        at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)

If my guess is correct the BatchProcessor class is attempting to use the
same instance of a BatchSender that was stopped during the shutdown of the
route instead of creating a new one to act as the sending thread.  This code
appears to be unchanged through to the current head revision in the
repository.

Hopefully I'm not doing anything here that is contrary to the intended
design of Camel.  I'm also aware that newer versions of Camel allow
suspend/resume functionality that might not have this problem, but it might
be necessary for some use cases to use start/stop instead.
-- 
View this message in context: 
http://camel.465427.n5.nabble.com/IllegalThreadStateException-on-cold-restart-of-BatchProcessor-tp3296082p3296082.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Reply via email to