I also realize that the property "bridgeErrorHandler" seems to never be used, while another one, like "breakOnFirstError".
Also, going back to the exception handling, at least a couple other subclasses of KafkaException would deserve not to be retried on. Just a few examples: - ConfigException: "Thrown if the user supplies an invalid configuration" -> a retry will not solve that - OAuthBearerConfigException: "Exception thrown when there is a problem with the configuration (an invalid option in a JAAS config, for example)" : this one seems to fall under the same category - and obviously the serializationException On Sun, 26 Apr 2020 at 21:55, Joseph M'BIMBI-BENE <joseph.mbi...@gmail.com> wrote: > digging into the code (version 3.2.0 i repeat), > > i can see in the class > `org.apache.camel.component.kafka.KafkaConsumer.KafkaFetchRecords.doRun`, > lign 406: > > ``` > catch (KafkaException e) { > // some kind of error in kafka, it may happen during > // unsubscribing or during normal processing > if (unsubscribing) { > getExceptionHandler().handleException("Error unsubscribing " + > threadId + " from kafka topic " + topicName, e); > } else { > LOG.debug("KafkaException consuming {} from topic {} causedby {}. Will > attempt to re-connect on next run", threadId, topicName, e.getMessage()); > reConnect = true; > } > } > ``` > > A `SerializationException` occurs, which extends KafkaException. > It definitely is not normal processing. And logging with debug level hides > the true cause. > I guess one would have to narrow down the classes of exception to be > caught in that catch clause, or as a quickfix, explicitly catch the > serializationException. > > How to proceed ? > I am not super familiar with Camel, and overall with open source > contributions ^^. > > Do i just open a ticket in some bug tracker, github maybe ? > Do you want me to open a pull request ? > > I am toying with camel for a couple of weeks now, i would like to > introduce it in the projects i work with. But i am by no mean a camel guru, > > And this bug might be a showstopper, so i would like to help fix it > > Thank you > > On Sun, 26 Apr 2020 at 20:56, Joseph M'BIMBI-BENE <joseph.mbi...@gmail.com> > wrote: > >> I forgot to tell i am using version 3.2.0 >> >> On Sun, 26 Apr 2020 at 20:45, Joseph M'BIMBI-BENE < >> joseph.mbi...@gmail.com> wrote: >> >>> Hello everyone, >>> >>> I'm having a problem with the Kafka component: >>> When the kafka consumer can't read a message (caused by some avro errors >>> after investigation), it continuously leaves the group and joins again. >>> >>> I would like it to just throw an exception and let me decide how to >>> handle it: dlq, ignore, etc. >>> >>> I configured the parameter `bridgeErrorHandler` but ot no avail. The >>> behaviour is still the same >>> >>> Am i doing something wrong? Please help. Thank you >>> >>> ---------- >>> >>> Here is the route definition : >>> >>> @Component >>> public class CamelConfiguration extends RouteBuilder { >>> >>> @Override >>> public void configure() throws Exception { >>> LocalDateTime now = LocalDateTime.now(); >>> String kafkaCamelUri = String.format("kafka:cont_hist" + >>> "?brokers={{bootstrap-servers}}" + >>> "&schemaRegistryURL=http://localhost:8081" + >>> "&specificAvroReader=true" + >>> "&bridgeErrorHandler=true" + >>> "&keyDeserializer=%s" + >>> "&valueDeserializer=%s", >>> StringDeserializer.class.getName(), >>> KafkaAvroDeserializer.class.getName()); >>> from(kafkaCamelUri) >>> .errorHandler(defaultErrorHandler().disableRedelivery()) >>> .to("log:coucou") >>> .to("sql-stored:classpath:procstoc.sql" + >>> "?outputHeader=outError" >>> ) >>> .to("log:output") >>> .log("coucou ${headers.outError}"); >>> } >>> >>> } >>> >>> ----------- >>> >>> And here are some log excerpts : >>> >>> 2020-04-26 20:16:53.193 INFO 28096 --- [umer[cont_hist]] >>> o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1 >>> 2020-04-26 20:16:53.193 INFO 28096 --- [umer[cont_hist]] >>> o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 18a913733fb71c01 >>> 2020-04-26 20:16:53.193 INFO 28096 --- [umer[cont_hist]] >>> o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1587925013193 >>> 2020-04-26 20:16:53.194 INFO 28096 --- [umer[cont_hist]] >>> o.a.camel.component.kafka.KafkaConsumer : Reconnecting cont_hist-Thread 0 >>> to topic cont_hist after 5000 ms >>> 2020-04-26 20:16:58.194 INFO 28096 --- [umer[cont_hist]] >>> o.a.camel.component.kafka.KafkaConsumer : Subscribing cont_hist-Thread 0 >>> to topic cont_hist >>> 2020-04-26 20:16:58.194 INFO 28096 --- [umer[cont_hist]] >>> o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-10, >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Subscribed to topic(s): >>> cont_hist >>> 2020-04-26 20:16:58.208 INFO 28096 --- [umer[cont_hist]] >>> org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-10, >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Cluster ID: >>> yyC1KuR2Sv2BVVRNLdTnsg >>> 2020-04-26 20:16:58.209 INFO 28096 --- [umer[cont_hist]] >>> o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-10, >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Discovered group coordinator >>> localhost:9092 (id: 2147483646 rack: null) >>> 2020-04-26 20:16:58.210 INFO 28096 --- [umer[cont_hist]] >>> o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-10, >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Revoking previously assigned >>> partitions [] >>> 2020-04-26 20:16:58.211 INFO 28096 --- [umer[cont_hist]] >>> o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-10, >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group >>> 2020-04-26 20:16:58.221 INFO 28096 --- [umer[cont_hist]] >>> o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-10, >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group >>> 2020-04-26 20:16:58.229 INFO 28096 --- [umer[cont_hist]] >>> o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-10, >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Successfully joined group >>> with generation 19 >>> 2020-04-26 20:16:58.232 INFO 28096 --- [umer[cont_hist]] >>> o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-10, >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting newly assigned >>> partitions: cont_hist-0 >>> 2020-04-26 20:16:58.236 INFO 28096 --- [umer[cont_hist]] >>> o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-10, >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting offset for partition >>> cont_hist-0 to the committed offset FetchPosition{offset=4, >>> offsetEpoch=Optional.empty, >>> currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null), >>> epoch=0}} >>> 2020-04-26 20:16:58.251 INFO 28096 --- [umer[cont_hist]] >>> o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-10, >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Member >>> consumer-10-187aed26-5575-4285-a567-3deca28e099c sending LeaveGroup request >>> to coordinator localhost:9092 (id: 2147483646 rack: null) >>> 2020-04-26 20:16:58.274 INFO 28096 --- [umer[cont_hist]] >>> o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: >>> allow.auto.create.topics = true >>> auto.commit.interval.ms = 5000 >>> auto.offset.reset = latest >>> bootstrap.servers = [http://localhost:9092] >>> >>> [...] >>> >>> 2020-04-26 20:16:58.293 WARN 28096 --- [umer[cont_hist]] >>> o.a.k.clients.consumer.ConsumerConfig : The configuration >>> 'sasl.kerberos.principal.to.local.rules' was supplied but isn't a known >>> config. >>> 2020-04-26 20:16:58.294 INFO 28096 --- [umer[cont_hist]] >>> o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1 >>> 2020-04-26 20:16:58.294 INFO 28096 --- [umer[cont_hist]] >>> o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 18a913733fb71c01 >>> 2020-04-26 20:16:58.294 INFO 28096 --- [umer[cont_hist]] >>> o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1587925018294 >>> 2020-04-26 20:16:58.294 INFO 28096 --- [umer[cont_hist]] >>> o.a.camel.component.kafka.KafkaConsumer : Reconnecting cont_hist-Thread 0 >>> to topic cont_hist after 5000 ms >>> 2020-04-26 20:17:03.295 INFO 28096 --- [umer[cont_hist]] >>> o.a.camel.component.kafka.KafkaConsumer : Subscribing cont_hist-Thread 0 >>> to topic cont_hist >>> 2020-04-26 20:17:03.295 INFO 28096 --- [umer[cont_hist]] >>> o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-11, >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Subscribed to topic(s): >>> cont_hist >>> 2020-04-26 20:17:03.305 INFO 28096 --- [umer[cont_hist]] >>> org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-11, >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Cluster ID: >>> yyC1KuR2Sv2BVVRNLdTnsg >>> 2020-04-26 20:17:03.305 INFO 28096 --- [umer[cont_hist]] >>> o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-11, >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Discovered group coordinator >>> localhost:9092 (id: 2147483646 rack: null) >>> 2020-04-26 20:17:03.306 INFO 28096 --- [umer[cont_hist]] >>> o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-11, >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Revoking previously assigned >>> partitions [] >>> 2020-04-26 20:17:03.306 INFO 28096 --- [umer[cont_hist]] >>> o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-11, >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group >>> 2020-04-26 20:17:03.312 INFO 28096 --- [umer[cont_hist]] >>> o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-11, >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group >>> 2020-04-26 20:17:03.319 INFO 28096 --- [umer[cont_hist]] >>> o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-11, >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Successfully joined group >>> with generation 21 >>> 2020-04-26 20:17:03.320 INFO 28096 --- [umer[cont_hist]] >>> o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-11, >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting newly assigned >>> partitions: cont_hist-0 >>> 2020-04-26 20:17:03.324 INFO 28096 --- [umer[cont_hist]] >>> o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-11, >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting offset for partition >>> cont_hist-0 to the committed offset FetchPosition{offset=4, >>> offsetEpoch=Optional.empty, >>> currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null), >>> epoch=0}} >>> 2020-04-26 20:17:03.347 INFO 28096 --- [umer[cont_hist]] >>> o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-11, >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Member >>> consumer-11-936dbb66-cbce-4d6f-a740-b3764822db42 sending LeaveGroup request >>> to coordinator localhost:9092 (id: 2147483646 rack: null) >>> 2020-04-26 20:17:03.400 INFO 28096 --- [umer[cont_hist]] >>> o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: >>> allow.auto.create.topics = true >>> >>