I also realize that the property "bridgeErrorHandler" seems to never be
used, while another one, like "breakOnFirstError".

Also, going back to the exception handling, at least a couple other
subclasses of KafkaException would deserve not to be retried on. Just a few
examples:


   - ConfigException: "Thrown if the user supplies an invalid
   configuration" -> a retry will not solve that
   - OAuthBearerConfigException: "Exception thrown when there is a problem
   with the configuration (an invalid option in a JAAS config, for example)" :
   this one seems to fall under the same category
   - and obviously the serializationException



On Sun, 26 Apr 2020 at 21:55, Joseph M'BIMBI-BENE <joseph.mbi...@gmail.com>
wrote:

> digging into the code (version 3.2.0 i repeat),
>
> i can see in the class
> `org.apache.camel.component.kafka.KafkaConsumer.KafkaFetchRecords.doRun`,
> lign 406:
>
> ```
> catch (KafkaException e) {
>   // some kind of error in kafka, it may happen during
>   // unsubscribing or during normal processing
>   if (unsubscribing) {
>     getExceptionHandler().handleException("Error unsubscribing " +
> threadId + " from kafka topic " + topicName, e);
>   } else {
>     LOG.debug("KafkaException consuming {} from topic {} causedby {}. Will
> attempt to re-connect on next run", threadId, topicName, e.getMessage());
>     reConnect = true;
>   }
> }
> ```
>
> A `SerializationException` occurs, which extends KafkaException.
> It definitely is not normal processing. And logging with debug level hides
> the true cause.
> I guess one would have to narrow down the classes of exception to be
> caught in that catch clause, or as a quickfix, explicitly catch the
> serializationException.
>
> How to proceed ?
> I am not super familiar with Camel, and overall with open source
> contributions ^^.
>
> Do i just open a ticket in some bug tracker, github maybe ?
> Do you want me to open a pull request ?
>
> I am toying with camel for a couple of weeks now, i would like to
> introduce it in the projects i work with. But i am by no mean a camel guru,
>
> And this bug might be a showstopper, so i would like to help fix it
>
> Thank you
>
> On Sun, 26 Apr 2020 at 20:56, Joseph M'BIMBI-BENE <joseph.mbi...@gmail.com>
> wrote:
>
>> I  forgot to tell i am using version 3.2.0
>>
>> On Sun, 26 Apr 2020 at 20:45, Joseph M'BIMBI-BENE <
>> joseph.mbi...@gmail.com> wrote:
>>
>>> Hello everyone,
>>>
>>> I'm having a problem with the Kafka component:
>>> When the kafka consumer can't read a message (caused by some avro errors
>>> after investigation), it continuously leaves the group and joins again.
>>>
>>> I would like it to just throw an exception and let me decide how to
>>> handle it: dlq, ignore, etc.
>>>
>>> I configured the parameter  `bridgeErrorHandler` but ot no avail. The
>>> behaviour is still the same
>>>
>>> Am i doing something wrong? Please help. Thank you
>>>
>>> ----------
>>>
>>> Here is the route definition :
>>>
>>> @Component
>>> public class CamelConfiguration extends RouteBuilder {
>>>
>>>     @Override
>>>     public void configure() throws Exception {
>>>         LocalDateTime now = LocalDateTime.now();
>>>         String kafkaCamelUri = String.format("kafka:cont_hist" +
>>>                         "?brokers={{bootstrap-servers}}" +
>>>                         "&schemaRegistryURL=http://localhost:8081"; +
>>>                         "&specificAvroReader=true" +
>>>                         "&bridgeErrorHandler=true" +
>>>                         "&keyDeserializer=%s" +
>>>                         "&valueDeserializer=%s",
>>>                 StringDeserializer.class.getName(),
>>>                 KafkaAvroDeserializer.class.getName());
>>>         from(kafkaCamelUri)
>>>                 .errorHandler(defaultErrorHandler().disableRedelivery())
>>>                 .to("log:coucou")
>>>                 .to("sql-stored:classpath:procstoc.sql" +
>>>                         "?outputHeader=outError"
>>>                 )
>>>                 .to("log:output")
>>>                 .log("coucou ${headers.outError}");
>>>     }
>>>
>>> }
>>>
>>> -----------
>>>
>>> And here are some log excerpts :
>>>
>>> 2020-04-26 20:16:53.193  INFO 28096 --- [umer[cont_hist]]
>>> o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.1
>>> 2020-04-26 20:16:53.193  INFO 28096 --- [umer[cont_hist]]
>>> o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 18a913733fb71c01
>>> 2020-04-26 20:16:53.193  INFO 28096 --- [umer[cont_hist]]
>>> o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1587925013193
>>> 2020-04-26 20:16:53.194  INFO 28096 --- [umer[cont_hist]]
>>> o.a.camel.component.kafka.KafkaConsumer  : Reconnecting cont_hist-Thread 0
>>> to topic cont_hist after 5000 ms
>>> 2020-04-26 20:16:58.194  INFO 28096 --- [umer[cont_hist]]
>>> o.a.camel.component.kafka.KafkaConsumer  : Subscribing cont_hist-Thread 0
>>> to topic cont_hist
>>> 2020-04-26 20:16:58.194  INFO 28096 --- [umer[cont_hist]]
>>> o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-10,
>>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Subscribed to topic(s):
>>> cont_hist
>>> 2020-04-26 20:16:58.208  INFO 28096 --- [umer[cont_hist]]
>>> org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-10,
>>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Cluster ID:
>>> yyC1KuR2Sv2BVVRNLdTnsg
>>> 2020-04-26 20:16:58.209  INFO 28096 --- [umer[cont_hist]]
>>> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10,
>>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Discovered group coordinator
>>> localhost:9092 (id: 2147483646 rack: null)
>>> 2020-04-26 20:16:58.210  INFO 28096 --- [umer[cont_hist]]
>>> o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-10,
>>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Revoking previously assigned
>>> partitions []
>>> 2020-04-26 20:16:58.211  INFO 28096 --- [umer[cont_hist]]
>>> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10,
>>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
>>> 2020-04-26 20:16:58.221  INFO 28096 --- [umer[cont_hist]]
>>> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10,
>>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
>>> 2020-04-26 20:16:58.229  INFO 28096 --- [umer[cont_hist]]
>>> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10,
>>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Successfully joined group
>>> with generation 19
>>> 2020-04-26 20:16:58.232  INFO 28096 --- [umer[cont_hist]]
>>> o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-10,
>>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting newly assigned
>>> partitions: cont_hist-0
>>> 2020-04-26 20:16:58.236  INFO 28096 --- [umer[cont_hist]]
>>> o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-10,
>>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting offset for partition
>>> cont_hist-0 to the committed offset FetchPosition{offset=4,
>>> offsetEpoch=Optional.empty,
>>> currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null),
>>> epoch=0}}
>>> 2020-04-26 20:16:58.251  INFO 28096 --- [umer[cont_hist]]
>>> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10,
>>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Member
>>> consumer-10-187aed26-5575-4285-a567-3deca28e099c sending LeaveGroup request
>>> to coordinator localhost:9092 (id: 2147483646 rack: null)
>>> 2020-04-26 20:16:58.274  INFO 28096 --- [umer[cont_hist]]
>>> o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values:
>>> allow.auto.create.topics = true
>>> auto.commit.interval.ms = 5000
>>> auto.offset.reset = latest
>>> bootstrap.servers = [http://localhost:9092]
>>>
>>> [...]
>>>
>>> 2020-04-26 20:16:58.293  WARN 28096 --- [umer[cont_hist]]
>>> o.a.k.clients.consumer.ConsumerConfig    : The configuration
>>> 'sasl.kerberos.principal.to.local.rules' was supplied but isn't a known
>>> config.
>>> 2020-04-26 20:16:58.294  INFO 28096 --- [umer[cont_hist]]
>>> o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.1
>>> 2020-04-26 20:16:58.294  INFO 28096 --- [umer[cont_hist]]
>>> o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 18a913733fb71c01
>>> 2020-04-26 20:16:58.294  INFO 28096 --- [umer[cont_hist]]
>>> o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1587925018294
>>> 2020-04-26 20:16:58.294  INFO 28096 --- [umer[cont_hist]]
>>> o.a.camel.component.kafka.KafkaConsumer  : Reconnecting cont_hist-Thread 0
>>> to topic cont_hist after 5000 ms
>>> 2020-04-26 20:17:03.295  INFO 28096 --- [umer[cont_hist]]
>>> o.a.camel.component.kafka.KafkaConsumer  : Subscribing cont_hist-Thread 0
>>> to topic cont_hist
>>> 2020-04-26 20:17:03.295  INFO 28096 --- [umer[cont_hist]]
>>> o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-11,
>>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Subscribed to topic(s):
>>> cont_hist
>>> 2020-04-26 20:17:03.305  INFO 28096 --- [umer[cont_hist]]
>>> org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-11,
>>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Cluster ID:
>>> yyC1KuR2Sv2BVVRNLdTnsg
>>> 2020-04-26 20:17:03.305  INFO 28096 --- [umer[cont_hist]]
>>> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-11,
>>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Discovered group coordinator
>>> localhost:9092 (id: 2147483646 rack: null)
>>> 2020-04-26 20:17:03.306  INFO 28096 --- [umer[cont_hist]]
>>> o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-11,
>>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Revoking previously assigned
>>> partitions []
>>> 2020-04-26 20:17:03.306  INFO 28096 --- [umer[cont_hist]]
>>> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-11,
>>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
>>> 2020-04-26 20:17:03.312  INFO 28096 --- [umer[cont_hist]]
>>> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-11,
>>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
>>> 2020-04-26 20:17:03.319  INFO 28096 --- [umer[cont_hist]]
>>> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-11,
>>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Successfully joined group
>>> with generation 21
>>> 2020-04-26 20:17:03.320  INFO 28096 --- [umer[cont_hist]]
>>> o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-11,
>>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting newly assigned
>>> partitions: cont_hist-0
>>> 2020-04-26 20:17:03.324  INFO 28096 --- [umer[cont_hist]]
>>> o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-11,
>>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting offset for partition
>>> cont_hist-0 to the committed offset FetchPosition{offset=4,
>>> offsetEpoch=Optional.empty,
>>> currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null),
>>> epoch=0}}
>>> 2020-04-26 20:17:03.347  INFO 28096 --- [umer[cont_hist]]
>>> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-11,
>>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Member
>>> consumer-11-936dbb66-cbce-4d6f-a740-b3764822db42 sending LeaveGroup request
>>> to coordinator localhost:9092 (id: 2147483646 rack: null)
>>> 2020-04-26 20:17:03.400  INFO 28096 --- [umer[cont_hist]]
>>> o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values:
>>> allow.auto.create.topics = true
>>>
>>

Reply via email to