digging into the code (version 3.2.0 i repeat),

i can see in the class
`org.apache.camel.component.kafka.KafkaConsumer.KafkaFetchRecords.doRun`,
lign 406:

```
catch (KafkaException e) {
  // some kind of error in kafka, it may happen during
  // unsubscribing or during normal processing
  if (unsubscribing) {
    getExceptionHandler().handleException("Error unsubscribing " + threadId
+ " from kafka topic " + topicName, e);
  } else {
    LOG.debug("KafkaException consuming {} from topic {} causedby {}. Will
attempt to re-connect on next run", threadId, topicName, e.getMessage());
    reConnect = true;
  }
}
```

A `SerializationException` occurs, which extends KafkaException.
It definitely is not normal processing. And logging with debug level hides
the true cause.
I guess one would have to narrow down the classes of exception to be caught
in that catch clause, or as a quickfix, explicitly catch the
serializationException.

How to proceed ?
I am not super familiar with Camel, and overall with open source
contributions ^^.

Do i just open a ticket in some bug tracker, github maybe ?
Do you want me to open a pull request ?

I am toying with camel for a couple of weeks now, i would like to introduce
it in the projects i work with. But i am by no mean a camel guru,

And this bug might be a showstopper, so i would like to help fix it

Thank you

On Sun, 26 Apr 2020 at 20:56, Joseph M'BIMBI-BENE <joseph.mbi...@gmail.com>
wrote:

