kirktrue commented on code in PR #20212:
URL: https://github.com/apache/kafka/pull/20212#discussion_r2396346901
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java:
##########
@@ -1283,6 +1292,48 @@ private void updateLastSeenEpochIfNewer(TopicPartition
topicPartition, OffsetAnd
offsetAndMetadata.leaderEpoch().ifPresent(epoch ->
metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
}
+ /**
+ * Check for authentication failures that should be surfaced to the
application.
+ * This method checks for authentication exceptions on connected nodes and
throws
+ * appropriate exceptions for permanent failures like certificate
expiration.
+ *
+ * @throws AuthenticationException if authentication has failed
+ */
+ private void checkForAuthenticationFailures() {
+ // Check authentication failures for all known nodes
+ for (Node node : metadata.fetch().nodes()) {
+ AuthenticationException authException =
client.authenticationException(node);
+ if (authException != null) {
+ log.error("Authentication failed for node {}: {}", node,
authException.getMessage());
+
+ // Check for specific SSL certificate-related errors
+ if (authException instanceof SslAuthenticationException) {
+ SslAuthenticationException sslException =
(SslAuthenticationException) authException;
+ String message = sslException.getMessage();
+
+ // Check for certificate expiration patterns in the error
message
+ if (message != null && (
+ message.contains("certificate expired") ||
+ message.contains("Certificate expired") ||
+ message.contains("CERTIFICATE_EXPIRED") ||
+ message.contains("certificate has expired") ||
+ message.contains("expired certificate"))) {
+ throw new CertificateExpiredAuthenticationException(
+ "SSL certificate has expired for node " + node +
": " + message, sslException);
+ }
+
+ // For other SSL authentication failures, throw
PersistentAuthenticationException
+ throw new PersistentAuthenticationException(
+ "SSL authentication failed for node " + node + ": " +
message, sslException);
+ }
+
+ // For non-SSL authentication failures, also treat as
persistent
+ throw new PersistentAuthenticationException(
+ "Authentication failed for node " + node + ": " +
authException.getMessage(), authException);
Review Comment:
Error handling that depends on strings appearing in the error messages is
always a bit brittle. Regardless, this seems like something that should happen
at a different layer so that the admin and producer clients can use it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]