Hi Team,

I upgraded Camel to 3.9.0, which also upgraded the kafka-clients jar to 2.7.0. With this version the Kafka consumers are able to resume after the WakeupException and InterruptedException. However, I now see a new issue: the error message below is printed endlessly until the consumer service is restarted manually.

2021-06-09 23:02:09.228+0000 ERROR Camel (camel-1) thread #1 - KafkaConsumer[testTopicV1] [camel.processor.errorhandler.DefaultErrorHandler(log:205)] Failed delivery for (MessageId: *D3D1C394132DC66-0000000000000001* on ExchangeId: D3D1C394132DC66-0000000000000001). Exhausted after delivery attempt: 1 caught: java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions

Message History (complete message history is disabled)
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[route1 ] [route1 ] [from[kafka://testTopicV1?allowManualCommit=True&autoCommitEnable=False] [ 7]
[route1 ] [process3 ] [Processor@0x77c233af ] [ 0]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1218) ~[kafka-clients-2.7.0.jar!/:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1161) ~[kafka-clients-2.7.0.jar!/:?]
    at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doPollRun(KafkaConsumer.java:345) [camel-kafka-3.9.0.jar!/:3.9.0]
    at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:273) [camel-kafka-3.9.0.jar!/:3.9.0]
    at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:238) [camel-kafka-3.9.0.jar!/:3.9.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]

The exception keeps recurring, like this:

1st occurrence at *23:00:34.195* prints D3D1C394132DC66-0000000000000000
2nd occurrence at *23:02:09.228* prints D3D1C394132DC66-000000000000000*1*
3rd occurrence at *23:02:14.235* prints D3D1C394132DC66-000000000000000*2*
4th occurrence at *23:02:19.243* prints D3D1C394132DC66-000000000000000*3*

All of these are printed by the same thread that was processing the original request before the exception started. However, when I checked the logs, the consumer threads do get subscribed back to the topic. I am not sure why this message is not processed and keeps failing endlessly.

Any inputs are appreciated.

-Regards
Srikant Mantha

On Thu, Apr 1, 2021 at 6:44 PM SRIKANT MVS <srikant....@gmail.com> wrote:

