ravikalla opened a new pull request, #20212: URL: https://github.com/apache/kafka/pull/20212
## Summary This PR implements KAFKA-10840 to expose authentication failures in the KafkaConsumer.poll() method, allowing applications to catch authentication issues immediately instead of experiencing silent failures. ## Problem Previously, when SSL certificates expired or other authentication issues occurred, the consumer would stop fetching data without clear indication of the underlying problem. This led to "data flow stops without indication" scenarios that were difficult to troubleshoot and handle gracefully. ## Solution ### New Exception Classes - **CertificateExpiredAuthenticationException**: Specifically for SSL certificate expiration scenarios - **PersistentAuthenticationException**: For non-retriable authentication failures (SASL, general SSL handshake failures) ### Core Changes - Modified `ClassicKafkaConsumer.poll()` to actively check all known cluster nodes for authentication exceptions before proceeding with fetch operations - Added `authenticationException(Node)` method to `ConsumerNetworkClient` to expose authentication state from the underlying `KafkaClient` - Enhanced `MockClient` with `setNodeAuthenticationFailure()` method for testing authentication failure scenarios ### Error Detection Logic The implementation detects certificate expiration by checking for specific patterns in SSL exception messages: - "certificate expired" - "Certificate expired" - "CERTIFICATE_EXPIRED" - "certificate has expired" - "expired certificate" ## Usage Example ```java try { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); } catch (CertificateExpiredAuthenticationException e) { log.error("SSL certificate expired: {}", e.getMessage()); // Handle certificate renewal } catch (PersistentAuthenticationException e) { log.error("Authentication failed: {}", e.getMessage()); // Handle authentication configuration } ``` ## Test Plan - [x] All existing Kafka consumer tests pass - [x] All ConsumerNetworkClient tests pass - [x] Code passes checkstyle and spotbugs checks - [x] Implementation is backward compatible - [x] Authentication failure scenarios can be tested using MockClient enhancements ## Files Changed 1. `clients/src/main/java/org/apache/kafka/common/errors/CertificateExpiredAuthenticationException.java` (NEW) 2. `clients/src/main/java/org/apache/kafka/common/errors/PersistentAuthenticationException.java` (NEW) 3. `clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java` (MODIFIED) 4. `clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java` (MODIFIED) 5. `clients/src/test/java/org/apache/kafka/clients/MockClient.java` (MODIFIED) This addresses the core issue where "data flow stops without indication" when authentication fails, enabling applications to detect and handle these failures proactively rather than experiencing silent timeouts. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org