[
https://issues.apache.org/jira/browse/KAFKA-5944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361546#comment-16361546
]
ASF GitHub Bot commented on KAFKA-5944:
---
hachikuji closed pull request #3965: KAFKA-5944: Unit tests for handling SASL
authentication failures in clients
URL: https://github.com/apache/kafka/pull/3965
This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index d843414fd7a..65255fe9648 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -20,6 +20,7 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.utils.Time;
@@ -77,6 +78,7 @@ public FutureResponse(Node node,
private Node node = null;
private final Set ready = new HashSet<>();
private final Map blackedOut = new HashMap<>();
+private final Map authenticationException =
new HashMap<>();
// Use concurrent queue for requests so that requests may be queried from a different thread
private final Queue requests = new
ConcurrentLinkedDeque<>();
// Use concurrent queue for responses so that responses may be updated during poll() from a different thread.
@@ -102,7 +104,7 @@ public boolean isReady(Node node, long now) {
@Override
public boolean ready(Node node, long now) {
-if (isBlackedOut(node))
+if (isBlackedOut(node) || authenticationException(node) != null)
return false;
ready.add(node.idString());
return true;
@@ -117,6 +119,12 @@ public void blackout(Node node, long duration) {
blackedOut.put(node, time.milliseconds() + duration);
}
+public void authenticationFailed(Node node, long duration) {
+authenticationException.put(node, (AuthenticationException)
Errors.SASL_AUTHENTICATION_FAILED.exception());
+disconnect(node.idString());
+blackout(node, duration);
+}
+
private boolean isBlackedOut(Node node) {
if (blackedOut.containsKey(node)) {
long expiration = blackedOut.get(node);
@@ -137,7 +145,7 @@ public boolean connectionFailed(Node node) {
@Override
public AuthenticationException authenticationException(Node node) {
-return null;
+return authenticationException.get(node);
}
@Override
@@ -347,6 +355,7 @@ public void reset() {
responses.clear();
futureResponses.clear();
metadataUpdates.clear();
+authenticationException.clear();
}
public void prepareMetadataUpdate(Cluster cluster, Set
unavailableTopics) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 186ccf06cb5..f08a99b6ddc 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -30,6 +30,7 @@
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.NotLeaderForPartitionException;
@@ -248,6 +249,75 @@ public void testInvalidTopicNames() throws Exception {
}
}
+@Test
+public void
testAdminClientApisWithinBlackoutPeriodAfterAuthenticationFailure() throws
Exception {
+AdminClientUnitTestEnv env = mockClientEnv();
+Node node = env.cluster().controller();
+env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+env.kafkaClient().setNode(node);
+env.kafkaClient().authenticationFailed(node, 300);
+
+callAdminClientApisAndExpectAnAuthenticationError(env);
+
+// wait less than the blackout period, the connection should fail and the authentication error should remain
+env.time().sleep(30);
+assertTrue(env.kafkaClient().connectionFailed(node));
+