ravikalla commented on code in PR #20212:
URL: https://github.com/apache/kafka/pull/20212#discussion_r2389231526
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java:
##########
@@ -593,6 +593,15 @@ public void tryConnect(Node node) {
}
}
+ /**
+ * Get the authentication exception for a given node, if any.
+ * @param node the node to check
+ * @return an AuthenticationException iff authentication has failed, null otherwise
+ */
+ public AuthenticationException authenticationException(Node node) {
Review Comment:
@lianetm You're absolutely right - this was duplicating
`maybeThrowAuthFailure()`. I've removed this method entirely and enhanced the
existing `maybeThrowAuthFailure()` instead. Much cleaner now, thanks for
catching that!
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java:
##########
@@ -640,6 +646,9 @@ private ConsumerRecords<K, V> poll(final Timer timer) {
do {
client.maybeTriggerWakeup();
+ // Check for authentication failures that should be surfaced to the application
+ checkForAuthenticationFailures();
Review Comment:
@lianetm You're absolutely right! After investigating further, I found
that the existing `maybeThrowAuthFailure(node)` at line
444 in AbstractFetch does handle authentication failures, but it only
throws the generic `AuthenticationException`.
## The Gap I Was Trying to Address
The issue in KAFKA-10840 is that when SSL certificates expire,
applications receive a generic `AuthenticationException` that doesn't
distinguish between:
1. **Temporary auth issues** (network glitches, transient SASL failures)
2. **Permanent certificate expiration** (requires certificate renewal)
3. **Other persistent auth failures** (wrong credentials, permission
changes)
This makes it difficult for applications to implement appropriate error
handling - they can't tell if they should retry, alert operators about
certificate expiration, or reconfigure credentials.
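To make the distinction concrete, here is a minimal sketch of how the three categories above could map to different reactions. `AuthFailureKind`, `Action`, and `recommendedAction` are hypothetical names used only for illustration; they are not part of Kafka or of this PR.

```java
public class AuthFailureActions {
    /** Hypothetical categories mirroring the list above; not a Kafka type. */
    enum AuthFailureKind { TRANSIENT, CERTIFICATE_EXPIRED, PERSISTENT }

    /** Hypothetical reactions an application might choose between. */
    enum Action { RETRY, ALERT_OPERATORS, RECONFIGURE_CREDENTIALS }

    /** Maps each failure category to the reaction described in the text. */
    static Action recommendedAction(AuthFailureKind kind) {
        switch (kind) {
            case TRANSIENT:
                return Action.RETRY;
            case CERTIFICATE_EXPIRED:
                return Action.ALERT_OPERATORS;
            case PERSISTENT:
                return Action.RECONFIGURE_CREDENTIALS;
            default:
                throw new IllegalArgumentException("Unknown kind: " + kind);
        }
    }

    public static void main(String[] args) {
        for (AuthFailureKind kind : AuthFailureKind.values()) {
            System.out.println(kind + " -> " + recommendedAction(kind));
        }
    }
}
```

With only a generic `AuthenticationException`, the application has no reliable way to pick the `AuthFailureKind` in the first place, which is the gap described here.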
## What Happens Currently
When a certificate expires:
1. `AbstractFetch.prepareFetchRequests()` calls
`maybeThrowAuthFailure(node)` ✅
2. This throws a generic `AuthenticationException` ✅
3. The application catches it but **can't determine it's specifically a
certificate expiration** ❌
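For comparison, this is roughly the kind of workaround an application is left with today: inspecting the exception it caught. The sketch below walks the cause chain looking for the JDK's `java.security.cert.CertificateExpiredException`. It assumes that exception is preserved somewhere in the chain, which I have not confirmed for Kafka's SSL handshake path; `causedByExpiredCertificate` is a hypothetical helper, and `RuntimeException` stands in for Kafka's `AuthenticationException` so the example runs without the client library.

```java
import java.security.cert.CertificateExpiredException;

public class ExpiredCertificateCheck {
    /**
     * Returns true if any exception in the cause chain is a
     * CertificateExpiredException. The depth limit guards against
     * cyclic cause chains.
     */
    static boolean causedByExpiredCertificate(Throwable error) {
        int depth = 0;
        for (Throwable t = error; t != null && depth < 32; t = t.getCause(), depth++) {
            if (t instanceof CertificateExpiredException) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // RuntimeException is a stand-in for Kafka's AuthenticationException.
        Throwable expired = new RuntimeException("SSL handshake failed",
                new CertificateExpiredException("certificate expired"));
        Throwable other = new RuntimeException("Invalid credentials");

        System.out.println(causedByExpiredCertificate(expired));
        System.out.println(causedByExpiredCertificate(other));
    }
}
```

Every application would have to carry a check like this, and it depends on implementation details of the exception chain, which is why a dedicated exception type seems preferable.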
## My Solution (After Your Feedback)
I've now enhanced the existing
`ConsumerNetworkClient.maybeThrowAuthFailure()` to examine the
`AuthenticationException` and throw specific subtypes:
```java
// In ConsumerNetworkClient.maybeThrowAuthFailure()
if (authException instanceof SslAuthenticationException) {
    String message = authException.getMessage();
    // getMessage() may return null, so guard before matching on the text
    if (message != null && message.contains("certificate expired")) {
        throw new CertificateExpiredAuthenticationException(...); // New specific type
    }
    throw new PersistentAuthenticationException(...); // Other SSL failures
}
```
This way:
- The existing flow in AbstractFetch remains unchanged
- Authentication failures are still caught at line 444 as you pointed out
- But now applications receive specific exception types they can handle
differently
## Example Use Case
```java
try {
    records = consumer.poll(Duration.ofSeconds(1));
} catch (CertificateExpiredAuthenticationException e) {
    // Alert ops team about certificate renewal
    alertOpsTeam("Certificate expired, needs renewal!");
    throw e;
} catch (PersistentAuthenticationException e) {
    // Different handling for other auth failures
    log.error("Auth configuration error", e);
    reconfigureAuth();
}
```
You were correct that the mechanism exists - I just needed to enhance it
to provide more specific exception types rather than adding duplicate checking.
Thank you for pointing me in the right direction!