[ 
https://issues.apache.org/jira/browse/CAMEL-18796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17643555#comment-17643555
 ] 

Luca Burgazzoli edited comment on CAMEL-18796 at 12/5/22 9:11 PM:
------------------------------------------------------------------

/cc [~orpiske]


was (Author: lb):
/cc @orpiske

> camel-kafka: kafka consumer stops in case of an authentication issue
> --------------------------------------------------------------------
>
>                 Key: CAMEL-18796
>                 URL: https://issues.apache.org/jira/browse/CAMEL-18796
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 3.18.0, 3.19.0
>            Reporter: Luca Burgazzoli
>            Priority: Major
>
> I'm running into strange behavior of the camel-kafka component in case of a 
> glitch/temporary authentication issue. Assuming we have the following code:
> {code:java}
> //usr/bin/env jbang "$0" "$@" ; exit $?
> //
> //DEPS io.quarkus.platform:quarkus-camel-bom:2.14.2.Final@pom
> //DEPS org.apache.camel.quarkus:camel-quarkus-kafka
> //DEPS org.apache.camel.quarkus:camel-quarkus-log
> //DEPS org.apache.camel.quarkus:camel-quarkus-direct
> //
> //JAVAC_OPTIONS -parameters
> //JAVA_OPTIONS -Djava.util.logging.manager=org.jboss.logmanager.LogManager
> //
> import org.apache.camel.ExtendedCamelContext;
> import org.apache.camel.builder.endpoint.EndpointRouteBuilder;
> public class ck extends EndpointRouteBuilder {
>     @Override
>     public void configure() throws Exception {
>         getCamelContext().adapt(ExtendedCamelContext.class)
>             .setErrorHandlerFactory(
>                 deadLetterChannel("direct:dlq")
>             );
>         var kafka = kafka("demo")
>             .brokers("{{test.kafka.broker}}")
>             .autoOffsetReset("earliest")
>             .securityProtocol("SASL_SSL")  
>             .saslMechanism("PLAIN")
>             .saslJaasConfig("org.apache.kafka.common.security.plain.PlainLoginModule required username='{{test.kafka.username}}' password='{{test.kafka.password}}';");
>         from("direct:dlq")
>             .to("log:dlq?showAll=true&multiline=true");
>         from(kafka)
>             .to("log:kafka?showAll=true&multiline=true");
>     }
> }
> {code}
> What this route does:
> 1. sets up a global error handler (send to a DLQ)
> 2. polls data from a Kafka topic
> If for some reason there is a glitch in the authentication machinery, the 
> KafkaConsumer thread is terminated and no further poll or reconnection 
> attempts are made.
> {code}
> 2022-12-05 21:52:48,728 DEBUG 
> [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1) 
> thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted 
> on 0 records to process
> 2022-12-05 21:52:53,729 DEBUG 
> [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1) 
> thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted 
> on 0 records to process
> 2022-12-05 21:52:58,730 DEBUG 
> [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1) 
> thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted 
> on 0 records to process
> 2022-12-05 21:53:03,731 DEBUG 
> [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1) 
> thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted 
> on 0 records to process
> 2022-12-05 21:53:08,732 DEBUG 
> [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1) 
> thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted 
> on 0 records to process
> 2022-12-05 21:53:09,598 INFO  [org.apa.kaf.com.net.Selector] (Camel (camel-1) 
> thread #1 - KafkaConsumer[demo]) [Consumer 
> clientId=consumer-9fc21222-980b-4dd7-8e2b-0a228a4f3fe5-1, 
> groupId=9fc21222-980b-4dd7-8e2b-0a228a4f3fe5] Failed re-authentication with 
> broker-0-lb-cos-ce---l--votu-g----ig.bf2.kafka.rhcloud.com/34.247.249.77 
> (channelId=2147483647) (Authentication failed: credentials for user could not 
> be verified)
> 2022-12-05 21:53:09,602 ERROR [org.apa.kaf.cli.NetworkClient] (Camel 
> (camel-1) thread #1 - KafkaConsumer[demo]) [Consumer 
> clientId=consumer-9fc21222-980b-4dd7-8e2b-0a228a4f3fe5-1, 
> groupId=9fc21222-980b-4dd7-8e2b-0a228a4f3fe5] Connection to node 2147483647 
> (broker-0-lb-cos-ce---l--votu-g----ig.bf2.kafka.rhcloud.com/34.247.249.77:443)
>  failed authentication due to: Authentication failed: credentials for user 
> could not be verified
> 2022-12-05 21:53:09,605 WARN  [org.apa.cam.com.kaf.KafkaFetchRecords] (Camel 
> (camel-1) thread #1 - KafkaConsumer[demo]) Exception 
> org.apache.kafka.common.errors.SaslAuthenticationException caught by thread 
> demo-Thread 0 while polling topic demo from kafka: Authentication failed: 
> credentials for user could not be verified: 
> org.apache.kafka.common.errors.SaslAuthenticationException: Authentication 
> failed: credentials for user could not be verified
> 2022-12-05 21:53:09,609 WARN  
> [org.apa.cam.com.kaf.con.err.BridgeErrorStrategy] (Camel (camel-1) thread #1 
> - KafkaConsumer[demo]) Deferring processing to the exception handler based on 
> polling exception strategy
> 2022-12-05 21:53:09,624 DEBUG [org.apa.cam.pro.err.DeadLetterChannel] (Camel 
> (camel-1) thread #1 - KafkaConsumer[demo]) Failed delivery for (MessageId: 
> 386B9AF6D607152-0000000000000000 on ExchangeId: 
> 386B9AF6D607152-0000000000000000). On delivery attempt: 0 caught: 
> org.apache.kafka.common.errors.SaslAuthenticationException: Authentication 
> failed: credentials for user could not be verified
> 2022-12-05 21:53:09,628 DEBUG [org.apa.cam.pro.SendProcessor] (Camel 
> (camel-1) thread #1 - KafkaConsumer[demo]) >>>> direct://dlq 
> Exchange[386B9AF6D607152-0000000000000000]
> 2022-12-05 21:53:09,628 DEBUG [org.apa.cam.pro.SendProcessor] (Camel 
> (camel-1) thread #1 - KafkaConsumer[demo]) >>>> 
> log://dlq?multiline=true&showAll=true 
> Exchange[386B9AF6D607152-0000000000000000]
> 2022-12-05 21:53:09,629 INFO  [dlq] (Camel (camel-1) thread #1 - 
> KafkaConsumer[demo]) Exchange[
>   Id: 386B9AF6D607152-0000000000000000
>   ExchangePattern: InOnly
>   Properties: {CamelErrorHandlerBridge=true, 
> CamelExceptionCaught=org.apache.kafka.common.errors.SaslAuthenticationException:
>  Authentication failed: credentials for user could not be verified, 
> CamelFailureRouteId=route2, CamelFatalFallbackErrorHandler=[route2], 
> CamelToEndpoint=log://dlq?multiline=true&showAll=true}
>   Headers: {}
>   BodyType: null
>   Body: [Body is null]
>   CaughtExceptionType: 
> org.apache.kafka.common.errors.SaslAuthenticationException  
> CaughtExceptionMessage: Authentication failed: credentials for user could not 
> be verified  StackTrace: 
> org.apache.kafka.common.errors.SaslAuthenticationException: Authentication 
> failed: credentials for user could not be verified
> ]
> 2022-12-05 21:53:09,635 INFO  [org.apa.cam.com.kaf.con.err.SeekUtil] (Camel 
> (camel-1) thread #1 - KafkaConsumer[demo]) Consumer seeking to next offset 1 
> to continue polling next message from topic demo on partition 0
> 2022-12-05 21:53:09,636 DEBUG [org.apa.cam.com.kaf.KafkaFetchRecords] (Camel 
> (camel-1) thread #1 - KafkaConsumer[demo]) Closing consumer demo-Thread 0
> 2022-12-05 21:53:09,636 DEBUG 
> [org.apa.cam.com.kaf.con.sup.PartitionAssignmentListener] (Camel (camel-1) 
> thread #1 - KafkaConsumer[demo]) onPartitionsRevoked: demo-Thread 0 from demo
> 2022-12-05 21:53:09,643 INFO  [org.apa.cam.com.kaf.KafkaFetchRecords] (Camel 
> (camel-1) thread #1 - KafkaConsumer[demo]) Terminating KafkaConsumer thread 
> demo-Thread 0 receiving from topic demo
> {code}
>  
> However, according to the documentation, if pollOnError is set to 
> ERROR_HANDLER, as in this case (it is the default), the strategy should use 
> Camel’s error handler to process the exception and afterwards continue to 
> poll the next message, but this does not seem to be the case.
> This seems to be somehow related to:
> - https://issues.apache.org/jira/browse/CAMEL-17424
> - https://github.com/apache/camel/commit/55df049a96fd8f52265ef7e7a0cc9ca5a28ab6b3
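
For illustration, the behavior the documentation describes for ERROR_HANDLER (hand the exception to the error handler, then keep polling) can be sketched roughly as below. This is a minimal, JDK-only sketch under stated assumptions: the class PollLoopSketch, the method pollAll, and the simulated poller are hypothetical names invented for this example, and none of it is camel-kafka's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical stand-in for a consumer poll loop; not camel-kafka code. */
public class PollLoopSketch {

    /**
     * Polls up to maxPolls times. A failed poll is handed to the error
     * handler and the loop continues with the next poll.
     */
    public static List<String> pollAll(Supplier<List<String>> poller,
                                       Consumer<Exception> errorHandler,
                                       int maxPolls) {
        List<String> received = new ArrayList<>();
        for (int i = 0; i < maxPolls; i++) {
            try {
                received.addAll(poller.get());
            } catch (RuntimeException e) {
                // Hand the failure to the error handler (the DLQ in the report)
                // and keep looping instead of terminating the polling thread.
                errorHandler.accept(e);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        List<Exception> dlq = new ArrayList<>();
        // Simulated poller (assumption): the second poll fails with a
        // transient authentication error, the others return one record.
        Supplier<List<String>> poller = () -> {
            if (++calls[0] == 2) {
                throw new IllegalStateException("Authentication failed");
            }
            return List.of("record-" + calls[0]);
        };
        List<String> records = pollAll(poller, dlq::add, 3);
        System.out.println(records + " / errors sent to handler: " + dlq.size());
    }
}
```

Running main prints `[record-1, record-3] / errors sent to handler: 1`: the failed poll reaches the handler once and polling resumes afterwards, which is the outcome the report expected instead of the "Terminating KafkaConsumer thread" log line.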



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
