Re: Kafka component error handling - consumer keeps leaving and rejoining the group
Hi

Thanks for all your findings, this is great insight. You are very welcome to create a JIRA ticket about this bug. We can then work on a fix together, and you can help test it.

Yeah, it seems catching KafkaException is maybe too wide. And let's see if we can also incorporate the bridge error handler.

On Mon, Apr 27, 2020 at 10:06 AM Joseph M'BIMBI-BENE wrote:
> I also realize that the property "bridgeErrorHandler" seems to never be
> used, whereas another one, like "breakOnFirstError", is.
Re: Kafka component error handling - consumer keeps leaving and rejoining the group
Digging into the code (version 3.2.0, I repeat), I can see in the method
`org.apache.camel.component.kafka.KafkaConsumer.KafkaFetchRecords.doRun`,
line 406:

```
catch (KafkaException e) {
    // some kind of error in kafka, it may happen during
    // unsubscribing or during normal processing
    if (unsubscribing) {
        getExceptionHandler().handleException("Error unsubscribing " + threadId + " from kafka topic " + topicName, e);
    } else {
        LOG.debug("KafkaException consuming {} from topic {} causedby {}. Will attempt to re-connect on next run", threadId, topicName, e.getMessage());
        reConnect = true;
    }
}
```

A `SerializationException` occurs, which extends `KafkaException`. This is definitely not normal processing, and logging at DEBUG level hides the true cause.

I guess one would have to narrow down the exception classes caught in that catch clause or, as a quick fix, explicitly catch `SerializationException`.

How should I proceed? I am not very familiar with Camel, or with open source contributions in general ^^.

Do I just open a ticket in some bug tracker, GitHub maybe? Do you want me to open a pull request?

I have been toying with Camel for a couple of weeks now, and I would like to introduce it in the projects I work on. But I am by no means a Camel guru, and this bug might be a showstopper, so I would like to help fix it.

Thank you

On Sun, 26 Apr 2020 at 20:56, Joseph M'BIMBI-BENE wrote:
> I forgot to mention that I am using version 3.2.0.
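The quick fix suggested above can be sketched as a catch clause split in two, so that `SerializationException` goes to the consumer's exception handler instead of triggering a reconnect. To keep it compilable without kafka-clients or Camel on the classpath, the exception classes and the `ExceptionHandler` interface are hypothetical stand-ins that only mirror the names of the real types, and `fetchOnce` with its `Runnable poll` argument is illustrative, not Camel's code.

```java
import java.util.ArrayList;
import java.util.List;

class NarrowedCatchSketch {

    // Hypothetical stand-ins mirroring org.apache.kafka.common.KafkaException
    // and org.apache.kafka.common.errors.SerializationException.
    static class KafkaException extends RuntimeException {
        KafkaException(String message) { super(message); }
    }

    static class SerializationException extends KafkaException {
        SerializationException(String message) { super(message); }
    }

    // Hypothetical stand-in for Camel's ExceptionHandler.
    interface ExceptionHandler {
        void handleException(String message, Throwable cause);
    }

    /**
     * Runs one poll attempt and returns true when the caller should reconnect.
     * Deserialization errors are surfaced to the handler and never retried;
     * any other KafkaException keeps the existing reconnect behaviour.
     */
    static boolean fetchOnce(Runnable poll, ExceptionHandler handler, String topicName) {
        try {
            poll.run();
            return false;
        } catch (SerializationException e) {
            // The subclass must be caught first: after catch (KafkaException e)
            // javac would reject this clause as already handled.
            handler.handleException("Error deserializing record from kafka topic " + topicName, e);
            return false;
        } catch (KafkaException e) {
            // Possibly transient: keep the re-connect on next run.
            return true;
        }
    }

    // Handler that just records what it receives, standing in for
    // logging or bridging to the route's error handler.
    static class RecordingHandler implements ExceptionHandler {
        final List<Throwable> seen = new ArrayList<>();

        @Override
        public void handleException(String message, Throwable cause) {
            seen.add(cause);
        }
    }
}
```

Handing the exception to the handler does not by itself move the consumer past the bad record: with the 2.3 Kafka client the next `poll()` would most likely fail on the same offset again, so a real fix would probably also need to seek past that record (or let the user decide to).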
Re: Kafka component error handling - consumer keeps leaving and rejoining the group
I forgot to mention that I am using version 3.2.0.

On Sun, 26 Apr 2020 at 20:45, Joseph M'BIMBI-BENE wrote:
> Hello everyone,
>
> I'm having a problem with the Kafka component:
> When the Kafka consumer can't read a message (which, after investigation,
> is caused by some Avro errors), it continuously leaves the group and joins again.
>
> I would like it to just throw an exception and let me decide how to handle
> it: DLQ, ignore, etc.
>
> I configured the parameter `bridgeErrorHandler`, but to no avail. The
> behaviour is still the same.
>
> Am I doing something wrong? Please help. Thank you
>
> --
>
> Here is the route definition:
>
> @Component
> public class CamelConfiguration extends RouteBuilder {
>
>     @Override
>     public void configure() throws Exception {
>         LocalDateTime now = LocalDateTime.now();
>         String kafkaCamelUri = String.format("kafka:cont_hist" +
>                 "?brokers={{bootstrap-servers}}" +
>                 "=http://localhost:8081" +
>                 "=true" +
>                 "=true" +
>                 "=%s" +
>                 "=%s",
>                 StringDeserializer.class.getName(),
>                 KafkaAvroDeserializer.class.getName());
>         from(kafkaCamelUri)
>                 .errorHandler(defaultErrorHandler().disableRedelivery())
>                 .to("log:coucou")
>                 .to("sql-stored:classpath:procstoc.sql" +
>                         "?outputHeader=outError"
>                 )
>                 .to("log:output")
>                 .log("coucou ${headers.outError}");
>     }
>
> }
>
> ---
>
> And here are some log excerpts:
>
> 2020-04-26 20:16:53.193 INFO 28096 --- [umer[cont_hist]] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1
> 2020-04-26 20:16:53.193 INFO 28096 --- [umer[cont_hist]] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 18a913733fb71c01
> 2020-04-26 20:16:53.193 INFO 28096 --- [umer[cont_hist]] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1587925013193
> 2020-04-26 20:16:53.194 INFO 28096 --- [umer[cont_hist]] o.a.camel.component.kafka.KafkaConsumer : Reconnecting cont_hist-Thread 0 to topic cont_hist after 5000 ms
> 2020-04-26 20:16:58.194 INFO 28096 --- [umer[cont_hist]] o.a.camel.component.kafka.KafkaConsumer : Subscribing cont_hist-Thread 0 to topic cont_hist
> 2020-04-26 20:16:58.194 INFO 28096 --- [umer[cont_hist]] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-10, groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Subscribed to topic(s): cont_hist
> 2020-04-26 20:16:58.208 INFO 28096 --- [umer[cont_hist]] org.apache.kafka.clients.Metadata: [Consumer clientId=consumer-10, groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Cluster ID: yyC1KuR2Sv2BVVRNLdTnsg
> 2020-04-26 20:16:58.209 INFO 28096 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-10, groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null)
> 2020-04-26 20:16:58.210 INFO 28096 --- [umer[cont_hist]] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-10, groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Revoking previously assigned partitions []
> 2020-04-26 20:16:58.211 INFO 28096 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-10, groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
> 2020-04-26 20:16:58.221 INFO 28096 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-10, groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
> 2020-04-26 20:16:58.229 INFO 28096 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-10, groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Successfully joined group with generation 19
> 2020-04-26 20:16:58.232 INFO 28096 --- [umer[cont_hist]] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-10, groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting newly assigned partitions: cont_hist-0
> 2020-04-26 20:16:58.236 INFO 28096 --- [umer[cont_hist]] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-10, groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting offset for partition cont_hist-0 to the committed offset FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null), epoch=0}}
> 2020-04-26 20:16:58.251 INFO 28096 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-10, groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Member consumer-10-187aed26-5575-4285-a567-3deca28e099c sending LeaveGroup request to coordinator localhost:9092 (id: 2147483646 rack: null)
> 2020-04-26 20:16:58.274 INFO 28096 ---
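The log excerpt shows the cycle: reconnect after 5000 ms, subscribe, rejoin the group, reset to the committed offset 4, then leave the group again. The simulation below sketches why such a loop cannot make progress, assuming (as the log suggests but does not prove) that the record at offset 4 is the one the Avro deserializer rejects: a failed read never commits past it, so every reconnect resumes at the same committed offset. Skipping the record, which is effectively what sending it to a dead letter queue would do, lets consumption continue. The partition contents, `maxAttempts` bound and all names here are hypothetical.

```java
import java.util.List;

class PoisonRecordSketch {

    // Hypothetical stand-in for a deserialization failure.
    static class SerializationException extends RuntimeException {
        SerializationException(String message) { super(message); }
    }

    // Result of a bounded consumption run.
    static final class Outcome {
        final long committedOffset;
        final int reconnects;
        final int skipped;

        Outcome(long committedOffset, int reconnects, int skipped) {
            this.committedOffset = committedOffset;
            this.reconnects = reconnects;
            this.skipped = skipped;
        }
    }

    // Stand-in deserializer: a null payload models bytes that cannot be decoded.
    static String deserialize(String payload) {
        if (payload == null) {
            throw new SerializationException("cannot deserialize");
        }
        return payload;
    }

    /**
     * Consumes the partition from startOffset. On a deserialization failure it
     * either reconnects (resuming from the last committed offset) or skips the
     * record and commits past it. maxAttempts bounds the number of reconnects.
     */
    static Outcome consume(List<String> partition, long startOffset, boolean skipPoison, int maxAttempts) {
        long committed = startOffset;
        int reconnects = 0;
        int skipped = 0;
        while (committed < partition.size() && reconnects < maxAttempts) {
            try {
                deserialize(partition.get((int) committed));
                committed++; // processed successfully: commit past it
            } catch (SerializationException e) {
                if (skipPoison) {
                    skipped++;
                    committed++; // hand the bad record off and move on
                } else {
                    reconnects++; // resubscribe: the next read starts at 'committed' again
                }
            }
        }
        return new Outcome(committed, reconnects, skipped);
    }
}
```

With a poison record at offset 4, the reconnecting variant exhausts every attempt while staying at offset 4, whereas the skipping variant reaches the end of the partition.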
Re: Kafka component error handling - consumer keeps leaving and rejoining the group
I also realize that the property "bridgeErrorHandler" seems to never be used, whereas another one, like "breakOnFirstError", is.

Also, going back to the exception handling, at least a couple of other subclasses of KafkaException should not be retried. Just a few examples:

- ConfigException: "Thrown if the user supplies an invalid configuration" -> a retry will not solve that
- OAuthBearerConfigException: "Exception thrown when there is a problem with the configuration (an invalid option in a JAAS config, for example)": this one seems to fall under the same category
- and obviously SerializationException

On Sun, 26 Apr 2020 at 21:55, Joseph M'BIMBI-BENE wrote:
> A `SerializationException` occurs, which extends `KafkaException`. This is
> definitely not normal processing, and logging at DEBUG level hides the true
> cause.
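The list above amounts to a denylist of exception types for which a reconnect cannot help, since the same configuration or the same bytes will fail again. A minimal sketch of such a check follows. The exception classes are hypothetical stand-ins that mirror the names of the Kafka types (in kafka-clients all three extend `KafkaException`), so the block compiles without Kafka on the classpath; `shouldReconnect` and `NON_RETRIABLE` are illustrative names.

```java
import java.util.List;

class RetryPolicySketch {

    // Hypothetical stand-ins mirroring the Kafka client hierarchy named above.
    static class KafkaException extends RuntimeException {
        KafkaException(String message) { super(message); }
    }

    static class ConfigException extends KafkaException {
        ConfigException(String message) { super(message); }
    }

    static class OAuthBearerConfigException extends KafkaException {
        OAuthBearerConfigException(String message) { super(message); }
    }

    static class SerializationException extends KafkaException {
        SerializationException(String message) { super(message); }
    }

    // Types for which a reconnect will fail in exactly the same way.
    static final List<Class<? extends KafkaException>> NON_RETRIABLE = List.of(
            ConfigException.class,
            OAuthBearerConfigException.class,
            SerializationException.class);

    // True unless e is an instance (including subclasses) of a listed type.
    static boolean shouldReconnect(KafkaException e) {
        for (Class<? extends KafkaException> type : NON_RETRIABLE) {
            if (type.isInstance(e)) {
                return false;
            }
        }
        return true;
    }
}
```

A denylist is easy to extend but still retries anything not listed. The Kafka client also ships a marker class, `org.apache.kafka.common.errors.RetriableException`, so an allowlist built on it is an alternative design worth weighing.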