> I  forgot to tell i am using version 3.2.0
>
> On Sun, 26 Apr 2020 at 20:45, Joseph M'BIMBI-BENE <joseph.mbi...@gmail.com>
> wrote:
>
>> Hello everyone,
>>
>> I'm having a problem with the Kafka component:
>> When the kafka consumer can't read a message (caused by some avro errors
>> after investigation), it continuously leaves the group and joins again.
>>
>> I would like it to just throw an exception and let me decide how to
>> handle it: dlq, ignore, etc.
>>
>> I configured the parameter  `bridgeErrorHandler` but ot no avail. The
>> behaviour is still the same
>>
>> Am i doing something wrong? Please help. Thank you
>>
>> ----------
>>
>> Here is the route definition :
>>
>> @Component
>> public class CamelConfiguration extends RouteBuilder {
>>
>>     @Override
>>     public void configure() throws Exception {
>>         LocalDateTime now = LocalDateTime.now();
>>         String kafkaCamelUri = String.format("kafka:cont_hist" +
>>                         "?brokers={{bootstrap-servers}}" +
>>                         "&schemaRegistryURL=http://localhost:8081"; +
>>                         "&specificAvroReader=true" +
>>                         "&bridgeErrorHandler=true" +
>>                         "&keyDeserializer=%s" +
>>                         "&valueDeserializer=%s",
>>                 StringDeserializer.class.getName(),
>>                 KafkaAvroDeserializer.class.getName());
>>         from(kafkaCamelUri)
>>                 .errorHandler(defaultErrorHandler().disableRedelivery())
>>                 .to("log:coucou")
>>                 .to("sql-stored:classpath:procstoc.sql" +
>>                         "?outputHeader=outError"
>>                 )
>>                 .to("log:output")
>>                 .log("coucou ${headers.outError}");
>>     }
>>
>> }
>>
>> -----------
>>
>> And here are some log excerpts :
>>
>> 2020-04-26 20:16:53.193  INFO 28096 --- [umer[cont_hist]]
>> o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.1
>> 2020-04-26 20:16:53.193  INFO 28096 --- [umer[cont_hist]]
>> o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 18a913733fb71c01
>> 2020-04-26 20:16:53.193  INFO 28096 --- [umer[cont_hist]]
>> o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1587925013193
>> 2020-04-26 20:16:53.194  INFO 28096 --- [umer[cont_hist]]
>> o.a.camel.component.kafka.KafkaConsumer  : Reconnecting cont_hist-Thread 0
>> to topic cont_hist after 5000 ms
>> 2020-04-26 20:16:58.194  INFO 28096 --- [umer[cont_hist]]
>> o.a.camel.component.kafka.KafkaConsumer  : Subscribing cont_hist-Thread 0
>> to topic cont_hist
>> 2020-04-26 20:16:58.194  INFO 28096 --- [umer[cont_hist]]
>> o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-10,
>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Subscribed to topic(s):
>> cont_hist
>> 2020-04-26 20:16:58.208  INFO 28096 --- [umer[cont_hist]]
>> org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-10,
>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Cluster ID:
>> yyC1KuR2Sv2BVVRNLdTnsg
>> 2020-04-26 20:16:58.209  INFO 28096 --- [umer[cont_hist]]
>> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10,
>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Discovered group coordinator
>> localhost:9092 (id: 2147483646 rack: null)
>> 2020-04-26 20:16:58.210  INFO 28096 --- [umer[cont_hist]]
>> o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-10,
>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Revoking previously assigned
>> partitions []
>> 2020-04-26 20:16:58.211  INFO 28096 --- [umer[cont_hist]]
>> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10,
>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
>> 2020-04-26 20:16:58.221  INFO 28096 --- [umer[cont_hist]]
>> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10,
>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
>> 2020-04-26 20:16:58.229  INFO 28096 --- [umer[cont_hist]]
>> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10,
>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Successfully joined group
>> with generation 19
>> 2020-04-26 20:16:58.232  INFO 28096 --- [umer[cont_hist]]
>> o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-10,
>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting newly assigned
>> partitions: cont_hist-0
>> 2020-04-26 20:16:58.236  INFO 28096 --- [umer[cont_hist]]
>> o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-10,
>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting offset for partition
>> cont_hist-0 to the committed offset FetchPosition{offset=4,
>> offsetEpoch=Optional.empty,
>> currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null),
>> epoch=0}}
>> 2020-04-26 20:16:58.251  INFO 28096 --- [umer[cont_hist]]
>> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10,
>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Member
>> consumer-10-187aed26-5575-4285-a567-3deca28e099c sending LeaveGroup request
>> to coordinator localhost:9092 (id: 2147483646 rack: null)
>> 2020-04-26 20:16:58.274  INFO 28096 --- [umer[cont_hist]]
>> o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values:
>> allow.auto.create.topics = true
>> auto.commit.interval.ms = 5000
>> auto.offset.reset = latest
>> bootstrap.servers = [http://localhost:9092]
>>
>> [...]
>>
>> 2020-04-26 20:16:58.293  WARN 28096 --- [umer[cont_hist]]
>> o.a.k.clients.consumer.ConsumerConfig    : The configuration
>> 'sasl.kerberos.principal.to.local.rules' was supplied but isn't a known
>> config.
>> 2020-04-26 20:16:58.294  INFO 28096 --- [umer[cont_hist]]
>> o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.1
>> 2020-04-26 20:16:58.294  INFO 28096 --- [umer[cont_hist]]
>> o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 18a913733fb71c01
>> 2020-04-26 20:16:58.294  INFO 28096 --- [umer[cont_hist]]
>> o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1587925018294
>> 2020-04-26 20:16:58.294  INFO 28096 --- [umer[cont_hist]]
>> o.a.camel.component.kafka.KafkaConsumer  : Reconnecting cont_hist-Thread 0
>> to topic cont_hist after 5000 ms
>> 2020-04-26 20:17:03.295  INFO 28096 --- [umer[cont_hist]]
>> o.a.camel.component.kafka.KafkaConsumer  : Subscribing cont_hist-Thread 0
>> to topic cont_hist
>> 2020-04-26 20:17:03.295  INFO 28096 --- [umer[cont_hist]]
>> o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-11,
>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Subscribed to topic(s):
>> cont_hist
>> 2020-04-26 20:17:03.305  INFO 28096 --- [umer[cont_hist]]
>> org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-11,
>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Cluster ID:
>> yyC1KuR2Sv2BVVRNLdTnsg
>> 2020-04-26 20:17:03.305  INFO 28096 --- [umer[cont_hist]]
>> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-11,
>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Discovered group coordinator
>> localhost:9092 (id: 2147483646 rack: null)
>> 2020-04-26 20:17:03.306  INFO 28096 --- [umer[cont_hist]]
>> o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-11,
>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Revoking previously assigned
>> partitions []
>> 2020-04-26 20:17:03.306  INFO 28096 --- [umer[cont_hist]]
>> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-11,
>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
>> 2020-04-26 20:17:03.312  INFO 28096 --- [umer[cont_hist]]
>> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-11,
>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
>> 2020-04-26 20:17:03.319  INFO 28096 --- [umer[cont_hist]]
>> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-11,
>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Successfully joined group
>> with generation 21
>> 2020-04-26 20:17:03.320  INFO 28096 --- [umer[cont_hist]]
>> o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-11,
>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting newly assigned
>> partitions: cont_hist-0
>> 2020-04-26 20:17:03.324  INFO 28096 --- [umer[cont_hist]]
>> o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-11,
>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting offset for partition
>> cont_hist-0 to the committed offset FetchPosition{offset=4,
>> offsetEpoch=Optional.empty,
>> currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null),
>> epoch=0}}
>> 2020-04-26 20:17:03.347  INFO 28096 --- [umer[cont_hist]]
>> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-11,
>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Member
>> consumer-11-936dbb66-cbce-4d6f-a740-b3764822db42 sending LeaveGroup request
>> to coordinator localhost:9092 (id: 2147483646 rack: null)
>> 2020-04-26 20:17:03.400  INFO 28096 --- [umer[cont_hist]]
>> o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values:
>> allow.auto.create.topics = true
>>
>

Reply via email to