Re: Kafka component error handling - consumer keeps leaving and rejoining the group

2020-04-27 Thread Claus Ibsen
Hi

Thanks for all your findings; this is great insight. You are very
welcome to create a JIRA ticket about this bug.
We can then work on a fix together, and you can help test it.

Yeah, it seems catching KafkaException is maybe too broad. And let's see
if we can also incorporate the bridge error handler.

On Mon, Apr 27, 2020 at 10:06 AM Joseph M'BIMBI-BENE
 wrote:
>
> I also realize that the property "bridgeErrorHandler" never seems to be
> used, while other ones, like "breakOnFirstError", are.
>
> Also, going back to the exception handling, at least a couple of other
> subclasses of KafkaException should not be retried. A few examples:
>
>    - ConfigException: "Thrown if the user supplies an invalid
>    configuration" -> a retry will not solve that
>    - OAuthBearerConfigException: "Exception thrown when there is a problem
>    with the configuration (an invalid option in a JAAS config, for example)":
>    this one seems to fall under the same category
>    - and obviously SerializationException

Re: Kafka component error handling - consumer keeps leaving and rejoining the group

2020-04-27 Thread Joseph M'BIMBI-BENE
Digging into the code (version 3.2.0, I repeat), I can see the following in
the method
`org.apache.camel.component.kafka.KafkaConsumer.KafkaFetchRecords.doRun`,
line 406:

```
catch (KafkaException e) {
  // some kind of error in kafka, it may happen during
  // unsubscribing or during normal processing
  if (unsubscribing) {
    getExceptionHandler().handleException("Error unsubscribing " + threadId
        + " from kafka topic " + topicName, e);
  } else {
    LOG.debug("KafkaException consuming {} from topic {} causedby {}. Will attempt to re-connect on next run",
        threadId, topicName, e.getMessage());
    reConnect = true;
  }
}
```

A `SerializationException` is thrown, and since it extends `KafkaException`,
it ends up in this catch clause. This is definitely not normal processing,
and logging at DEBUG level hides the true cause.
I guess one would have to narrow down the exception classes handled there
or, as a quick fix, explicitly catch `SerializationException`.
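To make the proposed quick fix concrete, here is a minimal sketch. It does not use the real kafka-clients classes: `StubKafkaException` and `StubSerializationException` are hypothetical stand-ins that only reproduce the inheritance relationship described above, and `NarrowedCatchSketch`, `pollOnce` and `handledErrors` are illustrative names, not Camel code. Because Java uses the first matching catch clause, catching the subclass before its superclass keeps deserialization failures out of the reconnect path.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class NarrowedCatchSketch {
    // Hypothetical stand-ins that only mirror the real hierarchy
    // (SerializationException extends KafkaException in kafka-clients).
    static class StubKafkaException extends RuntimeException {
        StubKafkaException(String message) { super(message); }
    }

    static class StubSerializationException extends StubKafkaException {
        StubSerializationException(String message) { super(message); }
    }

    // Errors passed on to an exception handler instead of only being logged at DEBUG level.
    final List<Throwable> handledErrors = new ArrayList<>();
    // Plays the role of the reConnect flag in the catch block quoted above.
    boolean reconnect = false;

    void pollOnce(Supplier<Object> poll) {
        try {
            poll.get();
        } catch (StubSerializationException e) {
            // The subclass must come first: javac rejects it if placed after its superclass.
            handledErrors.add(e);
        } catch (StubKafkaException e) {
            // Any other Kafka error keeps the current behaviour: re-connect on the next run.
            reconnect = true;
        }
    }

    public static void main(String[] args) {
        NarrowedCatchSketch loop = new NarrowedCatchSketch();
        loop.pollOnce(() -> { throw new StubSerializationException("bad Avro payload"); });
        System.out.println("handled=" + loop.handledErrors.size() + " reconnect=" + loop.reconnect); // handled=1 reconnect=false
        loop.pollOnce(() -> { throw new StubKafkaException("broker went away"); });
        System.out.println("handled=" + loop.handledErrors.size() + " reconnect=" + loop.reconnect); // handled=1 reconnect=true
    }
}
```

In a real fix the first branch would presumably call the consumer's exception handler, which is where a bridged error handler could take over.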

How should I proceed?
I am not very familiar with Camel, or with open source contributions in
general ^^.

Do I just open a ticket in some bug tracker, GitHub maybe?
Do you want me to open a pull request?

I have been toying with Camel for a couple of weeks now, and I would like to
introduce it in the projects I work with. But I am by no means a Camel guru,
and this bug might be a showstopper, so I would like to help fix it.

Thank you

On Sun, 26 Apr 2020 at 20:56, Joseph M'BIMBI-BENE 
wrote:

> I forgot to mention that I am using version 3.2.0.

Re: Kafka component error handling - consumer keeps leaving and rejoining the group

2020-04-27 Thread Joseph M'BIMBI-BENE
I forgot to mention that I am using version 3.2.0.

On Sun, 26 Apr 2020 at 20:45, Joseph M'BIMBI-BENE 
wrote:

> Hello everyone,
>
> I'm having a problem with the Kafka component:
> When the Kafka consumer can't read a message (because of some Avro errors,
> as I found after investigating), it continuously leaves the group and
> rejoins it.
>
> I would like it to just throw an exception and let me decide how to handle
> it: send it to a DLQ, ignore it, etc.
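The behaviour asked for here can be sketched without Kafka or Camel at all. In this hypothetical example (`PoisonRecordPolicy`, `OnError` and the in-memory dead-letter list are illustrative names, not Camel options, and the decoder stands in for Avro deserialization), a record that fails to decode is either sent to a dead-letter list or skipped, and in both cases processing moves on to the next record instead of retrying the same one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class PoisonRecordPolicy {
    // What to do with a record that cannot be decoded (hypothetical options).
    enum OnError { DEAD_LETTER, SKIP }

    final List<String> processed = new ArrayList<>();
    final List<String> deadLetters = new ArrayList<>();

    void consume(List<String> rawRecords, Function<String, String> decoder, OnError onError) {
        for (String raw : rawRecords) {
            try {
                processed.add(decoder.apply(raw));
            } catch (RuntimeException e) {
                if (onError == OnError.DEAD_LETTER) {
                    deadLetters.add(raw); // keep the raw payload for later inspection
                }
                // In both cases the loop moves on instead of re-reading the same record.
            }
        }
    }

    public static void main(String[] args) {
        // Hypothetical decoder: only payloads prefixed with "ok:" are decodable.
        Function<String, String> decoder = raw -> {
            if (!raw.startsWith("ok:")) {
                throw new IllegalArgumentException("cannot decode " + raw);
            }
            return raw.substring(3);
        };
        PoisonRecordPolicy policy = new PoisonRecordPolicy();
        policy.consume(List.of("ok:a", "garbage", "ok:b"), decoder, OnError.DEAD_LETTER);
        System.out.println(policy.processed + " " + policy.deadLetters); // [a, b] [garbage]
    }
}
```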
>
> I configured the parameter `bridgeErrorHandler`, but to no avail. The
> behaviour is still the same.
>
> Am I doing something wrong? Please help. Thank you
>
> --
>
> Here is the route definition:
>
> @Component
> public class CamelConfiguration extends RouteBuilder {
>
>     @Override
>     public void configure() throws Exception {
>         LocalDateTime now = LocalDateTime.now();
>         String kafkaCamelUri = String.format("kafka:cont_hist" +
>                 "?brokers={{bootstrap-servers}}" +
>                 "=http://localhost:8081" +
>                 "=true" +
>                 "=true" +
>                 "=%s" +
>                 "=%s",
>                 StringDeserializer.class.getName(),
>                 KafkaAvroDeserializer.class.getName());
>         from(kafkaCamelUri)
>             .errorHandler(defaultErrorHandler().disableRedelivery())
>             .to("log:coucou")
>             .to("sql-stored:classpath:procstoc.sql" +
>                 "?outputHeader=outError")
>             .to("log:output")
>             .log("coucou ${headers.outError}");
>     }
>
> }
>
> ---
>
> And here are some log excerpts:
>
> 2020-04-26 20:16:53.193  INFO 28096 --- [umer[cont_hist]]
> o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1
> 2020-04-26 20:16:53.193  INFO 28096 --- [umer[cont_hist]]
> o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 18a913733fb71c01
> 2020-04-26 20:16:53.193  INFO 28096 --- [umer[cont_hist]]
> o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1587925013193
> 2020-04-26 20:16:53.194  INFO 28096 --- [umer[cont_hist]]
> o.a.camel.component.kafka.KafkaConsumer  : Reconnecting cont_hist-Thread 0
> to topic cont_hist after 5000 ms
> 2020-04-26 20:16:58.194  INFO 28096 --- [umer[cont_hist]]
> o.a.camel.component.kafka.KafkaConsumer  : Subscribing cont_hist-Thread 0
> to topic cont_hist
> 2020-04-26 20:16:58.194  INFO 28096 --- [umer[cont_hist]]
> o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-10,
> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Subscribed to topic(s):
> cont_hist
> 2020-04-26 20:16:58.208  INFO 28096 --- [umer[cont_hist]]
> org.apache.kafka.clients.Metadata: [Consumer clientId=consumer-10,
> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Cluster ID:
> yyC1KuR2Sv2BVVRNLdTnsg
> 2020-04-26 20:16:58.209  INFO 28096 --- [umer[cont_hist]]
> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10,
> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Discovered group coordinator
> localhost:9092 (id: 2147483646 rack: null)
> 2020-04-26 20:16:58.210  INFO 28096 --- [umer[cont_hist]]
> o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-10,
> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Revoking previously assigned
> partitions []
> 2020-04-26 20:16:58.211  INFO 28096 --- [umer[cont_hist]]
> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10,
> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
> 2020-04-26 20:16:58.221  INFO 28096 --- [umer[cont_hist]]
> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10,
> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
> 2020-04-26 20:16:58.229  INFO 28096 --- [umer[cont_hist]]
> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10,
> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Successfully joined group
> with generation 19
> 2020-04-26 20:16:58.232  INFO 28096 --- [umer[cont_hist]]
> o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-10,
> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting newly assigned
> partitions: cont_hist-0
> 2020-04-26 20:16:58.236  INFO 28096 --- [umer[cont_hist]]
> o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-10,
> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting offset for partition
> cont_hist-0 to the committed offset FetchPosition{offset=4,
> offsetEpoch=Optional.empty,
> currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null),
> epoch=0}}
> 2020-04-26 20:16:58.251  INFO 28096 --- [umer[cont_hist]]
> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10,
> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Member
> consumer-10-187aed26-5575-4285-a567-3deca28e099c sending LeaveGroup request
> to coordinator localhost:9092 (id: 2147483646 rack: null)
> 2020-04-26 20:16:58.274  INFO 28096 --- 
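The log excerpt shows one full cycle: subscribe, get assigned `cont_hist-0`, reset to the committed offset 4, send a LeaveGroup request, and reconnect after 5000 ms. The toy simulation below illustrates why reconnecting alone cannot break that cycle. It assumes (the excerpt does not show this) that the record at the committed offset is the one failing Avro deserialization; the partition contents, `MAX_ATTEMPTS` cap and method names are hypothetical. Since nothing is committed past the failing record, every new session starts at the same offset, whereas skipping the record lets consumption continue.

```java
import java.util.List;

class ReconnectLoopSimulation {
    // Hypothetical partition contents; list index = offset. Offset 4 cannot be decoded.
    static final List<String> PARTITION = List.of("r0", "r1", "r2", "r3", "POISON", "r5");
    static final int MAX_ATTEMPTS = 3; // cap so the demo terminates

    // Runs up to MAX_ATTEMPTS consumer sessions and returns the final committed offset.
    static long run(long committedOffset, boolean skipUndecodable) {
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            long position = committedOffset; // each new session resumes from the committed offset
            try {
                while (position < PARTITION.size()) {
                    decode(PARTITION.get((int) position));
                    position++;
                    committedOffset = position; // commit after each successfully processed record
                }
                return committedOffset; // reached the end of the partition
            } catch (IllegalStateException e) {
                if (skipUndecodable) {
                    committedOffset = position + 1; // move past the bad record and commit
                } else {
                    System.out.println("attempt " + attempt + ": failed at offset " + position + ", reconnecting");
                }
            }
        }
        return committedOffset;
    }

    // Stand-in for deserialization: fails on the poison payload.
    static String decode(String payload) {
        if (payload.equals("POISON")) {
            throw new IllegalStateException("cannot deserialize record");
        }
        return payload;
    }

    public static void main(String[] args) {
        System.out.println("without skipping: committed offset = " + run(4, false));
        System.out.println("with skipping: committed offset = " + run(4, true));
    }
}
```

Without skipping, all three attempts fail at offset 4 and the committed offset stays at 4; with skipping, the run ends with the committed offset at 6.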

Re: Kafka component error handling - consumer keeps leaving and rejoining the group

2020-04-27 Thread Joseph M'BIMBI-BENE
I also realize that the property "bridgeErrorHandler" never seems to be
used, while other ones, like "breakOnFirstError", are.

Also, going back to the exception handling, at least a couple of other
subclasses of KafkaException should not be retried. A few examples:

   - ConfigException: "Thrown if the user supplies an invalid
   configuration" -> a retry will not solve that
   - OAuthBearerConfigException: "Exception thrown when there is a problem
   with the configuration (an invalid option in a JAAS config, for example)":
   this one seems to fall under the same category
   - and obviously SerializationException
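The list above could be expressed as a small classifier that treats a configured set of exception types as non-retriable. This is a sketch under assumptions: the nested `Stub*` classes are hypothetical stand-ins for `ConfigException`, `OAuthBearerConfigException`, `SerializationException` and their common superclass `KafkaException`, and `RetryClassifier` / `isRetriable` are illustrative names, not part of Camel or Kafka.

```java
import java.util.List;

class RetryClassifier {
    // Hypothetical stand-ins mirroring the kafka-clients hierarchy named above.
    static class StubKafkaException extends RuntimeException {
        StubKafkaException(String message) { super(message); }
    }
    static class StubConfigException extends StubKafkaException {
        StubConfigException(String message) { super(message); }
    }
    static class StubOAuthBearerConfigException extends StubKafkaException {
        StubOAuthBearerConfigException(String message) { super(message); }
    }
    static class StubSerializationException extends StubKafkaException {
        StubSerializationException(String message) { super(message); }
    }

    // Errors a reconnect cannot fix: bad configuration or an unreadable record.
    private final List<Class<? extends Throwable>> nonRetriable = List.of(
            StubConfigException.class,
            StubOAuthBearerConfigException.class,
            StubSerializationException.class);

    // Only Kafka errors are reconnect candidates (mirroring catch (KafkaException e)),
    // and only those whose type is not listed as non-retriable.
    boolean isRetriable(Throwable error) {
        if (!(error instanceof StubKafkaException)) {
            return false;
        }
        for (Class<? extends Throwable> type : nonRetriable) {
            if (type.isInstance(error)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        RetryClassifier classifier = new RetryClassifier();
        System.out.println(classifier.isRetriable(new StubConfigException("invalid option"))); // false
        System.out.println(classifier.isRetriable(new StubSerializationException("bad Avro"))); // false
        System.out.println(classifier.isRetriable(new StubKafkaException("coordinator not available"))); // true
    }
}
```

Keeping the list as data rather than as extra catch clauses makes it easy to extend when more non-retriable subclasses are identified.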


