lianetm commented on code in PR #20212:
URL: https://github.com/apache/kafka/pull/20212#discussion_r2388581949
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java:
##########
@@ -640,6 +646,9 @@ private ConsumerRecords<K, V> poll(final Timer timer) {
do {
client.maybeTriggerWakeup();
+ // Check for authentication failures that should be surfaced
to the application
+ checkForAuthenticationFailures();
Review Comment:
this intention is already implemented on poll as part of the fetch logic.
Whenever attempting to send a fetch request to a node, if the node is not
available, the consumer will check for auth errors and propagate them to the
app poll call , this
https://github.com/apache/kafka/blob/d1a821226c1beebae6db66e02a7ae8ada28ffabc/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java#L453
I get that for some reason this logic is not propagating the auth errors as
you expect in your example, right? could you maybe check to understand what
exactly happens here in your scenario?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java:
##########
@@ -593,6 +593,15 @@ public void tryConnect(Node node) {
}
}
+ /**
+ * Get the authentication exception for a given node, if any.
+ * @param node the node to check
+ * @return an AuthenticationException iff authentication has failed, null
otherwise
+ */
+ public AuthenticationException authenticationException(Node node) {
Review Comment:
this seems to serve the same purpose as the existing `maybeThrowAuthFailure`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]