Repository: kafka
Updated Branches:
  refs/heads/trunk c2af13ed0 -> 69d2a1771


KAFKA-5854; Handle SASL authentication failures as non-retriable exceptions in clients

This PR implements the client side of KIP-152 by modifying `KafkaConsumer`, 
`KafkaProducer`, and `ConsumerGroupCommand` to throw a non-retriable exception 
when SASL authentication fails.

This PR is co-authored with rajinisivaram.
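
As a quick illustration of the new behaviour, here is a minimal sketch of what
callers can now rely on (configuration values are placeholders; a real client
would also set security.protocol, sasl.mechanism and the JAAS configuration):

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.errors.AuthenticationException;

    public class SaslFailureExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            props.put("group.id", "example-group");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("test"));
                consumer.poll(100);
            } catch (AuthenticationException e) {
                // Non-retriable after this change: surface the failure instead of
                // retrying the connection indefinitely.
                System.err.println("SASL authentication failed: " + e.getMessage());
            }
        }
    }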

Author: Vahid Hashemian <vahidhashem...@us.ibm.com>

Reviewers: Jason Gustafson <ja...@confluent.io>, Rajini Sivaram 
<rajinisiva...@googlemail.com>, Ismael Juma <ism...@juma.me.uk>, tedyu 
<yuzhih...@gmail.com>

Closes #3832 from vahidhashemian/KAFKA-5854


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/69d2a177
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/69d2a177
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/69d2a177

Branch: refs/heads/trunk
Commit: 69d2a177101eb1c29b59b4c64d8c22f6d5e3d281
Parents: c2af13e
Author: Vahid Hashemian <vahidhashem...@us.ibm.com>
Authored: Wed Sep 20 22:51:53 2017 +0100
Committer: Rajini Sivaram <rajinisiva...@googlemail.com>
Committed: Wed Sep 20 22:51:53 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/ClusterConnectionStates.java  |  44 ++++-
 .../apache/kafka/clients/ConnectionState.java   |   7 +-
 .../org/apache/kafka/clients/KafkaClient.java   |  22 ++-
 .../kafka/clients/ManualMetadataUpdater.java    |  12 ++
 .../java/org/apache/kafka/clients/Metadata.java |  22 ++-
 .../apache/kafka/clients/MetadataUpdater.java   |   8 +
 .../org/apache/kafka/clients/NetworkClient.java |  28 ++-
 .../kafka/clients/consumer/KafkaConsumer.java   |  26 ++-
 .../clients/consumer/OffsetCommitCallback.java  |   2 +-
 .../consumer/internals/AbstractCoordinator.java |   7 +
 .../consumer/internals/ConsumerCoordinator.java |   2 +-
 .../internals/ConsumerNetworkClient.java        |  12 +-
 .../kafka/clients/producer/KafkaProducer.java   |  12 +-
 .../kafka/common/network/ChannelState.java      |  10 +-
 .../org/apache/kafka/clients/MetadataTest.java  |   4 +-
 .../org/apache/kafka/clients/MockClient.java    |   6 +
 .../ClientAuthenticationFailureTest.java        | 114 ++++++++++++
 .../main/scala/kafka/admin/AdminClient.scala    |   7 +-
 .../scala/kafka/tools/ConsoleConsumer.scala     |   5 +-
 .../SaslClientsWithInvalidCredentialsTest.scala | 176 +++++++++++++++++++
 .../SaslScramSslEndToEndAuthorizationTest.scala |  14 +-
 .../scala/integration/kafka/api/SaslSetup.scala |  15 +-
 22 files changed, 496 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 4d4bedd..5bc25de 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -17,12 +17,15 @@
 package org.apache.kafka.clients;
 
 import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.kafka.common.errors.AuthenticationException;
+
 import java.util.HashMap;
 import java.util.Map;
 
 /**
  * The state of our connection to each node in the cluster.
- * 
+ *
  */
 final class ClusterConnectionStates {
     private final long reconnectBackoffInitMs;
@@ -50,7 +53,8 @@ final class ClusterConnectionStates {
         if (state == null)
             return true;
         else
-            return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttemptMs >= state.reconnectBackoffMs;
+            return state.state.isDisconnected() &&
+                   now - state.lastConnectAttemptMs >= state.reconnectBackoffMs;
     }
 
     /**
@@ -63,7 +67,8 @@ final class ClusterConnectionStates {
         if (state == null)
             return false;
         else
-            return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttemptMs < state.reconnectBackoffMs;
+            return state.state.isDisconnected() &&
+                   now - state.lastConnectAttemptMs < state.reconnectBackoffMs;
     }
 
     /**
@@ -77,7 +82,7 @@ final class ClusterConnectionStates {
         NodeConnectionState state = nodeState.get(id);
         if (state == null) return 0;
         long timeWaited = now - state.lastConnectAttemptMs;
-        if (state.state == ConnectionState.DISCONNECTED) {
+        if (state.state.isDisconnected()) {
             return Math.max(state.reconnectBackoffMs - timeWaited, 0);
         } else {
             // When connecting or connected, we should be able to delay indefinitely since other events (connection or
@@ -136,6 +141,20 @@ final class ClusterConnectionStates {
     }
 
     /**
+     * Enter the authentication failed state for the given node.
+     * @param id the connection identifier
+     * @param now the current time
+     * @param exception the authentication exception
+     */
+    public void authenticationFailed(String id, long now, AuthenticationException exception) {
+        NodeConnectionState nodeState = nodeState(id);
+        nodeState.authenticationException = exception;
+        nodeState.state = ConnectionState.AUTHENTICATION_FAILED;
+        nodeState.lastConnectAttemptMs = now;
+        updateReconnectBackoff(nodeState);
+    }
+
+    /**
      * Return true if the connection is ready.
      * @param id the connection identifier
      */
@@ -162,7 +181,16 @@ final class ClusterConnectionStates {
      */
     public boolean isDisconnected(String id) {
         NodeConnectionState state = nodeState.get(id);
-        return state != null && state.state == ConnectionState.DISCONNECTED;
+        return state != null && state.state.isDisconnected();
+    }
+
+    /**
+     * Return authentication exception if an authentication error occurred
+     * @param id The id of the node to check
+     */
+    public AuthenticationException authenticationException(String id) {
+        NodeConnectionState state = nodeState.get(id);
+        return state != null ? state.authenticationException : null;
     }
 
     /**
@@ -205,7 +233,7 @@ final class ClusterConnectionStates {
     public void remove(String id) {
         nodeState.remove(id);
     }
-    
+
     /**
      * Get the state of a given connection.
      * @param id the id of the connection
@@ -225,19 +253,21 @@ final class ClusterConnectionStates {
             throw new IllegalStateException("No entry found for connection " + id);
         return state;
     }
-    
+
     /**
      * The state of our connection to a node.
      */
     private static class NodeConnectionState {
 
         ConnectionState state;
+        AuthenticationException authenticationException;
         long lastConnectAttemptMs;
         long failedAttempts;
         long reconnectBackoffMs;
 
         public NodeConnectionState(ConnectionState state, long lastConnectAttempt, long reconnectBackoffMs) {
             this.state = state;
+            this.authenticationException = null;
             this.lastConnectAttemptMs = lastConnectAttempt;
             this.failedAttempts = 0;
             this.reconnectBackoffMs = reconnectBackoffMs;

http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java b/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java
index 62ffada..28b43d6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java
@@ -23,7 +23,12 @@ package org.apache.kafka.clients;
  * CONNECTING: connection is under progress
  * CHECKING_API_VERSIONS: connection has been established and api versions check is in progress. Failure of this check will cause connection to close
  * READY: connection is ready to send requests
+ * AUTHENTICATION_FAILED: connection failed due to an authentication error
  */
 public enum ConnectionState {
-    DISCONNECTED, CONNECTING, CHECKING_API_VERSIONS, READY
+    DISCONNECTED, CONNECTING, CHECKING_API_VERSIONS, READY, AUTHENTICATION_FAILED;
+
+    public boolean isDisconnected() {
+        return this == AUTHENTICATION_FAILED || this == DISCONNECTED;
+    }
 }
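
A quick sketch of what the new helper implies for callers (the enum lives in
the internal org.apache.kafka.clients package):

    import org.apache.kafka.clients.ConnectionState;

    public class ConnectionStateCheck {
        public static void main(String[] args) {
            // AUTHENTICATION_FAILED counts as disconnected, so the existing
            // reconnect backoff logic applies to it unchanged.
            System.out.println(ConnectionState.AUTHENTICATION_FAILED.isDisconnected()); // true
            System.out.println(ConnectionState.READY.isDisconnected());                 // false
        }
    }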

http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 2faebfd..0a9b519 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients;
 
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.requests.AbstractRequest;
 
 import java.io.Closeable;
@@ -30,7 +31,7 @@ public interface KafkaClient extends Closeable {
     /**
      * Check if we are currently ready to send another request to the given node but don't attempt to connect if we
      * aren't.
-     * 
+     *
      * @param node The node to check
      * @param now The current timestamp
      */
@@ -39,7 +40,7 @@ public interface KafkaClient extends Closeable {
     /**
      * Initiate a connection to the given node (if necessary), and return true if already connected. The readiness of a
      * node will change only when poll is invoked.
-     * 
+     *
      * @param node The node to connect to.
      * @param now The current time
      * @return true iff we are ready to immediately initiate the sending of another request to the given node.
@@ -50,7 +51,7 @@ public interface KafkaClient extends Closeable {
      * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When
      * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled
      * connections.
-     * 
+     *
      * @param node The node to check
      * @param now The current timestamp
      * @return The number of milliseconds to wait.
@@ -68,6 +69,15 @@ public interface KafkaClient extends Closeable {
     boolean connectionFailed(Node node);
 
     /**
+     * Check if authentication to this node has failed, based on the connection state. Authentication failures are
+     * propagated without any retries.
+     *
+     * @param node the node to check
+     * @return an AuthenticationException iff authentication has failed, null otherwise
+     */
+    AuthenticationException authenticationException(Node node);
+
+    /**
      * Queue up the given request for sending. Requests can only be sent on ready connections.
      * @param request The request
      * @param now The current timestamp
@@ -76,7 +86,7 @@ public interface KafkaClient extends Closeable {
 
     /**
      * Do actual reads and writes from sockets.
-     * 
+     *
      * @param timeout The maximum amount of time to wait for responses in ms, must be non-negative. The implementation
      *                is free to use a lower value if appropriate (common reasons for this are a lower request or
      *                metadata update timeout)
@@ -106,7 +116,7 @@ public interface KafkaClient extends Closeable {
      * Choose the node with the fewest outstanding requests. This method will prefer a node with an existing connection,
      * but will potentially choose a node for which we don't yet have a connection if all existing connections are in
      * use.
-     * 
+     *
      * @param now The current time in ms
      * @return The node with the fewest in-flight requests.
      */
@@ -124,7 +134,7 @@ public interface KafkaClient extends Closeable {
 
     /**
      * Get the total in-flight requests for a particular node
-     * 
+     *
      * @param nodeId The id of the node
      */
     int inFlightRequestCount(String nodeId);
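
A hedged sketch of how a caller of this interface might use the new accessor
(the helper class and method names below are hypothetical):

    import org.apache.kafka.clients.KafkaClient;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.errors.AuthenticationException;

    final class AuthFailureCheck {
        // Hypothetical helper: surface a non-retriable authentication failure
        // immediately rather than treating the disconnect as retriable.
        static void maybeThrowAuthFailure(KafkaClient client, Node node) {
            AuthenticationException exception = client.authenticationException(node);
            if (exception != null)
                throw exception; // AuthenticationException is unchecked
        }
    }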

http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
index ed149fb..8252cf3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
@@ -17,8 +17,11 @@
 package org.apache.kafka.clients;
 
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.RequestHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -34,6 +37,8 @@ import java.util.List;
  */
 public class ManualMetadataUpdater implements MetadataUpdater {
 
+    private static final Logger log = LoggerFactory.getLogger(ManualMetadataUpdater.class);
+
     private List<Node> nodes;
 
     public ManualMetadataUpdater() {
@@ -69,6 +74,13 @@ public class ManualMetadataUpdater implements MetadataUpdater {
     }
 
     @Override
+    public void handleAuthenticationFailure(AuthenticationException exception) {
+        // We don't fail the broker on authentication failures, but there is sufficient information in the broker logs
+        // to identify the failure.
+        log.debug("An authentication error occurred in broker-to-broker communication.", exception);
+    }
+
+    @Override
+    public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
         // Do nothing
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index b4521c3..5790d88 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,6 +60,7 @@ public final class Metadata {
     private int version;
     private long lastRefreshMs;
     private long lastSuccessfulRefreshMs;
+    private AuthenticationException authenticationException;
     private Cluster cluster;
     private boolean needUpdate;
     /* Topics with expiry time */
@@ -146,15 +148,28 @@ public final class Metadata {
     }
 
     /**
+     * If any non-retriable authentication exceptions were encountered during
+     * metadata update, clear and throw the exception.
+     */
+    public synchronized void maybeThrowAuthenticationException() {
+        if (authenticationException != null) {
+            AuthenticationException exception = authenticationException;
+            authenticationException = null;
+            throw exception;
+        }
+    }
+
+    /**
      * Wait for metadata update until the current version is larger than the last version we know of
      */
     public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
         if (maxWaitMs < 0) {
-            throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
+            throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milliseconds");
         }
         long begin = System.currentTimeMillis();
         long remainingWaitMs = maxWaitMs;
         while (this.version <= lastVersion) {
+            maybeThrowAuthenticationException();
             if (remainingWaitMs != 0)
                 wait(remainingWaitMs);
             long elapsed = System.currentTimeMillis() - begin;
@@ -256,8 +271,11 @@ public final class Metadata {
      * Record an attempt to update the metadata that failed. We need to keep track of this
      * to avoid retrying immediately.
      */
-    public synchronized void failedUpdate(long now) {
+    public synchronized void failedUpdate(long now, AuthenticationException authenticationException) {
         this.lastRefreshMs = now;
+        this.authenticationException = authenticationException;
+        if (authenticationException != null)
+            this.notifyAll();
     }
 
     /**
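
The clear-and-throw pattern used by maybeThrowAuthenticationException() and
failedUpdate() can be illustrated in isolation (a hypothetical holder class,
not Kafka API):

    import org.apache.kafka.common.errors.AuthenticationException;

    final class AuthErrorHolder {
        private AuthenticationException pending;

        // Network thread: record the failure and wake blocked waiters
        // (mirrors failedUpdate above).
        synchronized void fail(AuthenticationException e) {
            pending = e;
            notifyAll();
        }

        // Application thread: throw once, then clear, so the same error is
        // not reported repeatedly (mirrors maybeThrowAuthenticationException).
        synchronized void maybeThrow() {
            if (pending != null) {
                AuthenticationException e = pending;
                pending = null;
                throw e;
            }
        }
    }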

http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
index 55901b5..cb821d6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients;
 
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 
@@ -62,6 +63,13 @@ interface MetadataUpdater {
     void handleDisconnection(String destination);
 
     /**
+     * Handle authentication failure. Propagate the authentication exception if awaiting metadata.
+     *
+     * @param exception authentication exception from broker
+     */
+    void handleAuthenticationFailure(AuthenticationException exception);
+
+    /**
      * If `request` is a metadata request, handles it and returns `true`. Otherwise, returns `false`.
      *
      * This provides a mechanism for the `MetadataUpdater` implementation to use the NetworkClient instance for its own

http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index c3c15df..f8da42c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients;
 
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.ChannelState;
@@ -313,6 +314,18 @@ public class NetworkClient implements KafkaClient {
     }
 
     /**
+     * Check if authentication to this node has failed, based on the connection state. Authentication failures are
+     * propagated without any retries.
+     *
+     * @param node the node to check
+     * @return an AuthenticationException iff authentication has failed, null otherwise
+     */
+    @Override
+    public AuthenticationException authenticationException(Node node) {
+        return connectionStates.authenticationException(node.idString());
+    }
+
+    /**
      * Check if the node with the given id is ready to send more requests.
      *
      * @param node The node
@@ -589,6 +602,7 @@ public class NetworkClient implements KafkaClient {
         nodesNeedingApiVersionsFetch.remove(nodeId);
         switch (disconnectState.state()) {
             case AUTHENTICATION_FAILED:
+                connectionStates.authenticationFailed(nodeId, now, disconnectState.exception());
                 log.error("Connection to node {} failed authentication due to: {}", nodeId, disconnectState.exception().getMessage());
                 break;
             case AUTHENTICATE:
@@ -610,6 +624,9 @@ public class NetworkClient implements KafkaClient {
             else
                 responses.add(request.disconnected(now));
         }
+        AuthenticationException authenticationException = connectionStates.authenticationException(nodeId);
+        if (authenticationException != null)
+            metadataUpdater.handleAuthenticationFailure(authenticationException);
     }
 
     /**
@@ -848,6 +865,13 @@ public class NetworkClient implements KafkaClient {
         }
 
         @Override
+        public void handleAuthenticationFailure(AuthenticationException exception) {
+            metadataFetchInProgress = false;
+            if (metadata.updateRequested())
+                metadata.failedUpdate(time.milliseconds(), exception);
+        }
+
+        @Override
         public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
             this.metadataFetchInProgress = false;
             Cluster cluster = response.cluster();
@@ -862,7 +886,7 @@ public class NetworkClient implements KafkaClient {
                 this.metadata.update(cluster, response.unavailableTopics(), now);
             } else {
                 log.trace("Ignoring empty metadata response with correlation 
id {}.", requestHeader.correlationId());
-                this.metadata.failedUpdate(now);
+                this.metadata.failedUpdate(now, null);
             }
         }
 
@@ -979,7 +1003,7 @@ public class NetworkClient implements KafkaClient {
         public ClientResponse disconnected(long timeMs) {
             return new ClientResponse(header, callback, destination, createdTimeMs, timeMs, true, null, null);
         }
-        
+
         @Override
         public String toString() {
             return "InFlightRequest(header=" + header +

http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index c2f2f5f..3ea0394 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1064,8 +1064,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *             function is called
      * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
      *             this function is called
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
      * @throws org.apache.kafka.common.errors.AuthorizationException if caller lacks Read access to any of the subscribed
-     *             topics or to the configured groupId
+     *             topics or to the configured groupId. See the exception for more details
      * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. invalid groupId or
      *             session timeout, errors deserializing key/value pairs, or any new error cases in future versions)
      * @throws java.lang.IllegalArgumentException if the timeout value is negative
@@ -1176,8 +1177,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *             function is called
      * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
      *             this function is called
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
-     *             configured groupId
+     *             configured groupId. See the exception for more details
      * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
      *             is too large or if the topic does not exist).
      */
@@ -1213,8 +1215,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *             function is called
      * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
      *             this function is called
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
-     *             configured groupId
+     *             configured groupId. See the exception for more details
      * @throws java.lang.IllegalArgumentException if the committed offset is negative
      * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
      *             is too large or if the topic does not exist).
@@ -1379,8 +1382,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *             function is called
      * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
      *             this function is called
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
-     *             configured groupId
+     *             configured groupId. See the exception for more details
      * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
      */
     public long position(TopicPartition partition) {
@@ -1412,8 +1416,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *             function is called
      * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
      *             this function is called
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
-     *             configured groupId
+     *             configured groupId. See the exception for more details
      * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
      */
     @Override
@@ -1445,7 +1450,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *             function is called
      * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
      *             this function is called
-     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the specified topic
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the specified topic. See the exception for more details
      * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before
      *             expiration of the configured request timeout
      * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
@@ -1560,6 +1566,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
      *         than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
      *         such message.
+     * @throws AuthenticationException if authentication fails. See the exception for more details
      * @throws IllegalArgumentException if the target timestamp is negative.
      * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
      *         expiration of the configured request timeout
@@ -1593,6 +1600,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *
      * @param partitions the partitions to get the earliest offsets.
      * @return The earliest available offsets for the given partitions
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
      * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
      *         expiration of the configured request timeout
      */
@@ -1621,6 +1629,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *
      * @param partitions the partitions to get the end offsets.
      * @return The end offsets for the given partitions.
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
      * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
      *         expiration of the configured request timeout
      */
@@ -1641,8 +1650,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * timeout. See {@link #close(long, TimeUnit)} for details. Note that {@link #wakeup()}
      * cannot be used to interrupt close.
      *
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
      * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted
-     * before or while this function is called
+     *             before or while this function is called
      */
     @Override
     public void close() {
@@ -1660,6 +1670,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @param timeout The maximum time to wait for consumer to close gracefully. The value must be
      *                non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.
      * @param timeUnit The time unit for the {@code timeout}
+     * @throws AuthenticationException if authentication fails. See the exception for more details
      * @throws InterruptException If the thread is interrupted before or while this function is called
      * @throws IllegalArgumentException If the {@code timeout} is negative.
      */
@@ -1729,6 +1740,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * or reset it using the offset reset policy the user has configured.
      *
      * @param partitions The partitions that needs updating fetch positions
+     * @throws AuthenticationException if authentication fails. See the exception for more details
      * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
      *             defined
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java
index 2fef79e..b217a63 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java
@@ -42,7 +42,7 @@ public interface OffsetCommitCallback {
      * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
      *             this function is called
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
-     *             configured groupId
+     *             configured groupId. See the exception for more details
      * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
      *             is too large or if the committed offset is invalid).
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index bf75242..86e4f2b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
@@ -982,6 +983,12 @@ public abstract class AbstractCoordinator implements Closeable {
                         }
                     }
                 }
+            } catch (AuthenticationException e) {
+                log.error("An authentication error occurred in the heartbeat 
thread", e);
+                this.failed.set(e);
+            } catch (GroupAuthorizationException e) {
+                log.error("A group authorization error occurred in the 
heartbeat thread for group {}", groupId, e);
+                this.failed.set(e);
             } catch (InterruptedException | InterruptException e) {
                 Thread.interrupted();
                 log.error("Unexpected interrupt received in heartbeat thread 
for group {}", groupId, e);

http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index e740ba7..38ca041 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -574,7 +574,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
      * or an unrecoverable error is encountered.
      * @param offsets The offsets to be committed
      * @throws org.apache.kafka.common.errors.AuthorizationException if the consumer is not authorized to the group
-     *             or to any of the specified partitions
+     *             or to any of the specified partitions. See the exception for more details
      * @throws CommitFailedException if an unrecoverable error occurs before the commit can be completed
      * @return If the offset commit was successfully sent and a successful response was received from
      *         the coordinator

http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 803a853..43c6358 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.RequestCompletionHandler;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -134,6 +135,7 @@ public class ConsumerNetworkClient implements Closeable {
         int version = this.metadata.requestUpdate();
         do {
             poll(timeout);
+            this.metadata.maybeThrowAuthenticationException();
         } while (this.metadata.version() == version && time.milliseconds() - startMs < timeout);
         return this.metadata.version() > version;
     }
@@ -367,9 +369,13 @@ public class ConsumerNetworkClient implements Closeable {
                 Collection<ClientRequest> requests = unsent.remove(node);
                 for (ClientRequest request : requests) {
                     RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
-                    handler.onComplete(new ClientResponse(request.makeHeader(request.requestBuilder().desiredOrLatestVersion()),
-                        request.callback(), request.destination(), request.createdTimeMs(), now, true,
-                        null, null));
+                    AuthenticationException authenticationException = client.authenticationException(node);
+                    if (authenticationException != null)
+                        handler.onFailure(authenticationException);
+                    else
+                        handler.onComplete(new ClientResponse(request.makeHeader(request.requestBuilder().desiredOrLatestVersion()),
+                            request.callback(), request.destination(), request.createdTimeMs(), now, true,
+                            null, null));
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 7dcec5c..66760e2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -537,7 +537,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
      *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
      * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
-     *         transactional.id is not authorized
+     *         transactional.id is not authorized. See the exception for more details
      * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
      */
     public void initTransactions() {
@@ -557,7 +557,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
      *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
      * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
-     *         transactional.id is not authorized
+     *         transactional.id is not authorized. See the exception for more details
      * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
      */
     public void beginTransaction() throws ProducerFencedException {
@@ -585,7 +585,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * @throws org.apache.kafka.common.errors.UnsupportedForMessageFormatException  fatal error indicating the message
      *         format used for the offsets topic on the broker does not support transactions
      * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
-     *         transactional.id is not authorized
+     *         transactional.id is not authorized. See the exception for more details
      * @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any
      *         other unexpected error
      *         other unexpected error
      */
@@ -609,7 +609,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
      *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
      * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
-     *         transactional.id is not authorized
+     *         transactional.id is not authorized. See the exception for more details
      * @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any
      *         other unexpected error
      */
@@ -630,7 +630,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
      *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
      * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
-     *         transactional.id is not authorized
+     *         transactional.id is not authorized. See the exception for more details
      * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
      */
     public void abortTransaction() throws ProducerFencedException {
@@ -746,6 +746,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
      *        indicates no callback)
      *
+     * @throws AuthenticationException if authentication fails. See the exception for more details
      * @throws IllegalStateException if a transactional.id has been configured and no transaction has been started
      * @throws InterruptException If the thread is interrupted while blocked
      * @throws SerializationException If the key or value are not valid objects given the configured serializers
@@ -968,6 +969,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
 
     /**
      * Get the partition metadata for the given topic. This can be used for custom partitioning.
+     * @throws AuthenticationException if authentication fails. See the exception for more details
      * @throws InterruptException If the thread is interrupted while blocked
      */
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java
index 4370fd8..08ed1a0 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.common.network;
 
-import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.AuthenticationException;
 
 /**
  * States for KafkaChannel:
@@ -73,11 +73,11 @@ public class ChannelState {
     public static final ChannelState LOCAL_CLOSE = new ChannelState(State.LOCAL_CLOSE);
 
     private final State state;
-    private final ApiException exception;
+    private final AuthenticationException exception;
     public ChannelState(State state) {
         this(state, null);
     }
-    public ChannelState(State state, ApiException exception) {
+    public ChannelState(State state, AuthenticationException exception) {
         this.state = state;
         this.exception = exception;
     }
@@ -86,7 +86,7 @@ public class ChannelState {
         return state;
     }
 
-    public ApiException exception() {
+    public AuthenticationException exception() {
         return exception;
     }
-}
\ No newline at end of file
+}
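
A small sketch of the narrowed API (assuming ChannelState.State and the
AuthenticationException(String) constructor are both publicly accessible):

    import org.apache.kafka.common.errors.AuthenticationException;
    import org.apache.kafka.common.network.ChannelState;

    public class ChannelStateExample {
        public static void main(String[] args) {
            // A channel closed by an authentication failure now carries the typed
            // exception, which NetworkClient can propagate to waiting callers.
            ChannelState state = new ChannelState(ChannelState.State.AUTHENTICATION_FAILED,
                    new AuthenticationException("invalid credentials"));
            System.out.println(state.state() + ": " + state.exception().getMessage());
        }
    }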

http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 407eb9f..3f2a94c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -135,7 +135,7 @@ public class MetadataTest {
         long now = 10000;
 
         // lastRefreshMs updated to now.
-        metadata.failedUpdate(now);
+        metadata.failedUpdate(now, null);
 
         // Backing off. Remaining time until next try should be returned.
         assertEquals(refreshBackoffMs, metadata.timeToNextUpdate(now));
@@ -216,7 +216,7 @@ public class MetadataTest {
         metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
 
         assertEquals(100, metadata.timeToNextUpdate(1000));
-        metadata.failedUpdate(1100);
+        metadata.failedUpdate(1100, null);
 
         assertEquals(100, metadata.timeToNextUpdate(1100));
         assertEquals(100, metadata.lastSuccessfulUpdate());

http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 71e32ff..66ff253 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients;
 
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
@@ -135,6 +136,11 @@ public class MockClient implements KafkaClient {
     }
 
     @Override
+    public AuthenticationException authenticationException(Node node) {
+        return null;
+    }
+
+    @Override
     public void disconnect(String node) {
         long now = time.milliseconds();
         Iterator<ClientRequest> iter = requests.iterator();

http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
new file mode 100644
index 0000000..b4460f7
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.authenticator;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.errors.AuthenticationFailedException;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.network.NetworkTestUtils;
+import org.apache.kafka.common.network.NioEchoServer;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.TestSecurityConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ClientAuthenticationFailureTest {
+
+    private NioEchoServer server;
+    private Map<String, Object> saslServerConfigs;
+    private Map<String, Object> saslClientConfigs;
+    private final String topic = "test";
+    private TestJaasConfig testJaasConfig;
+
+    @Before
+    public void setup() throws Exception {
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
+
+        saslServerConfigs = new HashMap<>();
+        saslServerConfigs.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, Arrays.asList("PLAIN"));
+
+        saslClientConfigs = new HashMap<>();
+        saslClientConfigs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+        saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
+
+        testJaasConfig = TestJaasConfig.createConfiguration("PLAIN", Arrays.asList("PLAIN"));
+        testJaasConfig.setClientOptions("PLAIN", TestJaasConfig.USERNAME, "anotherpassword");
+        server = createEchoServer(securityProtocol);
+    }
+
+    @After
+    public void teardown() throws Exception {
+        if (server != null)
+            server.close();
+    }
+
+    @Test
+    public void testConsumerWithInvalidCredentials() {
+        saslClientConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port());
+        saslClientConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+        saslClientConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+
+        try (KafkaConsumer<String, String> consumer = new 
KafkaConsumer<>(saslClientConfigs)) {
+            consumer.subscribe(Arrays.asList(topic));
+            consumer.poll(100);
+            fail("Expected an authentication error!");
+        } catch (AuthenticationFailedException e) {
+            // OK
+        } catch (Exception e) {
+            fail("Expected only an authentication error, but another error 
occurred: " + e.getMessage());
+        }
+    }
+
+    @Test
+    public void testProducerWithInvalidCredentials() {
+        saslClientConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port());
+        saslClientConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+        saslClientConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, 
"message");
+        try (KafkaProducer<String, String> producer = new 
KafkaProducer<>(saslClientConfigs)) {
+            producer.send(record).get();
+            fail("Expected an authentication error!");
+        } catch (Exception e) {
+            assertTrue("Expected an exception of type 
AuthenticationFailedException", e.getCause() instanceof 
AuthenticationFailedException);
+        }
+    }
+
+    private NioEchoServer createEchoServer(SecurityProtocol securityProtocol) throws Exception {
+        return createEchoServer(ListenerName.forSecurityProtocol(securityProtocol), securityProtocol);
+    }
+
+    private NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol) throws Exception {
+        return NetworkTestUtils.createEchoServer(listenerName, securityProtocol,
+                new TestSecurityConfig(saslServerConfigs), new CredentialCache());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index fadacec..24149d7 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.clients._
 import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture, RequestFutureAdapter}
 import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
-import org.apache.kafka.common.errors.TimeoutException
+import org.apache.kafka.common.errors.{AuthenticationException, 
TimeoutException}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.Selector
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -57,9 +57,8 @@ class AdminClient(val time: Time,
   val networkThread = new KafkaThread("admin-client-network-thread", new Runnable {
     override def run() {
       try {
-        while (running) {
+        while (running)
           client.poll(Long.MaxValue)
-        }
       } catch {
         case t : Throwable =>
           error("admin-client-network-thread exited", t)
@@ -96,6 +95,8 @@ class AdminClient(val time: Time,
       try {
         return send(broker, api, request)
       } catch {
+        case e: AuthenticationException =>
+          throw e
         case e: Exception =>
           debug(s"Request $api failed against node $broker", e)
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index a0818bc..c014caf 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -31,7 +31,7 @@ import kafka.metrics.KafkaMetricsReporter
 import kafka.utils._
 import kafka.utils.Implicits._
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
-import org.apache.kafka.common.errors.WakeupException
+import org.apache.kafka.common.errors.{AuthenticationException, WakeupException}
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.serialization.Deserializer
 import org.apache.kafka.common.utils.Utils
@@ -53,6 +53,9 @@ object ConsoleConsumer extends Logging {
     try {
       run(conf)
     } catch {
+      case e: AuthenticationException =>
+        error("Authentication failed: terminating consumer process", e)
+        Exit.exit(1)
       case e: Throwable =>
         error("Unknown error when running consumer: ", e)
         Exit.exit(1)

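One detail worth calling out in the hunk above: Scala tries catch cases top to bottom, so the new AuthenticationException case must precede the Throwable catch-all or it could never match. The same shutdown pattern applies to any consumer-based CLI tool; a hedged sketch follows, where runTool is a hypothetical entry point that drives the consumer loop:

    import kafka.utils.Exit
    import org.apache.kafka.common.errors.AuthenticationException

    def main(args: Array[String]): Unit =
      try runTool(args) catch { // runTool: hypothetical consumer loop
        case e: AuthenticationException =>
          // Must come before the Throwable case, which would otherwise match first.
          System.err.println(s"Authentication failed: ${e.getMessage}")
          Exit.exit(1)
        case e: Throwable =>
          System.err.println(s"Unexpected error: ${e.getMessage}")
          Exit.exit(1)
      }
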
http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
new file mode 100644
index 0000000..52fbdba
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -0,0 +1,176 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+  * License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+  * specific language governing permissions and limitations under the License.
+  */
+package kafka.api
+
+import java.io.FileOutputStream
+import java.util.concurrent.{ExecutionException, Future, TimeUnit}
+import scala.collection.JavaConverters.seqAsJavaListConverter
+
+import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.AuthenticationFailedException
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.junit.{After, Before, Test}
+import org.junit.Assert._
+
+import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, KafkaConsumerGroupService}
+import kafka.server.KafkaConfig
+import kafka.utils.{JaasTestUtils, TestUtils, ZkUtils}
+
+class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with SaslSetup {
+  private val kafkaClientSaslMechanism = "SCRAM-SHA-256"
+  private val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
+  override protected val securityProtocol = SecurityProtocol.SASL_PLAINTEXT
+  override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
+  override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
+  val consumerCount = 1
+  val producerCount = 1
+  val serverCount = 1
+
+  this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+
+  val topic = "topic"
+  val tp = new TopicPartition(topic, 0)
+
+  override def configureSecurityBeforeServersStart() {
+    super.configureSecurityBeforeServersStart()
+    zkUtils.makeSurePersistentPathExists(ZkUtils.ConfigChangesPath)
+    // Create broker credentials before starting brokers
+    createScramCredentials(zkConnect, JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword)
+  }
+
+  @Before
+  override def setUp(): Unit = {
+    startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both,
+      JaasTestUtils.KafkaServerContextName))
+    super.setUp()
+    TestUtils.createTopic(this.zkUtils, topic, 1, serverCount, this.servers)
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    super.tearDown()
+    closeSasl()
+  }
+
+  @Test
+  def testProducerWithAuthenticationFailure() {
+    verifyAuthenticationException(() => sendOneRecord(10000))
+
+    createClientCredential()
+    verifyWithRetry(() => sendOneRecord())
+  }
+
+  @Test
+  def testConsumerWithAuthenticationFailure() {
+    val consumer = this.consumers.head
+    consumer.subscribe(List(topic).asJava)
+    verifyConsumerWithAuthenticationFailure(consumer)
+  }
+
+  @Test
+  def testManualAssignmentConsumerWithAuthenticationFailure() {
+    val consumer = this.consumers.head
+    consumer.assign(List(tp).asJava)
+    verifyConsumerWithAuthenticationFailure(consumer)
+  }
+
+  @Test
+  def testManualAssignmentConsumerWithAutoCommitDisabledWithAuthenticationFailure() {
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false.toString)
+    val consumer = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+    consumers += consumer
+    consumer.assign(List(tp).asJava)
+    consumer.seek(tp, 0)
+
+    verifyConsumerWithAuthenticationFailure(consumer)
+  }
+
+  private def verifyConsumerWithAuthenticationFailure(consumer: KafkaConsumer[Array[Byte], Array[Byte]]) {
+    verifyAuthenticationException(() => consumer.poll(10000))
+
+    createClientCredential()
+    verifyWithRetry(() => sendOneRecord())
+    verifyWithRetry(() => assertEquals(1, consumer.poll(1000).count))
+  }
+
+  @Test
+  def testConsumerGroupServiceWithAuthenticationFailure() {
+    val propsFile = TestUtils.tempFile()
+    val propsStream = new FileOutputStream(propsFile)
+    propsStream.write("security.protocol=SASL_PLAINTEXT\n".getBytes())
+    propsStream.write(s"sasl.mechanism=$kafkaClientSaslMechanism".getBytes())
+    propsStream.close()
+
+    val cgcArgs = Array("--bootstrap-server", brokerList,
+                        "--describe",
+                        "--group", "test.group",
+                        "--command-config", propsFile.getAbsolutePath)
+    val opts = new ConsumerGroupCommandOptions(cgcArgs)
+    val consumerGroupService = new KafkaConsumerGroupService(opts)
+
+    val consumer = consumers.head
+    consumer.subscribe(List(topic).asJava)
+
+    verifyAuthenticationException(() => consumerGroupService.listGroups)
+    createClientCredential()
+    verifyWithRetry(() => consumer.poll(1000))
+    assertEquals(1, consumerGroupService.listGroups.size)
+  }
+
+  private def createClientCredential(): Unit = {
+    createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser2, JaasTestUtils.KafkaScramPassword2)
+  }
+
+  private def sendOneRecord(maxWaitMs: Long = 15000): Unit = {
+    val producer = this.producers.head
+    val record = new ProducerRecord(tp.topic(), tp.partition(), 0L, "key".getBytes, "value".getBytes)
+    val future = producer.send(record)
+    producer.flush()
+    try {
+      val recordMetadata = future.get(maxWaitMs, TimeUnit.MILLISECONDS)
+      assertTrue(s"Invalid offset $recordMetadata", recordMetadata.offset >= 0)
+    } catch {
+      case e: ExecutionException => throw e.getCause
+    }
+  }
+
+  private def verifyAuthenticationException(f: () => Unit): Unit = {
+    val startMs = System.currentTimeMillis
+    try {
+      f()
+      fail("Expected an authentication exception")
+    } catch {
+      case e: AuthenticationFailedException =>
+        // expected exception
+        val elapsedMs = System.currentTimeMillis - startMs
+        assertTrue(s"Poll took too long, elapsed=$elapsedMs", elapsedMs <= 
5000)
+        assertTrue(s"Exception message not useful: $e", 
e.getMessage.contains("invalid credentials"))
+    }
+  }
+
+  private def verifyWithRetry(f: () => Unit): Unit = {
+    var attempts = 0
+    TestUtils.waitUntilTrue(() => {
+      try {
+        attempts += 1
+        f()
+        true
+      } catch {
+        case _: AuthenticationFailedException => false
+      }
+    }, s"Operation did not succeed within timeout after $attempts")
+  }
+}

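The verifyWithRetry helper above encodes a timing detail that applications creating SCRAM credentials at runtime also face: brokers pick up new credentials from ZooKeeper asynchronously, so the first requests after createScramCredentials may still fail authentication. A hedged standalone version of that retry loop (the deadline and sleep interval are illustrative choices, not values from the patch):

    import org.apache.kafka.common.errors.AuthenticationFailedException

    // Retry an operation until freshly created credentials have propagated.
    def retryUntilAuthenticated[T](op: () => T, remainingMs: Long = 15000L): T =
      try op() catch {
        case _: AuthenticationFailedException if remainingMs > 0 =>
          Thread.sleep(100) // illustrative back-off interval
          retryUntilAuthenticated(op, remainingMs - 100)
      }

For example, retryUntilAuthenticated(() => consumer.poll(1000)) mirrors what the test does after calling createClientCredential().
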
http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
index 0bc4e50..ed1c77b 100644
--- a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
@@ -35,22 +35,14 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
     super.configureSecurityBeforeServersStart()
     zkUtils.makeSurePersistentPathExists(ZkUtils.ConfigChangesPath)
     // Create broker credentials before starting brokers
-    ConfigCommand.main(configCommandArgs(kafkaPrincipal, kafkaPassword))
+    createScramCredentials(zkConnect, kafkaPrincipal, kafkaPassword)
   }
 
   @Before
   override def setUp() {
     super.setUp()
    // Create client credentials after starting brokers so that dynamic credential creation is also tested
-    ConfigCommand.main(configCommandArgs(clientPrincipal, clientPassword))
-    ConfigCommand.main(configCommandArgs(JaasTestUtils.KafkaScramUser2, JaasTestUtils.KafkaScramPassword2))
-  }
-
-  private def configCommandArgs(username: String, password: String) : Array[String] = {
-    val credentials = kafkaServerSaslMechanisms.map(m => s"$m=[iterations=4096,password=$password]")
-    Array("--zookeeper", zkConnect,
-          "--alter", "--add-config", credentials.mkString(","),
-          "--entity-type", "users",
-          "--entity-name", username)
+    createScramCredentials(zkConnect, clientPrincipal, clientPassword)
+    createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser2, JaasTestUtils.KafkaScramPassword2)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/core/src/test/scala/integration/kafka/api/SaslSetup.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index f874f4e..51deb85 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -21,14 +21,16 @@ import java.io.File
 import java.util.Properties
 import javax.security.auth.login.Configuration
 
+import kafka.admin.ConfigCommand
 import kafka.security.minikdc.MiniKdc
 import kafka.server.KafkaConfig
 import kafka.utils.JaasTestUtils.{JaasSection, Krb5LoginModule, ZkDigestModule}
 import kafka.utils.{JaasTestUtils, TestUtils}
-import org.apache.kafka.common.security.JaasUtils
-import org.apache.kafka.common.security.authenticator.LoginManager
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
+import org.apache.kafka.common.security.JaasUtils
+import org.apache.kafka.common.security.authenticator.LoginManager
+import org.apache.kafka.common.security.scram.ScramMechanism
 
 /*
  * Implements an enumeration for the modes enabled here:
@@ -135,4 +137,13 @@ trait SaslSetup {
   def jaasClientLoginModule(clientSaslMechanism: String): String =
     JaasTestUtils.clientLoginModule(clientSaslMechanism, clientKeytabFile)
 
+  def createScramCredentials(zkConnect: String, userName: String, password: String): Unit = {
+    val credentials = ScramMechanism.values.map(m => s"${m.mechanismName}=[iterations=4096,password=$password]")
+    val args = Array("--zookeeper", zkConnect,
+      "--alter", "--add-config", credentials.mkString(","),
+      "--entity-type", "users",
+      "--entity-name", userName)
+    ConfigCommand.main(args)
+  }
+
 }

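The new createScramCredentials helper registers a credential for each mechanism that ScramMechanism enumerates (SCRAM-SHA-256 and SCRAM-SHA-512 at the time of this patch), so callers need only one call per user. A hedged usage sketch from a test mixing in SaslSetup (the user name and password are placeholders):

    // Register credentials in ZooKeeper before clients attempt to authenticate.
    createScramCredentials(zkConnect, "test-user", "test-secret")

This is equivalent to running kafka-configs with --alter --add-config against the users entity type, as the removed configCommandArgs helper in SaslScramSslEndToEndAuthorizationTest used to do by hand.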