rajinisivaram commented on a change in pull request #8990:
URL: https://github.com/apache/kafka/pull/8990#discussion_r450752852



##########
File path: 
clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##########
@@ -365,4 +367,48 @@ public void testExponentialConnectionSetupTimeout() {
                 connectionSetupTimeoutMs * connectionSetupTimeoutJitter);
         assertTrue(connectionStates.connectingNodes().contains(nodeId1));
     }
+
+    @Test
+    public void testTimedOutConnections() {
+        // Initiate two connections
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", 
ClientDnsLookup.DEFAULT);
+        connectionStates.connecting(nodeId2, time.milliseconds(), "localhost", 
ClientDnsLookup.DEFAULT);
+
+        // Expect no timed out connections
+        
assertTrue(connectionStates.nodesWithConnectionSetupTimeout(time.milliseconds()).isEmpty());

Review comment:
       nit: `assertEquals(0, ...)` may be better here, so that a failure 
shows how many timed-out connections were returned?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -440,6 +441,20 @@ public boolean isConnectionSetupTimeout(String id, long 
now) {
         return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);
     }
 
+    /**
+     * Return the Set of nodes whose connection setup has timed out.
+     * @param now the current time in ms
+     */
+    public Set<String> nodesWithConnectionSetupTimeout(long now) {
+        Set<String> nodes = new HashSet<>();
+        for (String nodeId : connectingNodes) {

Review comment:
       Could we use `connectingNodes.stream().filter(...)` here instead of the explicit loop?

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##########
@@ -365,4 +367,48 @@ public void testExponentialConnectionSetupTimeout() {
                 connectionSetupTimeoutMs * connectionSetupTimeoutJitter);
         assertTrue(connectionStates.connectingNodes().contains(nodeId1));
     }
+
+    @Test
+    public void testTimedOutConnections() {
+        // Initiate two connections
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", 
ClientDnsLookup.DEFAULT);
+        connectionStates.connecting(nodeId2, time.milliseconds(), "localhost", 
ClientDnsLookup.DEFAULT);
+
+        // Expect no timed out connections
+        
assertTrue(connectionStates.nodesWithConnectionSetupTimeout(time.milliseconds()).isEmpty());
+
+        // Advance time by half of the connection setup timeout
+        time.sleep(connectionSetupTimeoutMs / 2);
+
+        // Initiate a third connections

Review comment:
       nit: connections => connection

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##########
@@ -365,4 +367,48 @@ public void testExponentialConnectionSetupTimeout() {
                 connectionSetupTimeoutMs * connectionSetupTimeoutJitter);
         assertTrue(connectionStates.connectingNodes().contains(nodeId1));
     }
+
+    @Test
+    public void testTimedOutConnections() {
+        // Initiate two connections
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", 
ClientDnsLookup.DEFAULT);
+        connectionStates.connecting(nodeId2, time.milliseconds(), "localhost", 
ClientDnsLookup.DEFAULT);
+
+        // Expect no timed out connections
+        
assertTrue(connectionStates.nodesWithConnectionSetupTimeout(time.milliseconds()).isEmpty());
+
+        // Advance time by half of the connection setup timeout
+        time.sleep(connectionSetupTimeoutMs / 2);
+
+        // Initiate a third connections
+        connectionStates.connecting(nodeId3, time.milliseconds(), "localhost", 
ClientDnsLookup.DEFAULT);
+
+        // Advance time beyond the connection setup timeout (+ max jitter) for 
the first two connections
+        time.sleep((long) (connectionSetupTimeoutMs / 2 + 
connectionSetupTimeoutMs * connectionSetupTimeoutJitter));
+
+        // Expect two timed out connections.
+        Set<String> timedOutConnections = 
connectionStates.nodesWithConnectionSetupTimeout(time.milliseconds());
+        assertEquals(2, timedOutConnections.size());
+        assertTrue(timedOutConnections.contains(nodeId1));
+        assertTrue(timedOutConnections.contains(nodeId2));
+
+        // Disconnect the first two connections
+        connectionStates.disconnected(nodeId1, time.milliseconds());
+        connectionStates.disconnected(nodeId2, time.milliseconds());
+
+        // Advance time beyond the connection setup timeout (+ max jitter) for 
for the third connections

Review comment:
       typo: `for for`

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##########
@@ -365,4 +367,48 @@ public void testExponentialConnectionSetupTimeout() {
                 connectionSetupTimeoutMs * connectionSetupTimeoutJitter);
         assertTrue(connectionStates.connectingNodes().contains(nodeId1));
     }
+
+    @Test
+    public void testTimedOutConnections() {
+        // Initiate two connections
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", 
ClientDnsLookup.DEFAULT);
+        connectionStates.connecting(nodeId2, time.milliseconds(), "localhost", 
ClientDnsLookup.DEFAULT);
+
+        // Expect no timed out connections
+        
assertTrue(connectionStates.nodesWithConnectionSetupTimeout(time.milliseconds()).isEmpty());
+
+        // Advance time by half of the connection setup timeout
+        time.sleep(connectionSetupTimeoutMs / 2);
+
+        // Initiate a third connections
+        connectionStates.connecting(nodeId3, time.milliseconds(), "localhost", 
ClientDnsLookup.DEFAULT);
+
+        // Advance time beyond the connection setup timeout (+ max jitter) for 
the first two connections
+        time.sleep((long) (connectionSetupTimeoutMs / 2 + 
connectionSetupTimeoutMs * connectionSetupTimeoutJitter));
+
+        // Expect two timed out connections.
+        Set<String> timedOutConnections = 
connectionStates.nodesWithConnectionSetupTimeout(time.milliseconds());
+        assertEquals(2, timedOutConnections.size());
+        assertTrue(timedOutConnections.contains(nodeId1));
+        assertTrue(timedOutConnections.contains(nodeId2));
+
+        // Disconnect the first two connections
+        connectionStates.disconnected(nodeId1, time.milliseconds());
+        connectionStates.disconnected(nodeId2, time.milliseconds());
+
+        // Advance time beyond the connection setup timeout (+ max jitter) for 
for the third connections
+        time.sleep((long) (connectionSetupTimeoutMs / 2 + 
connectionSetupTimeoutMs * connectionSetupTimeoutJitter));
+
+        // Expect two timed out connections.

Review comment:
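       one? (the first two connections were disconnected above, so only the 
third connection should be reported as timed out here)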




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to