Digging into the code (version 3.2.0, I repeat), I can see the following in `org.apache.camel.component.kafka.KafkaConsumer.KafkaFetchRecords.doRun`, line 406:
```
catch (KafkaException e) {
    // some kind of error in kafka, it may happen during
    // unsubscribing or during normal processing
    if (unsubscribing) {
        getExceptionHandler().handleException("Error unsubscribing " + threadId + " from kafka topic " + topicName, e);
    } else {
        LOG.debug("KafkaException consuming {} from topic {} causedby {}. Will attempt to re-connect on next run", threadId, topicName, e.getMessage());
        reConnect = true;
    }
}
```

A `SerializationException` occurs, which extends `KafkaException`. It is definitely not normal processing, and logging at debug level hides the true cause. I guess one would have to narrow down the exception classes caught in that catch clause or, as a quick fix, explicitly catch `SerializationException`.

How to proceed? I am not very familiar with Camel, or with open source contributions in general ^^. Do I just open a ticket in some bug tracker, GitHub maybe? Do you want me to open a pull request?

I have been toying with Camel for a couple of weeks now, and I would like to introduce it in the projects I work with. I am by no means a Camel guru, but this bug might be a showstopper, so I would like to help fix it.

Thank you

On Sun, 26 Apr 2020 at 20:56, Joseph M'BIMBI-BENE <joseph.mbi...@gmail.com> wrote:

> I forgot to mention I am using version 3.2.0
>
> On Sun, 26 Apr 2020 at 20:45, Joseph M'BIMBI-BENE <joseph.mbi...@gmail.com>
> wrote:
>
>> Hello everyone,
>>
>> I'm having a problem with the Kafka component:
>> When the kafka consumer can't read a message (caused by some avro errors
>> after investigation), it continuously leaves the group and joins again.
>>
>> I would like it to just throw an exception and let me decide how to
>> handle it: dlq, ignore, etc.
>>
>> I configured the parameter `bridgeErrorHandler` but to no avail. The
>> behaviour is still the same
>>
>> Am I doing something wrong? Please help.
>> Thank you
>>
>> ----------
>>
>> Here is the route definition:
>>
>> @Component
>> public class CamelConfiguration extends RouteBuilder {
>>
>>     @Override
>>     public void configure() throws Exception {
>>         LocalDateTime now = LocalDateTime.now();
>>         String kafkaCamelUri = String.format("kafka:cont_hist" +
>>                 "?brokers={{bootstrap-servers}}" +
>>                 "&schemaRegistryURL=http://localhost:8081" +
>>                 "&specificAvroReader=true" +
>>                 "&bridgeErrorHandler=true" +
>>                 "&keyDeserializer=%s" +
>>                 "&valueDeserializer=%s",
>>                 StringDeserializer.class.getName(),
>>                 KafkaAvroDeserializer.class.getName());
>>         from(kafkaCamelUri)
>>                 .errorHandler(defaultErrorHandler().disableRedelivery())
>>                 .to("log:coucou")
>>                 .to("sql-stored:classpath:procstoc.sql" +
>>                         "?outputHeader=outError"
>>                 )
>>                 .to("log:output")
>>                 .log("coucou ${headers.outError}");
>>     }
>>
>> }
>>
>> -----------
>>
>> And here are some log excerpts:
>>
>> 2020-04-26 20:16:53.193 INFO 28096 --- [umer[cont_hist]] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1
>> 2020-04-26 20:16:53.193 INFO 28096 --- [umer[cont_hist]] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 18a913733fb71c01
>> 2020-04-26 20:16:53.193 INFO 28096 --- [umer[cont_hist]] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1587925013193
>> 2020-04-26 20:16:53.194 INFO 28096 --- [umer[cont_hist]] o.a.camel.component.kafka.KafkaConsumer : Reconnecting cont_hist-Thread 0 to topic cont_hist after 5000 ms
>> 2020-04-26 20:16:58.194 INFO 28096 --- [umer[cont_hist]] o.a.camel.component.kafka.KafkaConsumer : Subscribing cont_hist-Thread 0 to topic cont_hist
>> 2020-04-26 20:16:58.194 INFO 28096 --- [umer[cont_hist]] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-10, groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Subscribed to topic(s): cont_hist
>> 2020-04-26 20:16:58.208 INFO 28096 --- [umer[cont_hist]] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-10, groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Cluster ID: yyC1KuR2Sv2BVVRNLdTnsg
>> 2020-04-26 20:16:58.209 INFO 28096 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-10, groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null)
>> 2020-04-26 20:16:58.210 INFO 28096 --- [umer[cont_hist]] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-10, groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Revoking previously assigned partitions []
>> 2020-04-26 20:16:58.211 INFO 28096 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-10, groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
>> 2020-04-26 20:16:58.221 INFO 28096 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-10, groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
>> 2020-04-26 20:16:58.229 INFO 28096 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-10, groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Successfully joined group with generation 19
>> 2020-04-26 20:16:58.232 INFO 28096 --- [umer[cont_hist]] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-10, groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting newly assigned partitions: cont_hist-0
>> 2020-04-26 20:16:58.236 INFO 28096 --- [umer[cont_hist]] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-10, groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting offset for partition cont_hist-0 to the committed offset FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null), epoch=0}}
>> 2020-04-26 20:16:58.251 INFO 28096 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-10, groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Member consumer-10-187aed26-5575-4285-a567-3deca28e099c sending LeaveGroup request to coordinator localhost:9092 (id: 2147483646 rack: null)
>> 2020-04-26 20:16:58.274 INFO 28096 --- [umer[cont_hist]] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
>> allow.auto.create.topics = true
>> auto.commit.interval.ms = 5000
>> auto.offset.reset = latest
>> bootstrap.servers = [http://localhost:9092]
>>
>> [...]
>>
>> 2020-04-26 20:16:58.293 WARN 28096 --- [umer[cont_hist]] o.a.k.clients.consumer.ConsumerConfig : The configuration 'sasl.kerberos.principal.to.local.rules' was supplied but isn't a known config.
>> 2020-04-26 20:16:58.294 INFO 28096 --- [umer[cont_hist]] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1
>> 2020-04-26 20:16:58.294 INFO 28096 --- [umer[cont_hist]] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 18a913733fb71c01
>> 2020-04-26 20:16:58.294 INFO 28096 --- [umer[cont_hist]] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1587925018294
>> 2020-04-26 20:16:58.294 INFO 28096 --- [umer[cont_hist]] o.a.camel.component.kafka.KafkaConsumer : Reconnecting cont_hist-Thread 0 to topic cont_hist after 5000 ms
>> 2020-04-26 20:17:03.295 INFO 28096 --- [umer[cont_hist]] o.a.camel.component.kafka.KafkaConsumer : Subscribing cont_hist-Thread 0 to topic cont_hist
>> 2020-04-26 20:17:03.295 INFO 28096 --- [umer[cont_hist]] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-11, groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Subscribed to topic(s): cont_hist
>> 2020-04-26 20:17:03.305 INFO 28096 --- [umer[cont_hist]] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-11, groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Cluster ID: yyC1KuR2Sv2BVVRNLdTnsg
>> 2020-04-26 20:17:03.305 INFO 28096 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-11, groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null)
>> 2020-04-26 20:17:03.306 INFO 28096 --- [umer[cont_hist]] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-11, groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Revoking previously assigned partitions []
>> 2020-04-26 20:17:03.306 INFO 28096 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-11, groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
>> 2020-04-26 20:17:03.312 INFO 28096 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-11, groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
>> 2020-04-26 20:17:03.319 INFO 28096 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-11, groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Successfully joined group with generation 21
>> 2020-04-26 20:17:03.320 INFO 28096 --- [umer[cont_hist]] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-11, groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting newly assigned partitions: cont_hist-0
>> 2020-04-26 20:17:03.324 INFO 28096 --- [umer[cont_hist]] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-11, groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting offset for partition cont_hist-0 to the committed offset FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null), epoch=0}}
>> 2020-04-26 20:17:03.347 INFO 28096 --- [umer[cont_hist]] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-11, groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Member consumer-11-936dbb66-cbce-4d6f-a740-b3764822db42 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483646 rack: null)
>> 2020-04-26 20:17:03.400 INFO 28096 --- [umer[cont_hist]] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
>> allow.auto.create.topics = true
>>
>
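
PS: to make the quick fix from the top of this message more concrete, here is a minimal sketch of what I have in mind. It is not the actual Camel code and I have not tried it against the Camel sources. `KafkaException` and `SerializationException` below are local stand-ins for the real Kafka classes so that the snippet compiles on its own, and `pollOnce`, `Outcome` and `handled` are names I made up purely for illustration. The only point is the ordering of the catch clauses: the more specific exception is caught first and handed to the exception handler, and everything else still triggers the reconnect.

```java
import java.util.ArrayList;
import java.util.List;

public class CatchOrderSketch {

    // Stand-in for org.apache.kafka.common.KafkaException (assumption: only the
    // class hierarchy matters for this sketch).
    public static class KafkaException extends RuntimeException {
        public KafkaException(String message) {
            super(message);
        }
    }

    // Stand-in for org.apache.kafka.common.errors.SerializationException,
    // which extends KafkaException in the real client library.
    public static class SerializationException extends KafkaException {
        public SerializationException(String message) {
            super(message);
        }
    }

    // Hypothetical result type, only here to make the behaviour observable.
    public enum Outcome { OK, HANDED_TO_EXCEPTION_HANDLER, RECONNECT }

    // Hypothetical replacement for getExceptionHandler().handleException(...):
    // it just records the messages that would be reported.
    public static final List<String> handled = new ArrayList<>();

    // Runs one "poll" and classifies what happened.
    public static Outcome pollOnce(Runnable poll) {
        try {
            poll.run();
            return Outcome.OK;
        } catch (SerializationException e) {
            // The more specific type must be caught before KafkaException.
            // A broken record is reported instead of silently reconnecting.
            handled.add("Error deserializing record: " + e.getMessage());
            return Outcome.HANDED_TO_EXCEPTION_HANDLER;
        } catch (KafkaException e) {
            // Any other Kafka error keeps the existing behaviour: reconnect.
            return Outcome.RECONNECT;
        }
    }

    public static void main(String[] args) {
        System.out.println(pollOnce(() -> { }));
        System.out.println(pollOnce(() -> {
            throw new SerializationException("bad avro payload");
        }));
        System.out.println(pollOnce(() -> {
            throw new KafkaException("broker unavailable");
        }));
        System.out.println(handled);
    }
}
```

Whether the real fix should also skip the offending offset, or leave that decision to the route's error handler, is something I cannot judge yet.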