Repository: kafka Updated Branches: refs/heads/trunk 46aa88b9c -> 4c75f31a5
KAFKA-5179; Log connection termination during authentication Author: Rajini Sivaram <rajinisiva...@googlemail.com> Reviewers: Ismael Juma, Jun Rao Closes #2980 from rajinisivaram/KAFKA-5179 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4c75f31a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4c75f31a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4c75f31a Branch: refs/heads/trunk Commit: 4c75f31a5f80e6a717d040b0534c79f5ed8d9346 Parents: 46aa88b Author: Rajini Sivaram <rajinisiva...@googlemail.com> Authored: Mon May 15 18:13:20 2017 -0400 Committer: Rajini Sivaram <rajinisiva...@googlemail.com> Committed: Mon May 15 18:13:20 2017 -0400 ---------------------------------------------------------------------- checkstyle/suppressions.xml | 2 +- .../org/apache/kafka/clients/NetworkClient.java | 27 ++++++++-- .../kafka/common/network/ChannelState.java | 56 ++++++++++++++++++++ .../kafka/common/network/KafkaChannel.java | 16 +++++- .../apache/kafka/common/network/Selectable.java | 7 +-- .../apache/kafka/common/network/Selector.java | 22 +++++--- .../apache/kafka/clients/NetworkClientTest.java | 4 +- .../kafka/common/network/NetworkTestUtils.java | 3 +- .../kafka/common/network/SelectorTest.java | 9 ++-- .../kafka/common/network/SslSelectorTest.java | 2 +- .../common/network/SslTransportLayerTest.java | 14 ++--- .../authenticator/SaslAuthenticatorTest.java | 40 ++++++-------- .../org/apache/kafka/test/MockSelector.java | 9 ++-- .../main/scala/kafka/network/SocketServer.scala | 2 +- 14 files changed, 156 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/checkstyle/suppressions.xml ---------------------------------------------------------------------- diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index dd41f94..9729ee5 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -8,7 +8,7 @@ <!-- Clients --> <suppress checks="ClassFanOutComplexity" - files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator|Utils|TransactionManagerTest|KafkaAdminClient).java"/> + files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient).java"/> <suppress checks="ClassFanOutComplexity" files=".*/protocol/Errors.java"/> <suppress checks="ClassFanOutComplexity" http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index df9e2fa..a09f85d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -19,6 +19,7 @@ package org.apache.kafka.clients; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.network.ChannelState; import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.network.Selectable; import org.apache.kafka.common.network.Send; @@ -482,10 +483,21 @@ public class NetworkClient implements KafkaClient { * @param nodeId Id of the node to be disconnected * @param now The current time */ - private void processDisconnection(List<ClientResponse> responses, String nodeId, long now) { + private void processDisconnection(List<ClientResponse> responses, String nodeId, long now, ChannelState disconnectState) { connectionStates.disconnected(nodeId, now); apiVersions.remove(nodeId); nodesNeedingApiVersionsFetch.remove(nodeId); + switch (disconnectState) { + case AUTHENTICATE: + log.warn("Connection to node {} terminated during authentication. This may indicate " + + "that authentication failed due to invalid credentials.", nodeId); + break; + case NOT_CONNECTED: + log.warn("Connection to node {} could not be established. Broker may not be available.", nodeId); + break; + default: + break; // Disconnections in other states are logged at debug level in Selector + } for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) { log.trace("Cancelled request {} due to node {} being disconnected", request.request, nodeId); if (request.isInternalRequest && request.header.apiKey() == ApiKeys.METADATA.id) @@ -508,7 +520,7 @@ public class NetworkClient implements KafkaClient { // close connection to the node this.selector.close(nodeId); log.debug("Disconnecting from node {} due to request timeout.", nodeId); - processDisconnection(responses, nodeId, now); + processDisconnection(responses, nodeId, now, ChannelState.LOCAL_CLOSE); } // we disconnected, so we should probably refresh our metadata @@ -567,7 +579,7 @@ public class NetworkClient implements KafkaClient { log.warn("Node {} got error {} when making an ApiVersionsRequest. Disconnecting.", node, apiVersionsResponse.error()); this.selector.close(node); - processDisconnection(responses, node, now); + processDisconnection(responses, node, now, ChannelState.LOCAL_CLOSE); } else { nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder((short) 0)); } @@ -588,9 +600,10 @@ public class NetworkClient implements KafkaClient { * @param now The current time */ private void handleDisconnections(List<ClientResponse> responses, long now) { - for (String node : this.selector.disconnected()) { + for (Map.Entry<String, ChannelState> entry : this.selector.disconnected().entrySet()) { + String node = entry.getKey(); log.debug("Node {} disconnected.", node); - processDisconnection(responses, node, now); + processDisconnection(responses, node, now, entry.getValue()); } // we got a disconnect so we should probably refresh our metadata and see if that broker is dead if (this.selector.disconnected().size() > 0) @@ -710,6 +723,10 @@ public class NetworkClient implements KafkaClient { @Override public void handleDisconnection(String destination) { Cluster cluster = metadata.fetch(); + // 'processDisconnection' generates warnings for misconfigured bootstrap server configuration + // resulting in 'Connection Refused' and misconfigured security resulting in authentication failures. + // The warning below handles the case where connection to a broker was established, but was disconnected + // before metadata could be obtained. if (cluster.isBootstrapConfigured()) { int nodeId = Integer.parseInt(destination); Node node = cluster.nodeById(nodeId); http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java new file mode 100644 index 0000000..23e877c --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.network; + +/** + * States for KafkaChannel: + * <ul> + * <li>NOT_CONNECTED: Connections are created in NOT_CONNECTED state. State is updated + * on {@link TransportLayer#finishConnect()} when socket connection is established. + * PLAINTEXT channels transition from NOT_CONNECTED to READY, others transition + * to AUTHENTICATE. Failures in NOT_CONNECTED state typically indicate that the + * remote endpoint is unavailable, which may be due to misconfigured endpoints.</li> + * <li>AUTHENTICATE: SSL, SASL_SSL and SASL_PLAINTEXT channels are in AUTHENTICATE state during SSL and + * SASL handshake. Disconnections in AUTHENTICATE state may indicate that SSL or SASL + * authentication failed. Channels transition to READY state when authentication completes + * successfully.</li> + * <li>READY: Connected, authenticated channels are in READY state. Channels may transition from + * READY to EXPIRED, FAILED_SEND or LOCAL_CLOSE.</li> + * <li>EXPIRED: Idle connections are moved to EXPIRED state on idle timeout and the channel is closed.</li> + * <li>FAILED_SEND: Channels transition from READY to FAILED_SEND state if the channel is closed due + * to a send failure.</li> + * <li>LOCAL_CLOSE: Channels are moved to LOCAL_CLOSE state if close() is initiated locally.</li> + * </ul> + * If the remote endpoint closes a channel, the state of the channel reflects the state the channel + * was in at the time of disconnection. This state may be useful to identify the reason for disconnection. + * <p> + * Typical transitions: + * <ul> + * <li>PLAINTEXT Good path: NOT_CONNECTED => READY => LOCAL_CLOSE</li> + * <li>SASL/SSL Good path: NOT_CONNECTED => AUTHENTICATE => READY => LOCAL_CLOSE</li> + * <li>Bootstrap server misconfiguration: NOT_CONNECTED, disconnected in NOT_CONNECTED state</li> + * <li>Security misconfiguration: NOT_CONNECTED => AUTHENTICATE, disconnected in AUTHENTICATE state</li> + * </ul> + */ +public enum ChannelState { + NOT_CONNECTED, + AUTHENTICATE, + READY, + EXPIRED, + FAILED_SEND, + LOCAL_CLOSE +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java index ea03ff0..5e3a895 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java +++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java @@ -41,6 +41,7 @@ public class KafkaChannel { // processed after the channel is disconnected. private boolean disconnected; private boolean muted; + private ChannelState state; public KafkaChannel(String id, TransportLayer transportLayer, Authenticator authenticator, int maxReceiveSize) throws IOException { this.id = id; @@ -50,6 +51,7 @@ public class KafkaChannel { this.maxReceiveSize = maxReceiveSize; this.disconnected = false; this.muted = false; + this.state = ChannelState.NOT_CONNECTED; } public void close() throws IOException { @@ -72,6 +74,8 @@ public class KafkaChannel { transportLayer.handshake(); if (transportLayer.ready() && !authenticator.complete()) authenticator.authenticate(); + if (ready()) + state = ChannelState.READY; } public void disconnect() { @@ -79,9 +83,19 @@ public class KafkaChannel { transportLayer.disconnect(); } + public void state(ChannelState state) { + this.state = state; + } + + public ChannelState state() { + return this.state; + } public boolean finishConnect() throws IOException { - return transportLayer.finishConnect(); + boolean connected = transportLayer.finishConnect(); + if (connected) + state = ready() ? ChannelState.READY : ChannelState.AUTHENTICATE; + return connected; } public boolean isConnected() { http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java index 6eca427..efb603c 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java @@ -20,6 +20,7 @@ package org.apache.kafka.common.network; import java.io.IOException; import java.net.InetSocketAddress; import java.util.List; +import java.util.Map; /** * An interface for asynchronous, multi-channel network I/O @@ -80,10 +81,10 @@ public interface Selectable { public List<NetworkReceive> completedReceives(); /** - * The list of connections that finished disconnecting on the last {@link #poll(long) poll()} - * call. + * The connections that finished disconnecting on the last {@link #poll(long) poll()} + * call. Channel state indicates the local channel state at the time of disconnection. */ - public List<String> disconnected(); + public Map<String, ChannelState> disconnected(); /** * The list of connections that completed their connection on the last {@link #poll(long) poll()} http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/main/java/org/apache/kafka/common/network/Selector.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index a74a584..8f85202 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -92,7 +92,7 @@ public class Selector implements Selectable, AutoCloseable { private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives; private final Set<SelectionKey> immediatelyConnectedKeys; private final Map<String, KafkaChannel> closingChannels; - private final List<String> disconnected; + private final Map<String, ChannelState> disconnected; private final List<String> connected; private final List<String> failedSends; private final Time time; @@ -137,7 +137,7 @@ public class Selector implements Selectable, AutoCloseable { this.immediatelyConnectedKeys = new HashSet<>(); this.closingChannels = new HashMap<>(); this.connected = new ArrayList<>(); - this.disconnected = new ArrayList<>(); + this.disconnected = new HashMap<>(); this.failedSends = new ArrayList<>(); this.sensors = new SelectorMetrics(metrics, metricGrpPrefix, metricTags, metricsPerConnection); this.channelBuilder = channelBuilder; @@ -413,7 +413,7 @@ public class Selector implements Selectable, AutoCloseable { } @Override - public List<String> disconnected() { + public Map<String, ChannelState> disconnected() { return this.disconnected; } @@ -466,6 +466,7 @@ public class Selector implements Selectable, AutoCloseable { if (log.isTraceEnabled()) log.trace("About to close the idle connection from {} due to being idle for {} millis", connectionId, (currentTimeNanos - expiredConnection.getValue()) / 1000 / 1000); + channel.state(ChannelState.EXPIRED); close(channel, true); } } @@ -489,7 +490,12 @@ public class Selector implements Selectable, AutoCloseable { it.remove(); } } - this.disconnected.addAll(this.failedSends); + for (String channel : this.failedSends) { + KafkaChannel failedChannel = closingChannels.get(channel); + if (failedChannel != null) + failedChannel.state(ChannelState.FAILED_SEND); + this.disconnected.put(channel, ChannelState.FAILED_SEND); + } this.failedSends.clear(); } @@ -516,8 +522,12 @@ public class Selector implements Selectable, AutoCloseable { */ public void close(String id) { KafkaChannel channel = this.channels.get(id); - if (channel != null) + if (channel != null) { + // There is no disconnect notification for local close, but updating + // channel state here anyway to avoid confusion. + channel.state(ChannelState.LOCAL_CLOSE); close(channel, false); + } } /** @@ -566,7 +576,7 @@ public class Selector implements Selectable, AutoCloseable { this.sensors.connectionClosed.record(); this.stagedReceives.remove(channel); if (notifyDisconnect) - this.disconnected.add(channel.id()); + this.disconnected.put(channel.id(), channel.state()); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index 55b4fc6..59a46ac 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -185,8 +185,8 @@ public class NetworkClientTest { // sleeping to make sure that the time since last send is greater than requestTimeOut time.sleep(3000); client.poll(3000, time.milliseconds()); - String disconnectedNode = selector.disconnected().get(0); - assertEquals(node.idString(), disconnectedNode); + assertEquals(1, selector.disconnected().size()); + assertTrue("Node not found in disconnected map", selector.disconnected().containsKey(node.idString())); } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java index a3859c1..43c7d9b 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java +++ b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java @@ -77,7 +77,7 @@ public class NetworkTestUtils { assertTrue(selector.isChannelReady(node)); } - public static void waitForChannelClose(Selector selector, String node) throws IOException { + public static void waitForChannelClose(Selector selector, String node, ChannelState channelState) throws IOException { boolean closed = false; for (int i = 0; i < 30; i++) { selector.poll(1000L); @@ -87,5 +87,6 @@ public class NetworkTestUtils { } } assertTrue("Channel was not closed by timeout", closed); + assertEquals(channelState, selector.disconnected().get(node)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index adff4b2..33959fd 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -81,7 +81,7 @@ public class SelectorTest { // disconnect this.server.closeConnections(); - while (!selector.disconnected().contains(node)) + while (!selector.disconnected().containsKey(node)) selector.poll(1000L); // reconnect and do another request @@ -127,8 +127,10 @@ public class SelectorTest { ServerSocket nonListeningSocket = new ServerSocket(0); int nonListeningPort = nonListeningSocket.getLocalPort(); selector.connect(node, new InetSocketAddress("localhost", nonListeningPort), BUFFER_SIZE, BUFFER_SIZE); - while (selector.disconnected().contains(node)) + while (selector.disconnected().containsKey(node)) { + assertEquals(ChannelState.NOT_CONNECTED, selector.disconnected().get(node)); selector.poll(1000L); + } nonListeningSocket.close(); } @@ -262,7 +264,8 @@ public class SelectorTest { time.sleep(6000); // The max idle time is 5000ms selector.poll(0); - assertTrue("The idle connection should have been closed", selector.disconnected().contains(id)); + assertTrue("The idle connection should have been closed", selector.disconnected().containsKey(id)); + assertEquals(ChannelState.EXPIRED, selector.disconnected().get(id)); } http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java index 476ddfb..80f266f 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java @@ -154,7 +154,7 @@ public class SslSelectorTest extends SelectorTest { List<String> disconnected = new ArrayList<>(); while (!disconnected.contains(node) && System.currentTimeMillis() < expiryTime) { selector.poll(10); - disconnected.addAll(selector.disconnected()); + disconnected.addAll(selector.disconnected().keySet()); } assertTrue("Renegotiation should cause disconnection", disconnected.contains(node)); http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index 345ace1..42e0f6f 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -119,7 +119,7 @@ public class SslTransportLayerTest { InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - NetworkTestUtils.waitForChannelClose(selector, node); + NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE); } /** @@ -184,7 +184,7 @@ public class SslTransportLayerTest { sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG); createSelector(sslClientConfigs); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - NetworkTestUtils.waitForChannelClose(selector, node); + NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE); selector.close(); server.close(); @@ -212,7 +212,7 @@ public class SslTransportLayerTest { InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - NetworkTestUtils.waitForChannelClose(selector, node); + NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE); } /** @@ -232,7 +232,7 @@ public class SslTransportLayerTest { InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - NetworkTestUtils.waitForChannelClose(selector, node); + NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE); } /** @@ -384,7 +384,7 @@ public class SslTransportLayerTest { InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - NetworkTestUtils.waitForChannelClose(selector, node); + NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE); } /** @@ -401,7 +401,7 @@ public class SslTransportLayerTest { InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - NetworkTestUtils.waitForChannelClose(selector, node); + NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE); } /** @@ -419,7 +419,7 @@ public class SslTransportLayerTest { InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - NetworkTestUtils.waitForChannelClose(selector, node); + NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index 1aea835..28402f0 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.network.CertStores; import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.ChannelBuilders; +import org.apache.kafka.common.network.ChannelState; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.network.NetworkSend; import org.apache.kafka.common.network.NetworkTestUtils; @@ -137,8 +138,7 @@ public class SaslAuthenticatorTest { jaasConfig.setPlainClientOptions(TestJaasConfig.USERNAME, "invalidpassword"); server = createEchoServer(securityProtocol); - createClientConnection(securityProtocol, node); - NetworkTestUtils.waitForChannelClose(selector, node); + createAndCheckClientConnectionFailure(securityProtocol, node); } /** @@ -152,8 +152,7 @@ public class SaslAuthenticatorTest { jaasConfig.setPlainClientOptions("invaliduser", TestJaasConfig.PASSWORD); server = createEchoServer(securityProtocol); - createClientConnection(securityProtocol, node); - NetworkTestUtils.waitForChannelClose(selector, node); + createAndCheckClientConnectionFailure(securityProtocol, node); } /** @@ -286,8 +285,7 @@ public class SaslAuthenticatorTest { String node = "0"; server = createEchoServer(securityProtocol); updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD); - createClientConnection(securityProtocol, node); - NetworkTestUtils.waitForChannelClose(selector, node); + createAndCheckClientConnectionFailure(securityProtocol, node); } /** @@ -305,8 +303,7 @@ public class SaslAuthenticatorTest { String node = "0"; server = createEchoServer(securityProtocol); updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD); - createClientConnection(securityProtocol, node); - NetworkTestUtils.waitForChannelClose(selector, node); + createAndCheckClientConnectionFailure(securityProtocol, node); } /** @@ -323,8 +320,7 @@ public class SaslAuthenticatorTest { server.credentialCache().cache(ScramMechanism.SCRAM_SHA_256.mechanismName(), ScramCredential.class).remove(TestJaasConfig.USERNAME); String node = "1"; saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256"); - createClientConnection(securityProtocol, node); - NetworkTestUtils.waitForChannelClose(selector, node); + createAndCheckClientConnectionFailure(securityProtocol, node); saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512"); createAndCheckClientConnection(securityProtocol, "2"); @@ -425,7 +421,7 @@ public class SaslAuthenticatorTest { SaslHandshakeRequest request = new SaslHandshakeRequest("PLAIN"); RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE.id, Short.MAX_VALUE, "someclient", 2); selector.send(request.toSend(node1, header)); - NetworkTestUtils.waitForChannelClose(selector, node1); + NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY); selector.close(); // Test good connection still works @@ -451,7 +447,7 @@ public class SaslAuthenticatorTest { byte[] bytes = new byte[1024]; random.nextBytes(bytes); selector.send(new NetworkSend(node1, ByteBuffer.wrap(bytes))); - NetworkTestUtils.waitForChannelClose(selector, node1); + NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY); selector.close(); // Test good connection still works @@ -462,7 +458,7 @@ public class SaslAuthenticatorTest { createClientConnection(SecurityProtocol.PLAINTEXT, node2); random.nextBytes(bytes); selector.send(new NetworkSend(node2, ByteBuffer.wrap(bytes))); - NetworkTestUtils.waitForChannelClose(selector, node2); + NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY); selector.close(); // Test good connection still works @@ -491,7 +487,7 @@ public class SaslAuthenticatorTest { RequestHeader versionsHeader = new RequestHeader(ApiKeys.API_VERSIONS.id, request.version(), "someclient", 2); selector.send(request.toSend(node1, versionsHeader)); - NetworkTestUtils.waitForChannelClose(selector, node1); + NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY); selector.close(); // Test good connection still works @@ -518,7 +514,7 @@ public class SaslAuthenticatorTest { buffer.put(new byte[buffer.capacity() - 4]); buffer.rewind(); selector.send(new NetworkSend(node1, buffer)); - NetworkTestUtils.waitForChannelClose(selector, node1); + NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY); selector.close(); // Test good connection still works @@ -532,7 +528,7 @@ public class SaslAuthenticatorTest { buffer.put(new byte[buffer.capacity() - 4]); buffer.rewind(); selector.send(new NetworkSend(node2, buffer)); - NetworkTestUtils.waitForChannelClose(selector, node2); + NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY); selector.close(); // Test good connection still works @@ -557,7 +553,7 @@ public class SaslAuthenticatorTest { RequestHeader metadataRequestHeader1 = new RequestHeader(ApiKeys.METADATA.id, metadataRequest1.version(), "someclient", 1); selector.send(metadataRequest1.toSend(node1, metadataRequestHeader1)); - NetworkTestUtils.waitForChannelClose(selector, node1); + NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY); selector.close(); // Test good connection still works @@ -572,7 +568,7 @@ public class SaslAuthenticatorTest { RequestHeader metadataRequestHeader2 = new RequestHeader(ApiKeys.METADATA.id, metadataRequest2.version(), "someclient", 2); selector.send(metadataRequest2.toSend(node2, metadataRequestHeader2)); - NetworkTestUtils.waitForChannelClose(selector, node2); + NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY); selector.close(); // Test good connection still works @@ -608,8 +604,7 @@ public class SaslAuthenticatorTest { configureMechanisms("PLAIN", Arrays.asList("DIGEST-MD5")); server = createEchoServer(securityProtocol); - createClientConnection(securityProtocol, node); - NetworkTestUtils.waitForChannelClose(selector, node); + createAndCheckClientConnectionFailure(securityProtocol, node); } /** @@ -623,8 +618,7 @@ public class SaslAuthenticatorTest { saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "INVALID"); server = createEchoServer(securityProtocol); - createClientConnection(securityProtocol, node); - NetworkTestUtils.waitForChannelClose(selector, node); + createAndCheckClientConnectionFailure(securityProtocol, node); } /** @@ -824,7 +818,7 @@ public class SaslAuthenticatorTest { private void createAndCheckClientConnectionFailure(SecurityProtocol securityProtocol, String node) throws Exception { createClientConnection(securityProtocol, node); - NetworkTestUtils.waitForChannelClose(selector, node); + NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE); selector.close(); selector = null; } http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/test/java/org/apache/kafka/test/MockSelector.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java index c1b2205..225aba4 100644 --- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java +++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java @@ -19,9 +19,12 @@ package org.apache.kafka.test; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; +import org.apache.kafka.common.network.ChannelState; import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.network.NetworkSend; import org.apache.kafka.common.network.Selectable; @@ -37,7 +40,7 @@ public class MockSelector implements Selectable { private final List<Send> initiatedSends = new ArrayList<Send>(); private final List<Send> completedSends = new ArrayList<Send>(); private final List<NetworkReceive> completedReceives = new ArrayList<NetworkReceive>(); - private final List<String> disconnected = new ArrayList<String>(); + private final Map<String, ChannelState> disconnected = new HashMap<>(); private final List<String> connected = new ArrayList<String>(); private final List<DelayedReceive> delayedReceives = new ArrayList<>(); @@ -60,7 +63,7 @@ public class MockSelector implements Selectable { @Override public void close(String id) { - this.disconnected.add(id); + this.disconnected.put(id, ChannelState.LOCAL_CLOSE); for (int i = 0; i < this.connected.size(); i++) { if (this.connected.get(i).equals(id)) { this.connected.remove(i); @@ -121,7 +124,7 @@ public class MockSelector implements Selectable { } @Override - public List<String> disconnected() { + public Map<String, ChannelState> disconnected() { return disconnected; } http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/core/src/main/scala/kafka/network/SocketServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index fb647fa..48d0233 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -544,7 +544,7 @@ private[kafka] class Processor(val id: Int, } private def processDisconnected() { - selector.disconnected.asScala.foreach { connectionId => + selector.disconnected.keySet.asScala.foreach { connectionId => val remoteHost = ConnectionId.fromString(connectionId).getOrElse { throw new IllegalStateException(s"connectionId has unexpected format: $connectionId") }.remoteHost