rajinisivaram commented on a change in pull request #8990: URL: https://github.com/apache/kafka/pull/8990#discussion_r450752852
########## File path: clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java ########## @@ -365,4 +367,48 @@ public void testExponentialConnectionSetupTimeout() { connectionSetupTimeoutMs * connectionSetupTimeoutJitter); assertTrue(connectionStates.connectingNodes().contains(nodeId1)); } + + @Test + public void testTimedOutConnections() { + // Initiate two connections + connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT); + connectionStates.connecting(nodeId2, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT); + + // Expect no timed out connections + assertTrue(connectionStates.nodesWithConnectionSetupTimeout(time.milliseconds()).isEmpty()); Review comment: nit: `assertEquals(0, ...)` may be better here so we know how many in the case of failure? ########## File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java ########## @@ -440,6 +441,20 @@ public boolean isConnectionSetupTimeout(String id, long now) { return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id); } + /** + * Return the Set of nodes whose connection setup has timed out. + * @param now the current time in ms + */ + public Set<String> nodesWithConnectionSetupTimeout(long now) { + Set<String> nodes = new HashSet<>(); + for (String nodeId : connectingNodes) { Review comment: We can use `connectingNodes.stream().filter`? ########## File path: clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java ########## @@ -365,4 +367,48 @@ public void testExponentialConnectionSetupTimeout() { connectionSetupTimeoutMs * connectionSetupTimeoutJitter); assertTrue(connectionStates.connectingNodes().contains(nodeId1)); } + + @Test + public void testTimedOutConnections() { + // Initiate two connections + connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT); + connectionStates.connecting(nodeId2, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT); + + // Expect no timed out connections + assertTrue(connectionStates.nodesWithConnectionSetupTimeout(time.milliseconds()).isEmpty()); + + // Advance time by half of the connection setup timeout + time.sleep(connectionSetupTimeoutMs / 2); + + // Initiate a third connections Review comment: nit: connections => connection ########## File path: clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java ########## @@ -365,4 +367,48 @@ public void testExponentialConnectionSetupTimeout() { connectionSetupTimeoutMs * connectionSetupTimeoutJitter); assertTrue(connectionStates.connectingNodes().contains(nodeId1)); } + + @Test + public void testTimedOutConnections() { + // Initiate two connections + connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT); + connectionStates.connecting(nodeId2, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT); + + // Expect no timed out connections + assertTrue(connectionStates.nodesWithConnectionSetupTimeout(time.milliseconds()).isEmpty()); + + // Advance time by half of the connection setup timeout + time.sleep(connectionSetupTimeoutMs / 2); + + // Initiate a third connections + connectionStates.connecting(nodeId3, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT); + + // Advance time beyond the connection setup timeout (+ max jitter) for the first two connections + time.sleep((long) (connectionSetupTimeoutMs / 2 + connectionSetupTimeoutMs * connectionSetupTimeoutJitter)); + + // Expect two timed out connections. + Set<String> timedOutConnections = connectionStates.nodesWithConnectionSetupTimeout(time.milliseconds()); + assertEquals(2, timedOutConnections.size()); + assertTrue(timedOutConnections.contains(nodeId1)); + assertTrue(timedOutConnections.contains(nodeId2)); + + // Disconnect the first two connections + connectionStates.disconnected(nodeId1, time.milliseconds()); + connectionStates.disconnected(nodeId2, time.milliseconds()); + + // Advance time beyond the connection setup timeout (+ max jitter) for for the third connections Review comment: typo: `for for` ########## File path: clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java ########## @@ -365,4 +367,48 @@ public void testExponentialConnectionSetupTimeout() { connectionSetupTimeoutMs * connectionSetupTimeoutJitter); assertTrue(connectionStates.connectingNodes().contains(nodeId1)); } + + @Test + public void testTimedOutConnections() { + // Initiate two connections + connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT); + connectionStates.connecting(nodeId2, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT); + + // Expect no timed out connections + assertTrue(connectionStates.nodesWithConnectionSetupTimeout(time.milliseconds()).isEmpty()); + + // Advance time by half of the connection setup timeout + time.sleep(connectionSetupTimeoutMs / 2); + + // Initiate a third connections + connectionStates.connecting(nodeId3, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT); + + // Advance time beyond the connection setup timeout (+ max jitter) for the first two connections + time.sleep((long) (connectionSetupTimeoutMs / 2 + connectionSetupTimeoutMs * connectionSetupTimeoutJitter)); + + // Expect two timed out connections. + Set<String> timedOutConnections = connectionStates.nodesWithConnectionSetupTimeout(time.milliseconds()); + assertEquals(2, timedOutConnections.size()); + assertTrue(timedOutConnections.contains(nodeId1)); + assertTrue(timedOutConnections.contains(nodeId2)); + + // Disconnect the first two connections + connectionStates.disconnected(nodeId1, time.milliseconds()); + connectionStates.disconnected(nodeId2, time.milliseconds()); + + // Advance time beyond the connection setup timeout (+ max jitter) for for the third connections + time.sleep((long) (connectionSetupTimeoutMs / 2 + connectionSetupTimeoutMs * connectionSetupTimeoutJitter)); + + // Expect two timed out connections. Review comment: one? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org