kirktrue commented on code in PR #20212:
URL: https://github.com/apache/kafka/pull/20212#discussion_r2396346901


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java:
##########
@@ -1283,6 +1292,48 @@ private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAnd
             offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
     }
 
+    /**
+     * Check for authentication failures that should be surfaced to the application.
+     * This method checks for authentication exceptions on connected nodes and throws
+     * appropriate exceptions for permanent failures like certificate expiration.
+     *
+     * @throws AuthenticationException if authentication has failed
+     */
+    private void checkForAuthenticationFailures() {
+        // Check authentication failures for all known nodes
+        for (Node node : metadata.fetch().nodes()) {
+            AuthenticationException authException = client.authenticationException(node);
+            if (authException != null) {
+                log.error("Authentication failed for node {}: {}", node, authException.getMessage());
+
+                // Check for specific SSL certificate-related errors
+                if (authException instanceof SslAuthenticationException) {
+                    SslAuthenticationException sslException = (SslAuthenticationException) authException;
+                    String message = sslException.getMessage();
+
+                    // Check for certificate expiration patterns in the error message
+                    if (message != null && (
+                            message.contains("certificate expired") ||
+                            message.contains("Certificate expired") ||
+                            message.contains("CERTIFICATE_EXPIRED") ||
+                            message.contains("certificate has expired") ||
+                            message.contains("expired certificate"))) {
+                        throw new CertificateExpiredAuthenticationException(
+                            "SSL certificate has expired for node " + node + ": " + message, sslException);
+                    }
+
+                    // For other SSL authentication failures, throw PersistentAuthenticationException
+                    throw new PersistentAuthenticationException(
+                        "SSL authentication failed for node " + node + ": " + message, sslException);
+                }
+
+                // For non-SSL authentication failures, also treat as persistent
+                throw new PersistentAuthenticationException(
+                    "Authentication failed for node " + node + ": " + authException.getMessage(), authException);

Review Comment:
   Error handling that depends on strings appearing in the error messages is 
always a bit brittle. Regardless, this seems like something that should happen 
at a different layer so that the admin and producer clients can use it.
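
   As a rough illustration of a less brittle check (a sketch under stated
   assumptions, not a proposal for the final shape of this PR): instead of
   matching message text, a helper could walk the exception's cause chain and
   look for the JDK's `java.security.cert.CertificateExpiredException`. The
   class and method names below (`CertExpiryCheck`,
   `causedByExpiredCertificate`) are hypothetical. The sketch assumes that the
   original handshake failure is preserved as the cause of the authentication
   exception and that the local JDK validated the expired certificate; if the
   remote peer rejected the certificate and only sent a TLS alert, no such
   cause may be present.

```java
import java.security.cert.CertificateExpiredException;
import javax.net.ssl.SSLHandshakeException;

// Hypothetical helper, not part of Kafka: detects certificate expiry by
// exception type rather than by matching substrings of the error message.
final class CertExpiryCheck {

    // Upper bound on how far the cause chain is followed, so that a
    // self-referencing chain cannot loop forever.
    private static final int MAX_DEPTH = 32;

    private CertExpiryCheck() { }

    // Returns true if the throwable or any of its causes is a
    // CertificateExpiredException.
    static boolean causedByExpiredCertificate(Throwable error) {
        int depth = 0;
        for (Throwable t = error; t != null && depth < MAX_DEPTH; t = t.getCause(), depth++) {
            if (t instanceof CertificateExpiredException) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Simulated failure: a handshake exception whose cause is an expiry error.
        Throwable expired = new SSLHandshakeException("handshake failed")
                .initCause(new CertificateExpiredException("NotAfter has passed"));
        // Simulated failure with no expiry-related cause.
        Throwable other = new SSLHandshakeException("no cipher suites in common");

        System.out.println(causedByExpiredCertificate(expired)); // prints true
        System.out.println(causedByExpiredCertificate(other));   // prints false
    }
}
```

   Because the helper is a static method with no dependency on consumer
   internals, it could in principle sit in a layer shared by the consumer,
   producer, and admin clients; where exactly it belongs is left open here.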



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