> Hi Andrea/Claus,
> Thanks for the quick response. I am seeing this issue from camel 3.1.0,
> also migration to camel 3.4.5 (java 8 support) and kafka client upgraded to
> 2.5.0. Still the same issue is seen.
>
> Let me also give a try upgrading to 3.9.0.
>
> -Regards
> Srikant Mantha
>
> On Thu, Apr 1, 2021 at 6:37 PM Andrea Cosentino <anco...@gmail.com> wrote:
>
>> Please try with an LTS version 3.7.x. 3.2.0 was a development version.
>>
>> On Thu, Apr 1, 2021 at 18:30, SRIKANT MVS <srikant....@gmail.com> wrote:
>>
>> > Hi Team,
>> >
>> > I am using camel-kafka (version: 3.2.0) for consuming messages.
>> > Below is the flow
>> >
>> > 1. Kafka service consumes events from the topic
>> > 2. Make a call to the Server
>> > 3. When the server is not responding in 40ms, throw
>> >    ServerUnavailableException
>> > 4. Stopping Kafka consumer on the topic
>> > 5. Unsubscribing from the topic testTopicV1
>> > 6. *Error unsubscribing testTopicV1-Thread 0 from kafka topic
>> >    testTopicV1. Caused by: [org.apache.kafka.common.errors.WakeupException - null]*
>> > 7.
>> >
>> > *Also seen an InterruptedException with the message "Interrupted while
>> > waiting for consumer heartbeat thread to close" Once this happens, then the
>> > Kafka consumer never recovers and I have to start the consumer service
>> > manually. Any idea why this happens and how to mitigate from this issue?
>> >
>> > 2021-01-31 05:48:23.143+0000 WARN Camel (camel-1) thread #6 - KafkaConsumer[testTopicV1] [camel.component.kafka.KafkaConsumer(log:212)] Error unsubscribing testTopicV1-Thread 0 from kafka topic testTopicV1. Caused by: [org.apache.kafka.common.errors.WakeupException - null]
>> > 2021-01-31 05:48:23.143+0000 INFO Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [camel.component.kafka.KafkaConsumer(doRun:397)] Unsubscribing testTopicV1-Thread 1 from topic testTopicV1
>> > org.apache.kafka.common.errors.WakeupException: null
>> >     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:511) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:275) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:937) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1473) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1431) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.commitOffset(KafkaConsumer.java:432) ~[camel-kafka-3.2.0.jar!/:3.2.0]
>> >     at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.onPartitionsRevoked(KafkaConsumer.java:455) ~[camel-kafka-3.2.0.jar!/:3.2.0]
>> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:291) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:707) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe(KafkaConsumer.java:1073) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:400) [camel-kafka-3.2.0.jar!/:3.2.0]
>> >     at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:214) [camel-kafka-3.2.0.jar!/:3.2.0]
>> >     at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:1.8.0_271]
>> >     at java.util.concurrent.FutureTask.run(Unknown Source) [?:1.8.0_271]
>> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:1.8.0_271]
>> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:1.8.0_271]
>> >     at java.lang.Thread.run(Unknown Source) [?:1.8.0_271]
>> >
>> > Full StackTrace
>> >
>> > 2021-01-31 05:48:13.138+0000 ERROR Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [camel.processor.errorhandler.DefaultErrorHandler(log:203)] Failed delivery for (MessageId: ID-TestId-1611647443253-0-25 on ExchangeId: ID-TestId-1611647443253-0-25). Exhausted after delivery attempt: 1 caught: com.example.exception.ServerUnavailableException: RestClientException during rest call
>> >
>> > Message History (complete message history is disabled)
>> > ---------------------------------------------------------------------------------------------------------------------------------------
>> > RouteId ProcessorId Processor Elapsed (ms)
>> > [route1 ] [route1 ] [from[kafka://testTopicV1?allowManualCommit=True&autoCommitEnable=False] [ 40027]
>> > ...
>> > [route1 ] [to1 ] [bean:kafkaProcessor ] [ 0]
>> >
>> > Stacktrace
>> > ---------------------------------------------------------------------------------------------------------------------------------------
>> > com.example.exception.ServerUnavailableException: RestClientException during rest call
>> >     at com.example.processor.KafkaProcessor.process(KafkaProcessor.java:46) ~[classes!/:?]
>> >     at org.apache.camel.component.bean.AbstractBeanProcessor.process(AbstractBeanProcessor.java:117) ~[camel-bean-3.2.0.jar!/:3.2.0]
>> >     at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:56) ~[camel-bean-3.2.0.jar!/:3.2.0]
>> >     at org.apache.camel.component.bean.BeanProducer.process(BeanProducer.java:41) ~[camel-bean-3.2.0.jar!/:3.2.0]
>> >     at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:166) ~[camel-base-3.2.0.jar!/:3.2.0]
>> >     at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.doRun(RedeliveryErrorHandler.java:702) [camel-base-3.2.0.jar!/:3.2.0]
>> >     at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:616) [camel-base-3.2.0.jar!/:3.2.0]
>> >     at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148) [camel-base-3.2.0.jar!/:3.2.0]
>> >     at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:60) [camel-base-3.2.0.jar!/:3.2.0]
>> >     at org.apache.camel.processor.Pipeline.process(Pipeline.java:147) [camel-base-3.2.0.jar!/:3.2.0]
>> >     at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:286) [camel-base-3.2.0.jar!/:3.2.0]
>> >     at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83) [camel-base-3.2.0.jar!/:3.2.0]
>> >     at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:40) [camel-support-3.2.0.jar!/:3.2.0]
>> >     at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:338) [camel-kafka-3.2.0.jar!/:3.2.0]
>> >     at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:214) [camel-kafka-3.2.0.jar!/:3.2.0]
>> >     at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:1.8.0_271]
>> >     at java.util.concurrent.FutureTask.run(Unknown Source) [?:1.8.0_271]
>> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:1.8.0_271]
>> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:1.8.0_271]
>> >     at java.lang.Thread.run(Unknown Source) [?:1.8.0_271]
>> > Caused by: org.springframework.web.client.ResourceAccessException: I/O error on POST request for "http://xxxxxxx": Read timed out; nested exception is java.net.SocketTimeoutException: Read timed out
>> >     at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:746) ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE]
>> >     at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:672) ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE]
>> >     at org.springframework.web.client.RestTemplate.postForEntity(RestTemplate.java:447) ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE]
>> >     ... 21 more
>> > Caused by: java.net.SocketTimeoutException: Read timed out
>> >     at java.net.SocketInputStream.socketRead0(Native Method) ~[?:1.8.0_271]
>> >     at java.net.SocketInputStream.socketRead(Unknown Source) ~[?:1.8.0_271]
>> >     at java.net.SocketInputStream.read(Unknown Source) ~[?:1.8.0_271]
>> >     at java.net.SocketInputStream.read(Unknown Source) ~[?:1.8.0_271]
>> >     at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137) ~[httpcore-4.4.13.jar!/:4.4.13]
>> >     at org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153) ~[httpcore-4.4.13.jar!/:4.4.13]
>> >     at org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:280) ~[httpcore-4.4.13.jar!/:4.4.13]
>> >     at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138) ~[httpclient-4.5.12.jar!/:4.5.12]
>> >     at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56) ~[httpclient-4.5.12.jar!/:4.5.12]
>> >     at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259) ~[httpcore-4.4.13.jar!/:4.4.13]
>> >     at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163) ~[httpcore-4.4.13.jar!/:4.4.13]
>> >     at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:157) ~[httpclient-4.5.12.jar!/:4.5.12]
>> >     at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273) ~[httpcore-4.4.13.jar!/:4.4.13]
>> >     at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125) ~[httpcore-4.4.13.jar!/:4.4.13]
>> >     at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272) ~[httpclient-4.5.12.jar!/:4.5.12]
>> >     at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) ~[httpclient-4.5.12.jar!/:4.5.12]
>> >     at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89) ~[httpclient-4.5.12.jar!/:4.5.12]
>> >     at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110) ~[httpclient-4.5.12.jar!/:4.5.12]
>> >     at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[httpclient-4.5.12.jar!/:4.5.12]
>> >     at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[httpclient-4.5.12.jar!/:4.5.12]
>> >     at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56) ~[httpclient-4.5.12.jar!/:4.5.12]
>> >     at org.springframework.http.client.HttpComponentsClientHttpRequest.executeInternal(HttpComponentsClientHttpRequest.java:87) ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE]
>> >     at org.springframework.http.client.AbstractBufferingClientHttpRequest.executeInternal(AbstractBufferingClientHttpRequest.java:48) ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE]
>> >     at org.springframework.http.client.AbstractClientHttpRequest.execute(AbstractClientHttpRequest.java:53) ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE]
>> >     at org.springframework.http.client.InterceptingClientHttpRequest$InterceptingRequestExecution.execute(InterceptingClientHttpRequest.java:109) ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE]
>> >     at org.springframework.boot.actuate.metrics.web.client.MetricsClientHttpRequestInterceptor.intercept(MetricsClientHttpRequestInterceptor.java:95) ~[spring-boot-actuator-2.2.11.RELEASE.jar!/:2.2.11.RELEASE]
>> >     at org.springframework.http.client.InterceptingClientHttpRequest$InterceptingRequestExecution.execute(InterceptingClientHttpRequest.java:93) ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE]
>> >     at org.springframework.http.client.InterceptingClientHttpRequest.executeInternal(InterceptingClientHttpRequest.java:77) ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE]
>> >     at org.springframework.http.client.AbstractBufferingClientHttpRequest.executeInternal(AbstractBufferingClientHttpRequest.java:48) ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE]
>> >     at org.springframework.http.client.AbstractClientHttpRequest.execute(AbstractClientHttpRequest.java:53) ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE]
>> >     at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:737) ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE]
>> >     at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:672) ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE]
>> >     at org.springframework.web.client.RestTemplate.postForEntity(RestTemplate.java:447) ~[spring-web-5.2.10.RELEASE.jar!/:5.2.10.RELEASE]
>> >     ... 21 more
>> > 2021-01-31 05:48:13.139+0000 INFO Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [camel.component.kafka.KafkaConsumer(doStop:141)] Stopping Kafka consumer on topic: testTopicV1
>> > 2021-01-31 05:48:15.140+0000 INFO Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [camel.impl.engine.BaseExecutorServiceManager(awaitTermination:405)] Waited 2.000 seconds for ExecutorService: org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0 [Shutting down, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0][KafkaConsumer[testTopicV1]] to terminate...
>> > 2021-01-31 05:48:17.141+0000 INFO Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [camel.impl.engine.BaseExecutorServiceManager(awaitTermination:405)] Waited 4.001 seconds for ExecutorService: org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0 [Shutting down, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0][KafkaConsumer[testTopicV1]] to terminate...
>> > 2021-01-31 05:48:19.142+0000 INFO Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [camel.impl.engine.BaseExecutorServiceManager(awaitTermination:405)] Waited 6.001 seconds for ExecutorService: org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0 [Shutting down, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0][KafkaConsumer[testTopicV1]] to terminate...
>> > 2021-01-31 05:48:21.147+0000 INFO Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [camel.impl.engine.BaseExecutorServiceManager(awaitTermination:405)] Waited 8.007 seconds for ExecutorService: org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0 [Shutting down, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0][KafkaConsumer[testTopicV1]] to terminate...
>> > 2021-01-31 05:48:23.141+0000 INFO Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [camel.impl.engine.BaseExecutorServiceManager(awaitTermination:405)] Waited 10.000 seconds for ExecutorService: org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0 [Shutting down, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0][KafkaConsumer[testTopicV1]] to terminate...
>> > 2021-01-31 05:48:23.141+0000 WARN Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [camel.impl.engine.BaseExecutorServiceManager(doShutdown:305)] Forcing shutdown of ExecutorService: org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0 [Shutting down, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0][KafkaConsumer[testTopicV1]] due first await termination elapsed.
>> > 2021-01-31 05:48:23.141+0000 WARN Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [camel.impl.engine.BaseExecutorServiceManager(doShutdown:314)] Forcing shutdown of ExecutorService: org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0 [Shutting down, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0][KafkaConsumer[testTopicV1]] due interrupted.
>> > 2021-01-31 05:48:23.141+0000 INFO Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [camel.impl.engine.BaseExecutorServiceManager(doShutdown:322)] Shutdown of ExecutorService: org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@35c175c0 [Shutting down, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0][KafkaConsumer[testTopicV1]] is shutdown: true and terminated: false took: 10.001 seconds.
>> > 2021-01-31 05:48:23.142+0000 INFO Camel (camel-1) thread #6 - KafkaConsumer[testTopicV1] [camel.component.kafka.KafkaConsumer(doRun:397)] Unsubscribing testTopicV1-Thread 0 from topic testTopicV1
>> > 2021-01-31 05:48:23.143+0000 INFO Camel (camel-1) thread #6 - KafkaConsumer[testTopicV1] [clients.consumer.internals.ConsumerCoordinator(invokePartitionsRevoked:286)] [Consumer clientId=consumer-TestService-5, groupId=TestService] Revoke previously assigned partitions testTopicV1-13, testTopicV1-12, testTopicV1-14, testTopicV1-11, testTopicV1-10
>> > 2021-01-31 05:48:23.142+0000 WARN Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [camel.component.kafka.KafkaConsumer(log:212)] Error during processing. Exchange[ID-TestId-1611647443253-0-25]. Caused by: [ServerUnavailableException - RestClientException during rest call]
>> > ServerUnavailableException: RestClientException during rest call
>> > 2021-01-31 05:48:23.143+0000 INFO kafka-coordinator-heartbeat-thread | TestService [clients.consumer.internals.AbstractCoordinator(markCoordinatorUnknown:808)] [Consumer clientId=consumer-TestService-5, groupId=TestService] Group coordinator KafkaBroker3:9093 (id: 2147482641 rack: null) is unavailable or invalid, will attempt rediscovery
>> > 2021-01-31 05:48:23.143+0000 WARN Camel (camel-1) thread #6 - KafkaConsumer[testTopicV1] [camel.component.kafka.KafkaConsumer(log:212)] Error unsubscribing testTopicV1-Thread 0 from kafka topic testTopicV1. Caused by: [org.apache.kafka.common.errors.WakeupException - null]
>> > 2021-01-31 05:48:23.143+0000 INFO Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [camel.component.kafka.KafkaConsumer(doRun:397)] Unsubscribing testTopicV1-Thread 1 from topic testTopicV1
>> > org.apache.kafka.common.errors.WakeupException: null
>> >     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:511) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:275) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:937) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1473) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1431) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.commitOffset(KafkaConsumer.java:432) ~[camel-kafka-3.2.0.jar!/:3.2.0]
>> >     at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.onPartitionsRevoked(KafkaConsumer.java:455) ~[camel-kafka-3.2.0.jar!/:3.2.0]
>> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:291) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:707) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe(KafkaConsumer.java:1073) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:400) [camel-kafka-3.2.0.jar!/:3.2.0]
>> >     at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:214) [camel-kafka-3.2.0.jar!/:3.2.0]
>> >     at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:1.8.0_271]
>> >     at java.util.concurrent.FutureTask.run(Unknown Source) [?:1.8.0_271]
>> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:1.8.0_271]
>> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:1.8.0_271]
>> >     at java.lang.Thread.run(Unknown Source) [?:1.8.0_271]
>> > 2021-01-31 05:48:23.143+0000 INFO Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [clients.consumer.internals.ConsumerCoordinator(invokePartitionsRevoked:286)] [Consumer clientId=consumer-TestService-6, groupId=TestService] Revoke previously assigned partitions testTopicV1-24, testTopicV1-21, testTopicV1-20, testTopicV1-23, testTopicV1-22
>> > 2021-01-31 05:48:23.143+0000 WARN Camel (camel-1) thread #6 - KafkaConsumer[testTopicV1] [clients.consumer.internals.AbstractCoordinator(closeHeartbeatThread:367)] [Consumer clientId=consumer-TestService-5, groupId=TestService] Interrupted while waiting for consumer heartbeat thread to close
>> > 2021-01-31 05:48:23.144+0000 INFO Camel (camel-1) thread #6 - KafkaConsumer[testTopicV1] [clients.consumer.internals.ConsumerCoordinator(invokePartitionsRevoked:286)] [Consumer clientId=consumer-TestService-5, groupId=TestService] Revoke previously assigned partitions testTopicV1-13, testTopicV1-12, testTopicV1-14, testTopicV1-11, testTopicV1-10
>> > 2021-01-31 05:48:23.144+0000 WARN Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [camel.component.kafka.KafkaConsumer(log:212)] Error unsubscribing testTopicV1-Thread 1 from kafka topic testTopicV1. Caused by: [org.apache.kafka.common.errors.WakeupException - null]
>> > org.apache.kafka.common.errors.WakeupException: null ....
>> > 2021-01-31 05:48:23.144+0000 WARN Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [clients.consumer.internals.AbstractCoordinator(closeHeartbeatThread:367)] [Consumer clientId=consumer-TestService-6, groupId=TestService] Interrupted while waiting for consumer heartbeat thread to close
>> > 2021-01-31 05:48:23.144+0000 INFO Camel (camel-1) thread #7 - KafkaConsumer[testTopicV1] [clients.consumer.internals.ConsumerCoordinator(invokePartitionsRevoked:286)] [Consumer clientId=consumer-TestService-6, groupId=TestService] Revoke previously assigned partitions testTopicV1-24, testTopicV1-21, testTopicV1-20, testTopicV1-23, testTopicV1-22
>> > 2021-01-31 05:48:23.144+0000 ERROR Camel (camel-1) thread #6 - KafkaConsumer[testTopicV1] [kafka.clients.consumer.KafkaConsumer(close:2297)] [Consumer clientId=consumer-TestService-5, groupId=TestService] Failed to close coordinator
>> > org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException
>> >     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeThrowInterruptException(ConsumerNetworkClient.java:517) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:278) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:230) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:932) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1473) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1431) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.commitOffset(KafkaConsumer.java:432) ~[camel-kafka-3.2.0.jar!/:3.2.0]
>> >     at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.onPartitionsRevoked(KafkaConsumer.java:455) ~[camel-kafka-3.2.0.jar!/:3.2.0]
>> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:291) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:707) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:887) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:832) ~[kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2294) [kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2261) [kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2211) [kafka-clients-2.4.0.jar!/:?]
>> >     at org.apache.camel.util.IOHelper.close(IOHelper.java:341) [camel-util-3.2.0.jar!/:3.2.0]
>> >     at org.apache.camel.util.IOHelper.close(IOHelper.java:403) [camel-util-3.2.0.jar!/:3.2.0]
>> >     at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:419) [camel-kafka-3.2.0.jar!/:3.2.0]
>> >     at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:214) [camel-kafka-3.2.0.jar!/:3.2.0]
>> >     at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:1.8.0_271]
>> >     at java.util.concurrent.FutureTask.run(Unknown Source) [?:1.8.0_271]
>> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:1.8.0_271]
>> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:1.8.0_271]
>> >     at java.lang.Thread.run(Unknown Source) [?:1.8.0_271]
>> > Caused by: java.lang.InterruptedException
>> >     ... 26 more*
>> >
>> > *-Regards
>> > Srikant Mantha*
>> >