Repository: kafka
Updated Branches:
  refs/heads/trunk c2af13ed0 -> 69d2a1771
KAFKA-5854; Handle SASL authentication failures as non-retriable exceptions in clients

This PR implements the client-side of KIP-152, by modifying `KafkaConsumer`, `KafkaProducer`, and `ConsumerGroupCommand` to throw a non-retriable exception when SASL authentication fails.

This PR is co-authored with rajinisivaram.

Author: Vahid Hashemian <vahidhashem...@us.ibm.com>

Reviewers: Jason Gustafson <ja...@confluent.io>, Rajini Sivaram <rajinisiva...@googlemail.com>, Ismael Juma <ism...@juma.me.uk>, tedyu <yuzhih...@gmail.com>

Closes #3832 from vahidhashemian/KAFKA-5854

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/69d2a177
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/69d2a177
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/69d2a177

Branch: refs/heads/trunk
Commit: 69d2a177101eb1c29b59b4c64d8c22f6d5e3d281
Parents: c2af13e
Author: Vahid Hashemian <vahidhashem...@us.ibm.com>
Authored: Wed Sep 20 22:51:53 2017 +0100
Committer: Rajini Sivaram <rajinisiva...@googlemail.com>
Committed: Wed Sep 20 22:51:53 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/ClusterConnectionStates.java | 44 ++++-
 .../apache/kafka/clients/ConnectionState.java | 7 +-
 .../org/apache/kafka/clients/KafkaClient.java | 22 ++-
 .../kafka/clients/ManualMetadataUpdater.java | 12 ++
 .../java/org/apache/kafka/clients/Metadata.java | 22 ++-
 .../apache/kafka/clients/MetadataUpdater.java | 8 +
 .../org/apache/kafka/clients/NetworkClient.java | 28 ++-
 .../kafka/clients/consumer/KafkaConsumer.java | 26 ++-
 .../clients/consumer/OffsetCommitCallback.java | 2 +-
 .../consumer/internals/AbstractCoordinator.java | 7 +
 .../consumer/internals/ConsumerCoordinator.java | 2 +-
 .../internals/ConsumerNetworkClient.java | 12 +-
 .../kafka/clients/producer/KafkaProducer.java | 12 +-
 .../kafka/common/network/ChannelState.java | 10 +-
 .../org/apache/kafka/clients/MetadataTest.java | 4 +-
