[ https://issues.apache.org/jira/browse/CAMEL-18796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17643555#comment-17643555 ]
Luca Burgazzoli edited comment on CAMEL-18796 at 12/5/22 9:11 PM:
------------------------------------------------------------------

/cc [~orpiske]


was (Author: lb):
    /cc @orpiske

> camel-kafka: kafka consumer stops in case of an authentication issue
> --------------------------------------------------------------------
>
>                 Key: CAMEL-18796
>                 URL: https://issues.apache.org/jira/browse/CAMEL-18796
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 3.18.0, 3.19.0
>            Reporter: Luca Burgazzoli
>            Priority: Major
>
> I'm running into strange behavior in the camel-kafka component when there is
> a glitch or temporary authentication issue. Assume we have the following code:
> {code:java}
> //usr/bin/env jbang "$0" "$@" ; exit $?
> //
> //DEPS io.quarkus.platform:quarkus-camel-bom:2.14.2.Final@pom
> //DEPS org.apache.camel.quarkus:camel-quarkus-kafka
> //DEPS org.apache.camel.quarkus:camel-quarkus-log
> //DEPS org.apache.camel.quarkus:camel-quarkus-direct
> //
> //JAVAC_OPTIONS -parameters
> //JAVA_OPTIONS -Djava.util.logging.manager=org.jboss.logmanager.LogManager
> //
> import org.apache.camel.ExtendedCamelContext;
> import org.apache.camel.builder.endpoint.EndpointRouteBuilder;
>
> public class ck extends EndpointRouteBuilder {
>     @Override
>     public void configure() throws Exception {
>         // Global error handler: failed exchanges are sent to direct:dlq
>         getCamelContext().adapt(ExtendedCamelContext.class)
>             .setErrorHandlerFactory(
>                 deadLetterChannel("direct:dlq")
>             );
>
>         // Kafka consumer endpoint for topic "demo", using SASL_SSL/PLAIN
>         var kafka = kafka("demo")
>             .brokers("{{test.kafka.broker}}")
>             .autoOffsetReset("earliest")
>             .securityProtocol("SASL_SSL")
>             .saslMechanism("PLAIN")
>             .saslJaasConfig("org.apache.kafka.common.security.plain.PlainLoginModule required username='{{test.kafka.username}}' password='{{test.kafka.password}}';");
>
>         // Dead letter route: log whatever the error handler sends here
>         from("direct:dlq")
>             .to("log:dlq?showAll=true&multiline=true");
>
>         // Main route: log every record polled from Kafka
>         from(kafka)
>             .to("log:kafka?showAll=true&multiline=true");
>     }
> }
> {code}
> This route does two things:
> 1. sets up a global error handler (send to a DLQ)
> 2. polls data from a Kafka topic
> If for some reason there is a glitch in the authentication machinery, then
> the KafkaConsumer thread is terminated and no further poll/reconnection
> attempts are made.
> {code}
> 2022-12-05 21:52:48,728 DEBUG
> [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1)
> thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted
> on 0 records to process
> 2022-12-05 21:52:53,729 DEBUG
> [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1)
> thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted
> on 0 records to process
> 2022-12-05 21:52:58,730 DEBUG
> [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1)
> thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted
> on 0 records to process
> 2022-12-05 21:53:03,731 DEBUG
> [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1)
> thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted
> on 0 records to process
> 2022-12-05 21:53:08,732 DEBUG
> [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1)
> thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted
> on 0 records to process
> 2022-12-05 21:53:09,598 INFO [org.apa.kaf.com.net.Selector] (Camel (camel-1)
> thread #1 - KafkaConsumer[demo]) [Consumer
> clientId=consumer-9fc21222-980b-4dd7-8e2b-0a228a4f3fe5-1,
> groupId=9fc21222-980b-4dd7-8e2b-0a228a4f3fe5] Failed re-authentication with
> broker-0-lb-cos-ce---l--votu-g----ig.bf2.kafka.rhcloud.com/34.247.249.77
> (channelId=2147483647) (Authentication failed: credentials for user could not
> be verified)
> 2022-12-05 21:53:09,602 ERROR [org.apa.kaf.cli.NetworkClient] (Camel
> (camel-1) thread #1 - KafkaConsumer[demo]) [Consumer
> clientId=consumer-9fc21222-980b-4dd7-8e2b-0a228a4f3fe5-1,
> groupId=9fc21222-980b-4dd7-8e2b-0a228a4f3fe5] Connection to node 2147483647
> (broker-0-lb-cos-ce---l--votu-g----ig.bf2.kafka.rhcloud.com/34.247.249.77:443)
> failed authentication due to: Authentication failed: credentials for user
> could not be verified
> 2022-12-05 21:53:09,605 WARN [org.apa.cam.com.kaf.KafkaFetchRecords] (Camel
> (camel-1) thread #1 - KafkaConsumer[demo]) Exception
> org.apache.kafka.common.errors.SaslAuthenticationException caught by thread
> demo-Thread 0 while polling topic demo from kafka: Authentication failed:
> credentials for user could not be verified:
> org.apache.kafka.common.errors.SaslAuthenticationException: Authentication
> failed: credentials for user could not be verified
> 2022-12-05 21:53:09,609 WARN
> [org.apa.cam.com.kaf.con.err.BridgeErrorStrategy] (Camel (camel-1) thread #1
> - KafkaConsumer[demo]) Deferring processing to the exception handler based on
> polling exception strategy
> 2022-12-05 21:53:09,624 DEBUG [org.apa.cam.pro.err.DeadLetterChannel] (Camel
> (camel-1) thread #1 - KafkaConsumer[demo]) Failed delivery for (MessageId:
> 386B9AF6D607152-0000000000000000 on ExchangeId:
> 386B9AF6D607152-0000000000000000). On delivery attempt: 0 caught:
> org.apache.kafka.common.errors.SaslAuthenticationException: Authentication
> failed: credentials for user could not be verified
> 2022-12-05 21:53:09,628 DEBUG [org.apa.cam.pro.SendProcessor] (Camel
> (camel-1) thread #1 - KafkaConsumer[demo]) >>>> direct://dlq
> Exchange[386B9AF6D607152-0000000000000000]
> 2022-12-05 21:53:09,628 DEBUG [org.apa.cam.pro.SendProcessor] (Camel
> (camel-1) thread #1 - KafkaConsumer[demo]) >>>>
> log://dlq?multiline=true&showAll=true
> Exchange[386B9AF6D607152-0000000000000000]
> 2022-12-05 21:53:09,629 INFO [dlq] (Camel (camel-1) thread #1 -
> KafkaConsumer[demo]) Exchange[
> Id: 386B9AF6D607152-0000000000000000
> ExchangePattern: InOnly
> Properties: {CamelErrorHandlerBridge=true,
> CamelExceptionCaught=org.apache.kafka.common.errors.SaslAuthenticationException:
> Authentication failed: credentials for user could not be verified,
> CamelFailureRouteId=route2, CamelFatalFallbackErrorHandler=[route2],
> CamelToEndpoint=log://dlq?multiline=true&showAll=true}
> Headers: {}
> BodyType: null
> Body: [Body is null]
> CaughtExceptionType:
> org.apache.kafka.common.errors.SaslAuthenticationException
> CaughtExceptionMessage: Authentication failed: credentials for user could not
> be verified StackTrace:
> org.apache.kafka.common.errors.SaslAuthenticationException: Authentication
> failed: credentials for user could not be verified
> ]
> 2022-12-05 21:53:09,635 INFO [org.apa.cam.com.kaf.con.err.SeekUtil] (Camel
> (camel-1) thread #1 - KafkaConsumer[demo]) Consumer seeking to next offset 1
> to continue polling next message from topic demo on partition 0
> 2022-12-05 21:53:09,636 DEBUG [org.apa.cam.com.kaf.KafkaFetchRecords] (Camel
> (camel-1) thread #1 - KafkaConsumer[demo]) Closing consumer demo-Thread 0
> 2022-12-05 21:53:09,636 DEBUG
> [org.apa.cam.com.kaf.con.sup.PartitionAssignmentListener] (Camel (camel-1)
> thread #1 - KafkaConsumer[demo]) onPartitionsRevoked: demo-Thread 0 from demo
> 2022-12-05 21:53:09,643 INFO [org.apa.cam.com.kaf.KafkaFetchRecords] (Camel
> (camel-1) thread #1 - KafkaConsumer[demo]) Terminating KafkaConsumer thread
> demo-Thread 0 receiving from topic demo
> {code}
>
> However, according to the documentation, if pollOnError is set to
> ERROR_HANDLER, as in this case (it is the default), the strategy should use
> Camel’s error handler to process the exception and afterwards continue to
> poll the next message, but this does not seem to be the case.
> This seems to be somehow related to:
> - https://issues.apache.org/jira/browse/CAMEL-17424
> - https://github.com/apache/camel/commit/55df049a96fd8f52265ef7e7a0cc9ca5a28ab6b3

--
This message was sent by Atlassian Jira
(v8.20.10#820010)