Repository: kafka
Updated Branches:
  refs/heads/trunk 46aa88b9c -> 4c75f31a5


KAFKA-5179; Log connection termination during authentication

Author: Rajini Sivaram <rajinisiva...@googlemail.com>

Reviewers: Ismael Juma, Jun Rao

Closes #2980 from rajinisivaram/KAFKA-5179


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4c75f31a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4c75f31a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4c75f31a

Branch: refs/heads/trunk
Commit: 4c75f31a5f80e6a717d040b0534c79f5ed8d9346
Parents: 46aa88b
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Mon May 15 18:13:20 2017 -0400
Committer: Rajini Sivaram <rajinisiva...@googlemail.com>
Committed: Mon May 15 18:13:20 2017 -0400

----------------------------------------------------------------------
 checkstyle/suppressions.xml                     |  2 +-
 .../org/apache/kafka/clients/NetworkClient.java | 27 ++++++++--
 .../kafka/common/network/ChannelState.java      | 56 ++++++++++++++++++++
 .../kafka/common/network/KafkaChannel.java      | 16 +++++-
 .../apache/kafka/common/network/Selectable.java |  7 +--
 .../apache/kafka/common/network/Selector.java   | 22 +++++---
 .../apache/kafka/clients/NetworkClientTest.java |  4 +-
 .../kafka/common/network/NetworkTestUtils.java  |  3 +-
 .../kafka/common/network/SelectorTest.java      |  9 ++--
 .../kafka/common/network/SslSelectorTest.java   |  2 +-
 .../common/network/SslTransportLayerTest.java   | 14 ++---
 .../authenticator/SaslAuthenticatorTest.java    | 40 ++++++--------
 .../org/apache/kafka/test/MockSelector.java     |  9 ++--
 .../main/scala/kafka/network/SocketServer.scala |  2 +-
 14 files changed, 156 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index dd41f94..9729ee5 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -8,7 +8,7 @@
 
     <!-- Clients -->
     <suppress checks="ClassFanOutComplexity"
-              files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator|Utils|TransactionManagerTest|KafkaAdminClient).java"/>
+              files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient).java"/>
     <suppress checks="ClassFanOutComplexity"
               files=".*/protocol/Errors.java"/>
     <suppress checks="ClassFanOutComplexity"