.../org/apache/kafka/clients/MockClient.java | 6 + .../ClientAuthenticationFailureTest.java | 114 ++++++++++++ .../main/scala/kafka/admin/AdminClient.scala | 7 +- .../scala/kafka/tools/ConsoleConsumer.scala | 5 +- .../SaslClientsWithInvalidCredentialsTest.scala | 176 +++++++++++++++++++ .../SaslScramSslEndToEndAuthorizationTest.scala | 14 +- .../scala/integration/kafka/api/SaslSetup.scala | 15 +- 22 files changed, 496 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java index 4d4bedd..5bc25de 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java @@ -17,12 +17,15 @@ package org.apache.kafka.clients; import java.util.concurrent.ThreadLocalRandom; + +import org.apache.kafka.common.errors.AuthenticationException; + import java.util.HashMap; import java.util.Map; /** * The state of our connection to each node in the cluster. 
- * + * */ final class ClusterConnectionStates { private final long reconnectBackoffInitMs; @@ -50,7 +53,8 @@ final class ClusterConnectionStates { if (state == null) return true; else - return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttemptMs >= state.reconnectBackoffMs; + return state.state.isDisconnected() && + now - state.lastConnectAttemptMs >= state.reconnectBackoffMs; } /** @@ -63,7 +67,8 @@ final class ClusterConnectionStates { if (state == null) return false; else - return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttemptMs < state.reconnectBackoffMs; + return state.state.isDisconnected() && + now - state.lastConnectAttemptMs < state.reconnectBackoffMs; } /** @@ -77,7 +82,7 @@ final class ClusterConnectionStates { NodeConnectionState state = nodeState.get(id); if (state == null) return 0; long timeWaited = now - state.lastConnectAttemptMs; - if (state.state == ConnectionState.DISCONNECTED) { + if (state.state.isDisconnected()) { return Math.max(state.reconnectBackoffMs - timeWaited, 0); } else { // When connecting or connected, we should be able to delay indefinitely since other events (connection or @@ -136,6 +141,20 @@ final class ClusterConnectionStates { } /** + * Enter the authentication failed state for the given node. + * @param id the connection identifier + * @param now the current time + * @param exception the authentication exception + */ + public void authenticationFailed(String id, long now, AuthenticationException exception) { + NodeConnectionState nodeState = nodeState(id); + nodeState.authenticationException = exception; + nodeState.state = ConnectionState.AUTHENTICATION_FAILED; + nodeState.lastConnectAttemptMs = now; + updateReconnectBackoff(nodeState); + } + + /** * Return true if the connection is ready. 
* @param id the connection identifier */ @@ -162,7 +181,16 @@ final class ClusterConnectionStates { */ public boolean isDisconnected(String id) { NodeConnectionState state = nodeState.get(id); - return state != null && state.state == ConnectionState.DISCONNECTED; + return state != null && state.state.isDisconnected(); + } + + /** + * Return authentication exception if an authentication error occurred + * @param id The id of the node to check + */ + public AuthenticationException authenticationException(String id) { + NodeConnectionState state = nodeState.get(id); + return state != null ? state.authenticationException : null; } /** @@ -205,7 +233,7 @@ final class ClusterConnectionStates { public void remove(String id) { nodeState.remove(id); } - + /** * Get the state of a given connection. * @param id the id of the connection @@ -225,19 +253,21 @@ final class ClusterConnectionStates { throw new IllegalStateException("No entry found for connection " + id); return state; } - + /** * The state of our connection to a node. 
*/ private static class NodeConnectionState { ConnectionState state; + AuthenticationException authenticationException; long lastConnectAttemptMs; long failedAttempts; long reconnectBackoffMs; public NodeConnectionState(ConnectionState state, long lastConnectAttempt, long reconnectBackoffMs) { this.state = state; + this.authenticationException = null; this.lastConnectAttemptMs = lastConnectAttempt; this.failedAttempts = 0; this.reconnectBackoffMs = reconnectBackoffMs; http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java b/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java index 62ffada..28b43d6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java @@ -23,7 +23,12 @@ package org.apache.kafka.clients; * CONNECTING: connection is under progress * CHECKING_API_VERSIONS: connection has been established and api versions check is in progress. 
Failure of this check will cause connection to close * READY: connection is ready to send requests + * AUTHENTICATION_FAILED: connection failed due to an authentication error */ public enum ConnectionState { - DISCONNECTED, CONNECTING, CHECKING_API_VERSIONS, READY + DISCONNECTED, CONNECTING, CHECKING_API_VERSIONS, READY, AUTHENTICATION_FAILED; + + public boolean isDisconnected() { + return this == AUTHENTICATION_FAILED || this == DISCONNECTED; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java index 2faebfd..0a9b519 100644 --- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients; import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.requests.AbstractRequest; import java.io.Closeable; @@ -30,7 +31,7 @@ public interface KafkaClient extends Closeable { /** * Check if we are currently ready to send another request to the given node but don't attempt to connect if we * aren't. - * + * * @param node The node to check * @param now The current timestamp */ @@ -39,7 +40,7 @@ public interface KafkaClient extends Closeable { /** * Initiate a connection to the given node (if necessary), and return true if already connected. The readiness of a * node will change only when poll is invoked. - * + * * @param node The node to connect to. * @param now The current time * @return true iff we are ready to immediately initiate the sending of another request to the given node. 
@@ -50,7 +51,7 @@ public interface KafkaClient extends Closeable { * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled * connections. - * + * * @param node The node to check * @param now The current timestamp * @return The number of milliseconds to wait. @@ -68,6 +69,15 @@ public interface KafkaClient extends Closeable { boolean connectionFailed(Node node); /** + * Check if authentication to this node has failed, based on the connection state. Authentication failures are + * propagated without any retries. + * + * @param node the node to check + * @return an AuthenticationException iff authentication has failed, null otherwise + */ + AuthenticationException authenticationException(Node node); + + /** * Queue up the given request for sending. Requests can only be sent on ready connections. * @param request The request * @param now The current timestamp @@ -76,7 +86,7 @@ public interface KafkaClient extends Closeable { /** * Do actual reads and writes from sockets. - * + * * @param timeout The maximum amount of time to wait for responses in ms, must be non-negative. The implementation * is free to use a lower value if appropriate (common reasons for this are a lower request or * metadata update timeout) @@ -106,7 +116,7 @@ public interface KafkaClient extends Closeable { * Choose the node with the fewest outstanding requests. This method will prefer a node with an existing connection, * but will potentially choose a node for which we don't yet have a connection if all existing connections are in * use. - * + * * @param now The current time in ms * @return The node with the fewest in-flight requests. 
*/ @@ -124,7 +134,7 @@ public interface KafkaClient extends Closeable { /** * Get the total in-flight requests for a particular node - * + * * @param nodeId The id of the node */ int inFlightRequestCount(String nodeId); http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java index ed149fb..8252cf3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java +++ b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java @@ -17,8 +17,11 @@ package org.apache.kafka.clients; import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.requests.RequestHeader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; @@ -34,6 +37,8 @@ import java.util.List; */ public class ManualMetadataUpdater implements MetadataUpdater { + private static final Logger log = LoggerFactory.getLogger(ManualMetadataUpdater.class); + private List<Node> nodes; public ManualMetadataUpdater() { @@ -69,6 +74,13 @@ public class ManualMetadataUpdater implements MetadataUpdater { } @Override + public void handleAuthenticationFailure(AuthenticationException exception) { + // We don't fail the broker on authentication failures, but there is sufficient information in the broker logs + // to identify the failure. 
+ log.debug("An authentication error occurred in broker-to-broker communication.", exception); + } + + @Override public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) { // Do nothing } http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/main/java/org/apache/kafka/clients/Metadata.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index b4521c3..5790d88 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,6 +60,7 @@ public final class Metadata { private int version; private long lastRefreshMs; private long lastSuccessfulRefreshMs; + private AuthenticationException authenticationException; private Cluster cluster; private boolean needUpdate; /* Topics with expiry time */ @@ -146,15 +148,28 @@ public final class Metadata { } /** + * If any non-retriable authentication exceptions were encountered during + * metadata update, clear and throw the exception. 
+ */ + public synchronized void maybeThrowAuthenticationException() { + if (authenticationException != null) { + AuthenticationException exception = authenticationException; + authenticationException = null; + throw exception; + } + } + + /** * Wait for metadata update until the current version is larger than the last version we know of */ public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException { if (maxWaitMs < 0) { - throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds"); + throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milliseconds"); } long begin = System.currentTimeMillis(); long remainingWaitMs = maxWaitMs; while (this.version <= lastVersion) { + maybeThrowAuthenticationException(); if (remainingWaitMs != 0) wait(remainingWaitMs); long elapsed = System.currentTimeMillis() - begin; @@ -256,8 +271,11 @@ public final class Metadata { * Record an attempt to update the metadata that failed. We need to keep track of this * to avoid retrying immediately. 
*/ - public synchronized void failedUpdate(long now) { + public synchronized void failedUpdate(long now, AuthenticationException authenticationException) { this.lastRefreshMs = now; + this.authenticationException = authenticationException; + if (authenticationException != null) + this.notifyAll(); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java index 55901b5..cb821d6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java +++ b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients; import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.requests.RequestHeader; @@ -62,6 +63,13 @@ interface MetadataUpdater { void handleDisconnection(String destination); /** + * Handle authentication failure. Propagate the authentication exception if awaiting metadata. + * + * @param exception authentication exception from broker + */ + void handleAuthenticationFailure(AuthenticationException exception); + + /** * If `request` is a metadata request, handles it and returns `true`. Otherwise, returns `false`. 
* * This provides a mechanism for the `MetadataUpdater` implementation to use the NetworkClient instance for its own http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index c3c15df..f8da42c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -18,6 +18,7 @@ package org.apache.kafka.clients; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.network.ChannelState; @@ -313,6 +314,18 @@ public class NetworkClient implements KafkaClient { } /** + * Check if authentication to this node has failed, based on the connection state. Authentication failures are + * propagated without any retries. + * + * @param node the node to check + * @return an AuthenticationException iff authentication has failed, null otherwise + */ + @Override + public AuthenticationException authenticationException(Node node) { + return connectionStates.authenticationException(node.idString()); + } + + /** * Check if the node with the given id is ready to send more requests. 
* * @param node The node @@ -589,6 +602,7 @@ public class NetworkClient implements KafkaClient { nodesNeedingApiVersionsFetch.remove(nodeId); switch (disconnectState.state()) { case AUTHENTICATION_FAILED: + connectionStates.authenticationFailed(nodeId, now, disconnectState.exception()); log.error("Connection to node {} failed authentication due to: {}", nodeId, disconnectState.exception().getMessage()); break; case AUTHENTICATE: @@ -610,6 +624,9 @@ public class NetworkClient implements KafkaClient { else responses.add(request.disconnected(now)); } + AuthenticationException authenticationException = connectionStates.authenticationException(nodeId); + if (authenticationException != null) + metadataUpdater.handleAuthenticationFailure(authenticationException); } /** @@ -848,6 +865,13 @@ public class NetworkClient implements KafkaClient { } @Override + public void handleAuthenticationFailure(AuthenticationException exception) { + metadataFetchInProgress = false; + if (metadata.updateRequested()) + metadata.failedUpdate(time.milliseconds(), exception); + } + + @Override public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) { this.metadataFetchInProgress = false; Cluster cluster = response.cluster(); @@ -862,7 +886,7 @@ public class NetworkClient implements KafkaClient { this.metadata.update(cluster, response.unavailableTopics(), now); } else { log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId()); - this.metadata.failedUpdate(now); + this.metadata.failedUpdate(now, null); } } @@ -979,7 +1003,7 @@ public class NetworkClient implements KafkaClient { public ClientResponse disconnected(long timeMs) { return new ClientResponse(header, callback, destination, createdTimeMs, timeMs, true, null, null); } - + @Override public String toString() { return "InFlightRequest(header=" + header + 
http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index c2f2f5f..3ea0394 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1064,8 +1064,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * function is called * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while * this function is called + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details * @throws org.apache.kafka.common.errors.AuthorizationException if caller lacks Read access to any of the subscribed - * topics or to the configured groupId + * topics or to the configured groupId. See the exception for more details * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. invalid groupId or * session timeout, errors deserializing key/value pairs, or any new error cases in future versions) * @throws java.lang.IllegalArgumentException if the timeout value is negative @@ -1176,8 +1177,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * function is called * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while * this function is called + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the - * configured groupId + * configured groupId. 
See the exception for more details * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata * is too large or if the topic does not exist). */ @@ -1213,8 +1215,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * function is called * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while * this function is called + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the - * configured groupId + * configured groupId. See the exception for more details * @throws java.lang.IllegalArgumentException if the committed offset is negative * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata * is too large or if the topic does not exist). @@ -1379,8 +1382,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * function is called * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while * this function is called + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the - * configured groupId + * configured groupId. 
See the exception for more details * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors */ public long position(TopicPartition partition) { @@ -1412,8 +1416,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * function is called * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while * this function is called + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the - * configured groupId + * configured groupId. See the exception for more details * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors */ @Override @@ -1445,7 +1450,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * function is called * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while * this function is called - * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the specified topic + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details + * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the specified topic. See the exception for more details * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before * expiration of the configured request timeout * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors @@ -1560,6 +1566,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater * than or equal to the target timestamp. {@code null} will be returned for the partition if there is no * such message. 
+ * @throws AuthenticationException if authentication fails. See the exception for more details * @throws IllegalArgumentException if the target timestamp is negative. * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before * expiration of the configured request timeout @@ -1593,6 +1600,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * * @param partitions the partitions to get the earliest offsets. * @return The earliest available offsets for the given partitions + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before * expiration of the configured request timeout */ @@ -1621,6 +1629,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * * @param partitions the partitions to get the end offsets. * @return The end offsets for the given partitions. + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before * expiration of the configured request timeout */ @@ -1641,8 +1650,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * timeout. See {@link #close(long, TimeUnit)} for details. Note that {@link #wakeup()} * cannot be used to interrupt close. * + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. 
See the exception for more details * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted - * before or while this function is called + * before or while this function is called */ @Override public void close() { @@ -1660,6 +1670,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * @param timeout The maximum time to wait for consumer to close gracefully. The value must be * non-negative. Specifying a timeout of zero means do not wait for pending requests to complete. * @param timeUnit The time unit for the {@code timeout} + * @throws AuthenticationException if authentication fails. See the exception for more details * @throws InterruptException If the thread is interrupted before or while this function is called * @throws IllegalArgumentException If the {@code timeout} is negative. */ @@ -1729,6 +1740,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * or reset it using the offset reset policy the user has configured. * * @param partitions The partitions that needs updating fetch positions + * @throws AuthenticationException if authentication fails. 
See the exception for more details * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is * defined */ http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java index 2fef79e..b217a63 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java @@ -42,7 +42,7 @@ public interface OffsetCommitCallback { * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while * this function is called * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the - * configured groupId + * configured groupId. See the exception for more details * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata * is too large or if the committed offset is invalid). 
*/ http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index bf75242..86e4f2b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.IllegalGenerationException; @@ -982,6 +983,12 @@ public abstract class AbstractCoordinator implements Closeable { } } } + } catch (AuthenticationException e) { + log.error("An authentication error occurred in the heartbeat thread", e); + this.failed.set(e); + } catch (GroupAuthorizationException e) { + log.error("A group authorization error occurred in the heartbeat thread for group {}", groupId, e); + this.failed.set(e); } catch (InterruptedException | InterruptException e) { Thread.interrupted(); log.error("Unexpected interrupt received in heartbeat thread for group {}", groupId, e); http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index e740ba7..38ca041 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -574,7 +574,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { * or an unrecoverable error is encountered. * @param offsets The offsets to be committed * @throws org.apache.kafka.common.errors.AuthorizationException if the consumer is not authorized to the group - * or to any of the specified partitions + * or to any of the specified partitions. See the exception for more details * @throws CommitFailedException if an unrecoverable error occurs before the commit can be completed * @return If the offset commit was successfully sent and a successful response was received from * the coordinator http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index 803a853..43c6358 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.RequestCompletionHandler; import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.InterruptException; import 
org.apache.kafka.common.errors.TimeoutException; @@ -134,6 +135,7 @@ public class ConsumerNetworkClient implements Closeable { int version = this.metadata.requestUpdate(); do { poll(timeout); + this.metadata.maybeThrowAuthenticationException(); } while (this.metadata.version() == version && time.milliseconds() - startMs < timeout); return this.metadata.version() > version; } @@ -367,9 +369,13 @@ public class ConsumerNetworkClient implements Closeable { Collection<ClientRequest> requests = unsent.remove(node); for (ClientRequest request : requests) { RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback(); - handler.onComplete(new ClientResponse(request.makeHeader(request.requestBuilder().desiredOrLatestVersion()), - request.callback(), request.destination(), request.createdTimeMs(), now, true, - null, null)); + AuthenticationException authenticationException = client.authenticationException(node); + if (authenticationException != null) + handler.onFailure(authenticationException); + else + handler.onComplete(new ClientResponse(request.makeHeader(request.requestBuilder().desiredOrLatestVersion()), + request.callback(), request.destination(), request.createdTimeMs(), now, true, + null, null)); } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 7dcec5c..66760e2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -537,7 +537,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker 
* does not support transactions (i.e. if its version is lower than 0.11.0.0) * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured - * transactional.id is not authorized + * transactional.id is not authorized. See the exception for more details * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error */ public void initTransactions() { @@ -557,7 +557,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker * does not support transactions (i.e. if its version is lower than 0.11.0.0) * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured - * transactional.id is not authorized + * transactional.id is not authorized. See the exception for more details * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error */ public void beginTransaction() throws ProducerFencedException { @@ -585,7 +585,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { * @throws org.apache.kafka.common.errors.UnsupportedForMessageFormatException fatal error indicating the message * format used for the offsets topic on the broker does not support transactions * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured - * transactional.id is not authorized + * transactional.id is not authorized. See the exception for more details * @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any * other unexpected error */ @@ -609,7 +609,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker * does not support transactions (i.e. 
if its version is lower than 0.11.0.0) * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured - * transactional.id is not authorized + * transactional.id is not authorized. See the exception for more details * @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any * other unexpected error */ @@ -630,7 +630,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker * does not support transactions (i.e. if its version is lower than 0.11.0.0) * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured - * transactional.id is not authorized + * transactional.id is not authorized. See the exception for more details * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error */ public void abortTransaction() throws ProducerFencedException { @@ -746,6 +746,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null * indicates no callback) * + * @throws AuthenticationException if authentication fails. See the exception for more details * @throws IllegalStateException if a transactional.id has been configured and no transaction has been started * @throws InterruptException If the thread is interrupted while blocked * @throws SerializationException If the key or value are not valid objects given the configured serializers @@ -968,6 +969,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { /** * Get the partition metadata for the given topic. This can be used for custom partitioning. + * @throws AuthenticationException if authentication fails. 
See the exception for more details * @throws InterruptException If the thread is interrupted while blocked */ @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java index 4370fd8..08ed1a0 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.common.network; -import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.AuthenticationException; /** * States for KafkaChannel: @@ -73,11 +73,11 @@ public class ChannelState { public static final ChannelState LOCAL_CLOSE = new ChannelState(State.LOCAL_CLOSE); private final State state; - private final ApiException exception; + private final AuthenticationException exception; public ChannelState(State state) { this(state, null); } - public ChannelState(State state, ApiException exception) { + public ChannelState(State state, AuthenticationException exception) { this.state = state; this.exception = exception; } @@ -86,7 +86,7 @@ public class ChannelState { return state; } - public ApiException exception() { + public AuthenticationException exception() { return exception; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java index 407eb9f..3f2a94c 100644 --- 
a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java @@ -135,7 +135,7 @@ public class MetadataTest { long now = 10000; // lastRefreshMs updated to now. - metadata.failedUpdate(now); + metadata.failedUpdate(now, null); // Backing off. Remaining time until next try should be returned. assertEquals(refreshBackoffMs, metadata.timeToNextUpdate(now)); @@ -216,7 +216,7 @@ public class MetadataTest { metadata.update(Cluster.empty(), Collections.<String>emptySet(), time); assertEquals(100, metadata.timeToNextUpdate(1000)); - metadata.failedUpdate(1100); + metadata.failedUpdate(1100, null); assertEquals(100, metadata.timeToNextUpdate(1100)); assertEquals(100, metadata.lastSuccessfulUpdate()); http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/test/java/org/apache/kafka/clients/MockClient.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index 71e32ff..66ff253 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -18,6 +18,7 @@ package org.apache.kafka.clients; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractResponse; @@ -135,6 +136,11 @@ public class MockClient implements KafkaClient { } @Override + public AuthenticationException authenticationException(Node node) { + return null; + } + + @Override public void disconnect(String node) { long now = time.milliseconds(); Iterator<ClientRequest> iter = requests.iterator(); 
http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java new file mode 100644 index 0000000..b4460f7 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.security.authenticator; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; +import org.apache.kafka.common.errors.AuthenticationFailedException; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.network.NetworkTestUtils; +import org.apache.kafka.common.network.NioEchoServer; +import org.apache.kafka.common.protocol.SecurityProtocol; +import org.apache.kafka.common.security.TestSecurityConfig; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class ClientAuthenticationFailureTest { + + private NioEchoServer server; + private Map<String, Object> saslServerConfigs; + private Map<String, Object> saslClientConfigs; + private final String topic = "test"; + private TestJaasConfig testJaasConfig; + + @Before + public void setup() throws Exception { + SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT; + + saslServerConfigs = new HashMap<>(); + saslServerConfigs.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, Arrays.asList("PLAIN")); + + saslClientConfigs = new HashMap<>(); + saslClientConfigs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); + + testJaasConfig = TestJaasConfig.createConfiguration("PLAIN", Arrays.asList("PLAIN")); + testJaasConfig.setClientOptions("PLAIN", 
TestJaasConfig.USERNAME, "anotherpassword"); + server = createEchoServer(securityProtocol); + } + + @After + public void teardown() throws Exception { + if (server != null) + server.close(); + } + + @Test + public void testConsumerWithInvalidCredentials() { + saslClientConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port()); + saslClientConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + saslClientConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + + try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(saslClientConfigs)) { + consumer.subscribe(Arrays.asList(topic)); + consumer.poll(100); + fail("Expected an authentication error!"); + } catch (AuthenticationFailedException e) { + // OK + } catch (Exception e) { + fail("Expected only an authentication error, but another error occurred: " + e.getMessage()); + } + } + + @Test + public void testProducerWithInvalidCredentials() { + saslClientConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port()); + saslClientConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + saslClientConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + + ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message"); + try (KafkaProducer<String, String> producer = new KafkaProducer<>(saslClientConfigs)) { + producer.send(record).get(); + fail("Expected an authentication error!"); + } catch (Exception e) { + assertTrue("Expected an exception of type AuthenticationFailedException", e.getCause() instanceof AuthenticationFailedException); + } + } + + private NioEchoServer createEchoServer(SecurityProtocol securityProtocol) throws Exception { + return 
createEchoServer(ListenerName.forSecurityProtocol(securityProtocol), securityProtocol); + } + + private NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol) throws Exception { + return NetworkTestUtils.createEchoServer(listenerName, securityProtocol, + new TestSecurityConfig(saslServerConfigs), new CredentialCache()); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/core/src/main/scala/kafka/admin/AdminClient.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index fadacec..24149d7 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -26,7 +26,7 @@ import org.apache.kafka.clients._ import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture, RequestFutureAdapter} import org.apache.kafka.common.config.ConfigDef.{Importance, Type} import org.apache.kafka.common.config.{AbstractConfig, ConfigDef} -import org.apache.kafka.common.errors.TimeoutException +import org.apache.kafka.common.errors.{AuthenticationException, TimeoutException} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.Selector import org.apache.kafka.common.protocol.{ApiKeys, Errors} @@ -57,9 +57,8 @@ class AdminClient(val time: Time, val networkThread = new KafkaThread("admin-client-network-thread", new Runnable { override def run() { try { - while (running) { + while (running) client.poll(Long.MaxValue) - } } catch { case t : Throwable => error("admin-client-network-thread exited", t) @@ -96,6 +95,8 @@ class AdminClient(val time: Time, try { return send(broker, api, request) } catch { + case e: AuthenticationException => + throw e case e: Exception => debug(s"Request $api failed against node $broker", e) } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/core/src/main/scala/kafka/tools/ConsoleConsumer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index a0818bc..c014caf 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -31,7 +31,7 @@ import kafka.metrics.KafkaMetricsReporter import kafka.utils._ import kafka.utils.Implicits._ import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord} -import org.apache.kafka.common.errors.WakeupException +import org.apache.kafka.common.errors.{AuthenticationException, WakeupException} import org.apache.kafka.common.record.TimestampType import org.apache.kafka.common.serialization.Deserializer import org.apache.kafka.common.utils.Utils @@ -53,6 +53,9 @@ object ConsoleConsumer extends Logging { try { run(conf) } catch { + case e: AuthenticationException => + error("Authentication failed: terminating consumer process", e) + Exit.exit(1) case e: Throwable => error("Unknown error when running consumer: ", e) Exit.exit(1) http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala new file mode 100644 index 0000000..52fbdba --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. 
See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package kafka.api + +import java.io.FileOutputStream +import java.util.concurrent.{ExecutionException, Future, TimeUnit} +import scala.collection.JavaConverters.seqAsJavaListConverter + +import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.AuthenticationFailedException +import org.apache.kafka.common.protocol.SecurityProtocol +import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.junit.{After, Before, Test} +import org.junit.Assert._ + +import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, KafkaConsumerGroupService} +import kafka.server.KafkaConfig +import kafka.utils.{JaasTestUtils, TestUtils, ZkUtils} + +class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with SaslSetup { + private val kafkaClientSaslMechanism = "SCRAM-SHA-256" + private val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism) + override protected val securityProtocol = SecurityProtocol.SASL_PLAINTEXT + override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism)) + override protected val 
clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism)) + val consumerCount = 1 + val producerCount = 1 + val serverCount = 1 + + this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + + val topic = "topic" + val tp = new TopicPartition(topic, 0) + + override def configureSecurityBeforeServersStart() { + super.configureSecurityBeforeServersStart() + zkUtils.makeSurePersistentPathExists(ZkUtils.ConfigChangesPath) + // Create broker credentials before starting brokers + createScramCredentials(zkConnect, JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword) + } + + @Before + override def setUp(): Unit = { + startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both, + JaasTestUtils.KafkaServerContextName)) + super.setUp() + TestUtils.createTopic(this.zkUtils, topic, 1, serverCount, this.servers) + } + + @After + override def tearDown(): Unit = { + super.tearDown() + closeSasl() + } + + @Test + def testProducerWithAuthenticationFailure() { + verifyAuthenticationException(() => sendOneRecord(10000)) + + createClientCredential() + verifyWithRetry(() => sendOneRecord()) + } + + @Test + def testConsumerWithAuthenticationFailure() { + val consumer = this.consumers.head + consumer.subscribe(List(topic).asJava) + verifyConsumerWithAuthenticationFailure(consumer) + } + + @Test + def testManualAssignmentConsumerWithAuthenticationFailure() { + val consumer = this.consumers.head + consumer.assign(List(tp).asJava) + verifyConsumerWithAuthenticationFailure(consumer) + } + + @Test + def testManualAssignmentConsumerWithAutoCommitDisabledWithAuthenticationFailure() { + this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false.toString) + val consumer = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer()) + consumers += consumer + consumer.assign(List(tp).asJava) + consumer.seek(tp, 0) + + 
verifyConsumerWithAuthenticationFailure(consumer) + } + + private def verifyConsumerWithAuthenticationFailure(consumer: KafkaConsumer[Array[Byte], Array[Byte]]) { + verifyAuthenticationException(() => consumer.poll(10000)) + + createClientCredential() + verifyWithRetry(() => sendOneRecord()) + verifyWithRetry(() => assertEquals(1, consumer.poll(1000).count)) + } + + @Test + def testConsumerGroupServiceWithAuthenticationFailure() { + val propsFile = TestUtils.tempFile() + val propsStream = new FileOutputStream(propsFile) + propsStream.write("security.protocol=SASL_PLAINTEXT\n".getBytes()) + propsStream.write(s"sasl.mechanism=$kafkaClientSaslMechanism".getBytes()) + propsStream.close() + + val cgcArgs = Array("--bootstrap-server", brokerList, + "--describe", + "--group", "test.group", + "--command-config", propsFile.getAbsolutePath) + val opts = new ConsumerGroupCommandOptions(cgcArgs) + val consumerGroupService = new KafkaConsumerGroupService(opts) + + val consumer = consumers.head + consumer.subscribe(List(topic).asJava) + + verifyAuthenticationException(() => consumerGroupService.listGroups) + createClientCredential() + verifyWithRetry(() => consumer.poll(1000)) + assertEquals(1, consumerGroupService.listGroups.size) + } + + private def createClientCredential(): Unit = { + createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser2, JaasTestUtils.KafkaScramPassword2) + } + + private def sendOneRecord(maxWaitMs: Long = 15000): Unit = { + val producer = this.producers.head + val record = new ProducerRecord(tp.topic(), tp.partition(), 0L, "key".getBytes, "value".getBytes) + val future = producer.send(record) + producer.flush() + try { + val recordMetadata = future.get(maxWaitMs, TimeUnit.MILLISECONDS) + assertTrue(s"Invalid offset $recordMetadata", recordMetadata.offset >= 0) + } catch { + case e: ExecutionException => throw e.getCause + } + } + + private def verifyAuthenticationException(f: () => Unit): Unit = { + val startMs = System.currentTimeMillis + try { + 
f() + fail("Expected an authentication exception") + } catch { + case e: AuthenticationFailedException => + // expected exception + val elapsedMs = System.currentTimeMillis - startMs + assertTrue(s"Poll took too long, elapsed=$elapsedMs", elapsedMs <= 5000) + assertTrue(s"Exception message not useful: $e", e.getMessage.contains("invalid credentials")) + } + } + + private def verifyWithRetry(f: () => Unit): Unit = { + var attempts = 0 + TestUtils.waitUntilTrue(() => { + try { + attempts += 1 + f() + true + } catch { + case _: AuthenticationFailedException => false + } + }, s"Operation did not succeed within timeout after $attempts") + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala index 0bc4e50..ed1c77b 100644 --- a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala @@ -35,22 +35,14 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes super.configureSecurityBeforeServersStart() zkUtils.makeSurePersistentPathExists(ZkUtils.ConfigChangesPath) // Create broker credentials before starting brokers - ConfigCommand.main(configCommandArgs(kafkaPrincipal, kafkaPassword)) + createScramCredentials(zkConnect, kafkaPrincipal, kafkaPassword) } @Before override def setUp() { super.setUp() // Create client credentials after starting brokers so that dynamic credential creation is also tested - ConfigCommand.main(configCommandArgs(clientPrincipal, clientPassword)) - ConfigCommand.main(configCommandArgs(JaasTestUtils.KafkaScramUser2, JaasTestUtils.KafkaScramPassword2)) - } - 
- private def configCommandArgs(username: String, password: String) : Array[String] = { - val credentials = kafkaServerSaslMechanisms.map(m => s"$m=[iterations=4096,password=$password]") - Array("--zookeeper", zkConnect, - "--alter", "--add-config", credentials.mkString(","), - "--entity-type", "users", - "--entity-name", username) + createScramCredentials(zkConnect, clientPrincipal, clientPassword) + createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser2, JaasTestUtils.KafkaScramPassword2) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/69d2a177/core/src/test/scala/integration/kafka/api/SaslSetup.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala index f874f4e..51deb85 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala @@ -21,14 +21,16 @@ import java.io.File import java.util.Properties import javax.security.auth.login.Configuration +import kafka.admin.ConfigCommand import kafka.security.minikdc.MiniKdc import kafka.server.KafkaConfig import kafka.utils.JaasTestUtils.{JaasSection, Krb5LoginModule, ZkDigestModule} import kafka.utils.{JaasTestUtils, TestUtils} -import org.apache.kafka.common.security.JaasUtils -import org.apache.kafka.common.security.authenticator.LoginManager import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.config.internals.BrokerSecurityConfigs +import org.apache.kafka.common.security.JaasUtils +import org.apache.kafka.common.security.authenticator.LoginManager +import org.apache.kafka.common.security.scram.ScramMechanism /* * Implements an enumeration for the modes enabled here: @@ -135,4 +137,13 @@ trait SaslSetup { def jaasClientLoginModule(clientSaslMechanism: String): String = JaasTestUtils.clientLoginModule(clientSaslMechanism, clientKeytabFile) + def 
createScramCredentials(zkConnect: String, userName: String, password: String): Unit = { + val credentials = ScramMechanism.values.map(m => s"${m.mechanismName}=[iterations=4096,password=$password]") + val args = Array("--zookeeper", zkConnect, + "--alter", "--add-config", credentials.mkString(","), + "--entity-type", "users", + "--entity-name", userName) + ConfigCommand.main(args) + } + }