http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index df9e2fa..a09f85d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.network.ChannelState;
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.network.Send;
@@ -482,10 +483,21 @@ public class NetworkClient implements KafkaClient {
      * @param nodeId Id of the node to be disconnected
      * @param now The current time
      */
-    private void processDisconnection(List<ClientResponse> responses, String nodeId, long now) {
+    private void processDisconnection(List<ClientResponse> responses, String nodeId, long now, ChannelState disconnectState) {
         connectionStates.disconnected(nodeId, now);
         apiVersions.remove(nodeId);
         nodesNeedingApiVersionsFetch.remove(nodeId);
+        switch (disconnectState) {
+            case AUTHENTICATE:
+                log.warn("Connection to node {} terminated during authentication. This may indicate " +
+                        "that authentication failed due to invalid credentials.", nodeId);
+                break;
+            case NOT_CONNECTED:
+                log.warn("Connection to node {} could not be established. Broker may not be available.", nodeId);
+                break;
+            default:
+                break; // Disconnections in other states are logged at debug level in Selector
+        }
         for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) {
             log.trace("Cancelled request {} due to node {} being disconnected", request.request, nodeId);
             if (request.isInternalRequest && request.header.apiKey() == ApiKeys.METADATA.id)
@@ -508,7 +520,7 @@ public class NetworkClient implements KafkaClient {
             // close connection to the node
             this.selector.close(nodeId);
             log.debug("Disconnecting from node {} due to request timeout.", nodeId);
-            processDisconnection(responses, nodeId, now);
+            processDisconnection(responses, nodeId, now, ChannelState.LOCAL_CLOSE);
         }
 
         // we disconnected, so we should probably refresh our metadata
@@ -567,7 +579,7 @@ public class NetworkClient implements KafkaClient {
                 log.warn("Node {} got error {} when making an ApiVersionsRequest.  Disconnecting.",
                         node, apiVersionsResponse.error());
                 this.selector.close(node);
-                processDisconnection(responses, node, now);
+                processDisconnection(responses, node, now, ChannelState.LOCAL_CLOSE);
             } else {
                 nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder((short) 0));
             }
@@ -588,9 +600,10 @@ public class NetworkClient implements KafkaClient {
      * @param now The current time
      */
     private void handleDisconnections(List<ClientResponse> responses, long now) {
-        for (String node : this.selector.disconnected()) {
+        for (Map.Entry<String, ChannelState> entry : this.selector.disconnected().entrySet()) {
+            String node = entry.getKey();
             log.debug("Node {} disconnected.", node);
-            processDisconnection(responses, node, now);
+            processDisconnection(responses, node, now, entry.getValue());
         }
         // we got a disconnect so we should probably refresh our metadata and see if that broker is dead
         if (this.selector.disconnected().size() > 0)
@@ -710,6 +723,10 @@ public class NetworkClient implements KafkaClient {
         @Override
         public void handleDisconnection(String destination) {
             Cluster cluster = metadata.fetch();
+            // 'processDisconnection' generates warnings for misconfigured bootstrap server configuration
+            // resulting in 'Connection Refused' and misconfigured security resulting in authentication failures.
+            // The warning below handles the case where connection to a broker was established, but was disconnected
+            // before metadata could be obtained.
             if (cluster.isBootstrapConfigured()) {
                 int nodeId = Integer.parseInt(destination);
                 Node node = cluster.nodeById(nodeId);

http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java 
b/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java
new file mode 100644
index 0000000..23e877c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+/**
+ * States for KafkaChannel:
+ * <ul>
+ *   <li>NOT_CONNECTED: Connections are created in NOT_CONNECTED state. State 
is updated
+ *       on {@link TransportLayer#finishConnect()} when socket connection is 
established.
+ *       PLAINTEXT channels transition from NOT_CONNECTED to READY, others 
transition
+ *       to AUTHENTICATE. Failures in NOT_CONNECTED state typically indicate 
that the
+ *       remote endpoint is unavailable, which may be due to misconfigured 
endpoints.</li>
+ *   <li>AUTHENTICATE: SSL, SASL_SSL and SASL_PLAINTEXT channels are in 
AUTHENTICATE state during SSL and
+ *       SASL handshake. Disconnections in AUTHENTICATE state may indicate 
that SSL or SASL
+ *       authentication failed. Channels transition to READY state when 
authentication completes
+ *       successfully.</li>
+ *   <li>READY: Connected, authenticated channels are in READY state. Channels 
may transition from
+ *       READY to EXPIRED, FAILED_SEND or LOCAL_CLOSE.</li>
+ *   <li>EXPIRED: Idle connections are moved to EXPIRED state on idle timeout 
and the channel is closed.</li>
+ *   <li>FAILED_SEND: Channels transition from READY to FAILED_SEND state if 
the channel is closed due
+ *       to a send failure.</li>
+ *   <li>LOCAL_CLOSE: Channels are moved to LOCAL_CLOSE state if close() is 
initiated locally.</li>
+ * </ul>
+ * If the remote endpoint closes a channel, the state of the channel reflects 
the state the channel
+ * was in at the time of disconnection. This state may be useful to identify 
the reason for disconnection.
+ * <p>
+ * Typical transitions:
+ * <ul>
+ *   <li>PLAINTEXT Good path: NOT_CONNECTED => READY => LOCAL_CLOSE</li>
+ *   <li>SASL/SSL Good path: NOT_CONNECTED => AUTHENTICATE => READY => 
LOCAL_CLOSE</li>
+ *   <li>Bootstrap server misconfiguration: NOT_CONNECTED, disconnected in 
NOT_CONNECTED state</li>
+ *   <li>Security misconfiguration: NOT_CONNECTED => AUTHENTICATE, 
disconnected in AUTHENTICATE state</li>
+ * </ul>
+ */
+public enum ChannelState {
+    NOT_CONNECTED,
+    AUTHENTICATE,
+    READY,
+    EXPIRED,
+    FAILED_SEND,
+    LOCAL_CLOSE
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java 
b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index ea03ff0..5e3a895 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -41,6 +41,7 @@ public class KafkaChannel {
     // processed after the channel is disconnected.
     private boolean disconnected;
     private boolean muted;
+    private ChannelState state;
 
     public KafkaChannel(String id, TransportLayer transportLayer, 
Authenticator authenticator, int maxReceiveSize) throws IOException {
         this.id = id;
@@ -50,6 +51,7 @@ public class KafkaChannel {
         this.maxReceiveSize = maxReceiveSize;
         this.disconnected = false;
         this.muted = false;
+        this.state = ChannelState.NOT_CONNECTED;
     }
 
     public void close() throws IOException {
@@ -72,6 +74,8 @@ public class KafkaChannel {
             transportLayer.handshake();
         if (transportLayer.ready() && !authenticator.complete())
             authenticator.authenticate();
+        if (ready())
+            state = ChannelState.READY;
     }
 
     public void disconnect() {
@@ -79,9 +83,19 @@ public class KafkaChannel {
         transportLayer.disconnect();
     }
 
+    public void state(ChannelState state) {
+        this.state = state;
+    }
+
+    public ChannelState state() {
+        return this.state;
+    }
 
     public boolean finishConnect() throws IOException {
-        return transportLayer.finishConnect();
+        boolean connected = transportLayer.finishConnect();
+        if (connected)
+            state = ready() ? ChannelState.READY : ChannelState.AUTHENTICATE;
+        return connected;
     }
 
     public boolean isConnected() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
index 6eca427..efb603c 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
@@ -20,6 +20,7 @@ package org.apache.kafka.common.network;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.List;
+import java.util.Map;
 
 /**
  * An interface for asynchronous, multi-channel network I/O
@@ -80,10 +81,10 @@ public interface Selectable {
     public List<NetworkReceive> completedReceives();
 
     /**
-     * The list of connections that finished disconnecting on the last {@link 
#poll(long) poll()}
-     * call.
+     * The connections that finished disconnecting on the last {@link 
#poll(long) poll()}
+     * call. Channel state indicates the local channel state at the time of 
disconnection.
      */
-    public List<String> disconnected();
+    public Map<String, ChannelState> disconnected();
 
     /**
      * The list of connections that completed their connection on the last 
{@link #poll(long) poll()}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/Selector.java 
b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index a74a584..8f85202 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -92,7 +92,7 @@ public class Selector implements Selectable, AutoCloseable {
     private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
     private final Set<SelectionKey> immediatelyConnectedKeys;
     private final Map<String, KafkaChannel> closingChannels;
-    private final List<String> disconnected;
+    private final Map<String, ChannelState> disconnected;
     private final List<String> connected;
     private final List<String> failedSends;
     private final Time time;
@@ -137,7 +137,7 @@ public class Selector implements Selectable, AutoCloseable {
         this.immediatelyConnectedKeys = new HashSet<>();
         this.closingChannels = new HashMap<>();
         this.connected = new ArrayList<>();
-        this.disconnected = new ArrayList<>();
+        this.disconnected = new HashMap<>();
         this.failedSends = new ArrayList<>();
         this.sensors = new SelectorMetrics(metrics, metricGrpPrefix, 
metricTags, metricsPerConnection);
         this.channelBuilder = channelBuilder;
@@ -413,7 +413,7 @@ public class Selector implements Selectable, AutoCloseable {
     }
 
     @Override
-    public List<String> disconnected() {
+    public Map<String, ChannelState> disconnected() {
         return this.disconnected;
     }
 
@@ -466,6 +466,7 @@ public class Selector implements Selectable, AutoCloseable {
                 if (log.isTraceEnabled())
                     log.trace("About to close the idle connection from {} due 
to being idle for {} millis",
                             connectionId, (currentTimeNanos - 
expiredConnection.getValue()) / 1000 / 1000);
+                channel.state(ChannelState.EXPIRED);
                 close(channel, true);
             }
         }
@@ -489,7 +490,12 @@ public class Selector implements Selectable, AutoCloseable {
                 it.remove();
             }
         }
-        this.disconnected.addAll(this.failedSends);
+        for (String channel : this.failedSends) {
+            KafkaChannel failedChannel = closingChannels.get(channel);
+            if (failedChannel != null)
+                failedChannel.state(ChannelState.FAILED_SEND);
+            this.disconnected.put(channel, ChannelState.FAILED_SEND);
+        }
         this.failedSends.clear();
     }
 
@@ -516,8 +522,12 @@ public class Selector implements Selectable, AutoCloseable {
      */
     public void close(String id) {
         KafkaChannel channel = this.channels.get(id);
-        if (channel != null)
+        if (channel != null) {
+            // There is no disconnect notification for local close, but 
updating
+            // channel state here anyway to avoid confusion.
+            channel.state(ChannelState.LOCAL_CLOSE);
             close(channel, false);
+        }
     }
 
     /**
@@ -566,7 +576,7 @@ public class Selector implements Selectable, AutoCloseable {
         this.sensors.connectionClosed.record();
         this.stagedReceives.remove(channel);
         if (notifyDisconnect)
-            this.disconnected.add(channel.id());
+            this.disconnected.put(channel.id(), channel.state());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 55b4fc6..59a46ac 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -185,8 +185,8 @@ public class NetworkClientTest {
         // sleeping to make sure that the time since last send is greater than 
requestTimeOut
         time.sleep(3000);
         client.poll(3000, time.milliseconds());
-        String disconnectedNode = selector.disconnected().get(0);
-        assertEquals(node.idString(), disconnectedNode);
+        assertEquals(1, selector.disconnected().size());
+        assertTrue("Node not found in disconnected map", 
selector.disconnected().containsKey(node.idString()));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java 
b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
index a3859c1..43c7d9b 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
@@ -77,7 +77,7 @@ public class NetworkTestUtils {
         assertTrue(selector.isChannelReady(node));
     }
 
-    public static void waitForChannelClose(Selector selector, String node) 
throws IOException {
+    public static void waitForChannelClose(Selector selector, String node, 
ChannelState channelState) throws IOException {
         boolean closed = false;
         for (int i = 0; i < 30; i++) {
             selector.poll(1000L);
@@ -87,5 +87,6 @@ public class NetworkTestUtils {
             }
         }
         assertTrue("Channel was not closed by timeout", closed);
+        assertEquals(channelState, selector.disconnected().get(node));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index adff4b2..33959fd 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -81,7 +81,7 @@ public class SelectorTest {
 
         // disconnect
         this.server.closeConnections();
-        while (!selector.disconnected().contains(node))
+        while (!selector.disconnected().containsKey(node))
             selector.poll(1000L);
 
         // reconnect and do another request
@@ -127,8 +127,10 @@ public class SelectorTest {
         ServerSocket nonListeningSocket = new ServerSocket(0);
         int nonListeningPort = nonListeningSocket.getLocalPort();
         selector.connect(node, new InetSocketAddress("localhost", 
nonListeningPort), BUFFER_SIZE, BUFFER_SIZE);
-        while (selector.disconnected().contains(node))
+        while (selector.disconnected().containsKey(node)) {
+            assertEquals(ChannelState.NOT_CONNECTED, 
selector.disconnected().get(node));
             selector.poll(1000L);
+        }
         nonListeningSocket.close();
     }
 
@@ -262,7 +264,8 @@ public class SelectorTest {
         time.sleep(6000); // The max idle time is 5000ms
         selector.poll(0);
 
-        assertTrue("The idle connection should have been closed", 
selector.disconnected().contains(id));
+        assertTrue("The idle connection should have been closed", 
selector.disconnected().containsKey(id));
+        assertEquals(ChannelState.EXPIRED, selector.disconnected().get(id));
     }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java 
b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index 476ddfb..80f266f 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -154,7 +154,7 @@ public class SslSelectorTest extends SelectorTest {
         List<String> disconnected = new ArrayList<>();
         while (!disconnected.contains(node) && System.currentTimeMillis() < 
expiryTime) {
             selector.poll(10);
-            disconnected.addAll(selector.disconnected());
+            disconnected.addAll(selector.disconnected().keySet());
         }
         assertTrue("Renegotiation should cause disconnection", 
disconnected.contains(node));
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
 
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 345ace1..42e0f6f 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -119,7 +119,7 @@ public class SslTransportLayerTest {
         InetSocketAddress addr = new InetSocketAddress("localhost", 
server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
-        NetworkTestUtils.waitForChannelClose(selector, node);
+        NetworkTestUtils.waitForChannelClose(selector, node, 
ChannelState.AUTHENTICATE);
     }
     
     /**
@@ -184,7 +184,7 @@ public class SslTransportLayerTest {
         sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG);
         createSelector(sslClientConfigs);
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-        NetworkTestUtils.waitForChannelClose(selector, node);
+        NetworkTestUtils.waitForChannelClose(selector, node, 
ChannelState.AUTHENTICATE);
         selector.close();
         server.close();
 
@@ -212,7 +212,7 @@ public class SslTransportLayerTest {
         InetSocketAddress addr = new InetSocketAddress("localhost", 
server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
-        NetworkTestUtils.waitForChannelClose(selector, node);
+        NetworkTestUtils.waitForChannelClose(selector, node, 
ChannelState.AUTHENTICATE);
     }
     
     /**
@@ -232,7 +232,7 @@ public class SslTransportLayerTest {
         InetSocketAddress addr = new InetSocketAddress("localhost", 
server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
-        NetworkTestUtils.waitForChannelClose(selector, node);
+        NetworkTestUtils.waitForChannelClose(selector, node, 
ChannelState.AUTHENTICATE);
     }
     
     /**
@@ -384,7 +384,7 @@ public class SslTransportLayerTest {
         InetSocketAddress addr = new InetSocketAddress("localhost", 
server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
-        NetworkTestUtils.waitForChannelClose(selector, node);
+        NetworkTestUtils.waitForChannelClose(selector, node, 
ChannelState.AUTHENTICATE);
     }
     
     /**
@@ -401,7 +401,7 @@ public class SslTransportLayerTest {
         InetSocketAddress addr = new InetSocketAddress("localhost", 
server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
-        NetworkTestUtils.waitForChannelClose(selector, node);
+        NetworkTestUtils.waitForChannelClose(selector, node, 
ChannelState.AUTHENTICATE);
     }
     
     /**
@@ -419,7 +419,7 @@ public class SslTransportLayerTest {
         InetSocketAddress addr = new InetSocketAddress("localhost", 
server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
-        NetworkTestUtils.waitForChannelClose(selector, node);
+        NetworkTestUtils.waitForChannelClose(selector, node, 
ChannelState.AUTHENTICATE);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 1aea835..28402f0 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.network.CertStores;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.ChannelBuilders;
+import org.apache.kafka.common.network.ChannelState;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.network.NetworkSend;
 import org.apache.kafka.common.network.NetworkTestUtils;
@@ -137,8 +138,7 @@ public class SaslAuthenticatorTest {
         jaasConfig.setPlainClientOptions(TestJaasConfig.USERNAME, 
"invalidpassword");
 
         server = createEchoServer(securityProtocol);
-        createClientConnection(securityProtocol, node);
-        NetworkTestUtils.waitForChannelClose(selector, node);
+        createAndCheckClientConnectionFailure(securityProtocol, node);
     }
 
     /**
@@ -152,8 +152,7 @@ public class SaslAuthenticatorTest {
         jaasConfig.setPlainClientOptions("invaliduser", 
TestJaasConfig.PASSWORD);
 
         server = createEchoServer(securityProtocol);
-        createClientConnection(securityProtocol, node);
-        NetworkTestUtils.waitForChannelClose(selector, node);
+        createAndCheckClientConnectionFailure(securityProtocol, node);
     }
 
     /**
@@ -286,8 +285,7 @@ public class SaslAuthenticatorTest {
         String node = "0";
         server = createEchoServer(securityProtocol);
         updateScramCredentialCache(TestJaasConfig.USERNAME, 
TestJaasConfig.PASSWORD);
-        createClientConnection(securityProtocol, node);
-        NetworkTestUtils.waitForChannelClose(selector, node);
+        createAndCheckClientConnectionFailure(securityProtocol, node);
     }
 
     /**
@@ -305,8 +303,7 @@ public class SaslAuthenticatorTest {
         String node = "0";
         server = createEchoServer(securityProtocol);
         updateScramCredentialCache(TestJaasConfig.USERNAME, 
TestJaasConfig.PASSWORD);
-        createClientConnection(securityProtocol, node);
-        NetworkTestUtils.waitForChannelClose(selector, node);
+        createAndCheckClientConnectionFailure(securityProtocol, node);
     }
 
     /**
@@ -323,8 +320,7 @@ public class SaslAuthenticatorTest {
         
server.credentialCache().cache(ScramMechanism.SCRAM_SHA_256.mechanismName(), 
ScramCredential.class).remove(TestJaasConfig.USERNAME);
         String node = "1";
         saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
-        createClientConnection(securityProtocol, node);
-        NetworkTestUtils.waitForChannelClose(selector, node);
+        createAndCheckClientConnectionFailure(securityProtocol, node);
 
         saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
         createAndCheckClientConnection(securityProtocol, "2");
@@ -425,7 +421,7 @@ public class SaslAuthenticatorTest {
         SaslHandshakeRequest request = new SaslHandshakeRequest("PLAIN");
         RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE.id, 
Short.MAX_VALUE, "someclient", 2);
         selector.send(request.toSend(node1, header));
-        NetworkTestUtils.waitForChannelClose(selector, node1);
+        NetworkTestUtils.waitForChannelClose(selector, node1, 
ChannelState.READY);
         selector.close();
 
         // Test good connection still works
@@ -451,7 +447,7 @@ public class SaslAuthenticatorTest {
         byte[] bytes = new byte[1024];
         random.nextBytes(bytes);
         selector.send(new NetworkSend(node1, ByteBuffer.wrap(bytes)));
-        NetworkTestUtils.waitForChannelClose(selector, node1);
+        NetworkTestUtils.waitForChannelClose(selector, node1, 
ChannelState.READY);
         selector.close();
 
         // Test good connection still works
@@ -462,7 +458,7 @@ public class SaslAuthenticatorTest {
         createClientConnection(SecurityProtocol.PLAINTEXT, node2);
         random.nextBytes(bytes);
         selector.send(new NetworkSend(node2, ByteBuffer.wrap(bytes)));
-        NetworkTestUtils.waitForChannelClose(selector, node2);
+        NetworkTestUtils.waitForChannelClose(selector, node2, 
ChannelState.READY);
         selector.close();
 
         // Test good connection still works
@@ -491,7 +487,7 @@ public class SaslAuthenticatorTest {
         RequestHeader versionsHeader = new RequestHeader(ApiKeys.API_VERSIONS.id,
                 request.version(), "someclient", 2);
         selector.send(request.toSend(node1, versionsHeader));
-        NetworkTestUtils.waitForChannelClose(selector, node1);
+        NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY);
         selector.close();
 
         // Test good connection still works
@@ -518,7 +514,7 @@ public class SaslAuthenticatorTest {
         buffer.put(new byte[buffer.capacity() - 4]);
         buffer.rewind();
         selector.send(new NetworkSend(node1, buffer));
-        NetworkTestUtils.waitForChannelClose(selector, node1);
+        NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY);
         selector.close();
 
         // Test good connection still works
@@ -532,7 +528,7 @@ public class SaslAuthenticatorTest {
         buffer.put(new byte[buffer.capacity() - 4]);
         buffer.rewind();
         selector.send(new NetworkSend(node2, buffer));
-        NetworkTestUtils.waitForChannelClose(selector, node2);
+        NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY);
         selector.close();
 
         // Test good connection still works
@@ -557,7 +553,7 @@ public class SaslAuthenticatorTest {
         RequestHeader metadataRequestHeader1 = new RequestHeader(ApiKeys.METADATA.id,
                 metadataRequest1.version(), "someclient", 1);
         selector.send(metadataRequest1.toSend(node1, metadataRequestHeader1));
-        NetworkTestUtils.waitForChannelClose(selector, node1);
+        NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY);
         selector.close();
 
         // Test good connection still works
@@ -572,7 +568,7 @@ public class SaslAuthenticatorTest {
         RequestHeader metadataRequestHeader2 = new RequestHeader(ApiKeys.METADATA.id,
                 metadataRequest2.version(), "someclient", 2);
         selector.send(metadataRequest2.toSend(node2, metadataRequestHeader2));
-        NetworkTestUtils.waitForChannelClose(selector, node2);
+        NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY);
         selector.close();
 
         // Test good connection still works
@@ -608,8 +604,7 @@ public class SaslAuthenticatorTest {
         configureMechanisms("PLAIN", Arrays.asList("DIGEST-MD5"));
 
         server = createEchoServer(securityProtocol);
-        createClientConnection(securityProtocol, node);
-        NetworkTestUtils.waitForChannelClose(selector, node);
+        createAndCheckClientConnectionFailure(securityProtocol, node);
     }
 
     /**
@@ -623,8 +618,7 @@ public class SaslAuthenticatorTest {
         saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "INVALID");
 
         server = createEchoServer(securityProtocol);
-        createClientConnection(securityProtocol, node);
-        NetworkTestUtils.waitForChannelClose(selector, node);
+        createAndCheckClientConnectionFailure(securityProtocol, node);
     }
 
     /**
@@ -824,7 +818,7 @@ public class SaslAuthenticatorTest {
 
     private void createAndCheckClientConnectionFailure(SecurityProtocol securityProtocol, String node) throws Exception {
         createClientConnection(securityProtocol, node);
-        NetworkTestUtils.waitForChannelClose(selector, node);
+        NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE);
         selector.close();
         selector = null;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/test/java/org/apache/kafka/test/MockSelector.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
index c1b2205..225aba4 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
@@ -19,9 +19,12 @@ package org.apache.kafka.test;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.kafka.common.network.ChannelState;
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.network.NetworkSend;
 import org.apache.kafka.common.network.Selectable;
@@ -37,7 +40,7 @@ public class MockSelector implements Selectable {
     private final List<Send> initiatedSends = new ArrayList<Send>();
     private final List<Send> completedSends = new ArrayList<Send>();
     private final List<NetworkReceive> completedReceives = new ArrayList<NetworkReceive>();
-    private final List<String> disconnected = new ArrayList<String>();
+    private final Map<String, ChannelState> disconnected = new HashMap<>();
     private final List<String> connected = new ArrayList<String>();
     private final List<DelayedReceive> delayedReceives = new ArrayList<>();
 
@@ -60,7 +63,7 @@ public class MockSelector implements Selectable {
 
     @Override
     public void close(String id) {
-        this.disconnected.add(id);
+        this.disconnected.put(id, ChannelState.LOCAL_CLOSE);
         for (int i = 0; i < this.connected.size(); i++) {
             if (this.connected.get(i).equals(id)) {
                 this.connected.remove(i);
@@ -121,7 +124,7 @@ public class MockSelector implements Selectable {
     }
 
     @Override
-    public List<String> disconnected() {
+    public Map<String, ChannelState> disconnected() {
         return disconnected;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index fb647fa..48d0233 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -544,7 +544,7 @@ private[kafka] class Processor(val id: Int,
   }
 
   private def processDisconnected() {
-    selector.disconnected.asScala.foreach { connectionId =>
+    selector.disconnected.keySet.asScala.foreach { connectionId =>
       val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
         throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
       }.remoteHost

Reply via email to