dajac commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r440658796



##########
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##########
@@ -678,7 +696,11 @@ public Node leastLoadedNode(long now) {
             } else if (connectionStates.isPreparingConnection(node.idString())) {
                 foundConnecting = node;
             } else if (canConnect(node, now)) {
-                foundCanConnect = node;
+                if (foundCanConnect == null ||
+                        this.connectionStates.lastConnectAttemptMs(foundCanConnect.idString()) >
+                                this.connectionStates.lastConnectAttemptMs(node.idString())) {
+                    foundCanConnect = node;
+                }

Review comment:
       It would be great if you could update the javadoc of the method to 
reflect this.
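
For the javadoc, the new rule among nodes that pass `canConnect` could be stated as: prefer the node whose `lastConnectAttemptMs` is oldest. A node with no recorded state (for which `lastConnectAttemptMs` returns 0) therefore wins, and because the comparison is strict, ties keep the node seen first. A stdlib-only sketch of just that comparison, with a hypothetical `Candidate` class standing in for `Node` plus its connection state:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a Node together with its last connection attempt timestamp.
final class Candidate {
    final String id;
    final long lastConnectAttemptMs;

    Candidate(String id, long lastConnectAttemptMs) {
        this.id = id;
        this.lastConnectAttemptMs = lastConnectAttemptMs;
    }
}

final class LeastLoadedSketch {
    /**
     * Among nodes we are allowed to connect to, returns the one whose last
     * connection attempt is the oldest, or null if the list is empty.
     */
    static Candidate pickConnectable(List<Candidate> connectable) {
        Candidate found = null;
        for (Candidate c : connectable) {
            // Same comparison as the hunk: replace the pick only if it was attempted more recently.
            if (found == null || found.lastConnectAttemptMs > c.lastConnectAttemptMs) {
                found = c;
            }
        }
        return found;
    }

    public static void main(String[] args) {
        List<Candidate> nodes = Arrays.asList(
                new Candidate("1", 500L), new Candidate("2", 100L), new Candidate("3", 300L));
        System.out.println(pickConnectable(nodes).id); // prints 2
    }
}
```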

##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    public long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;
+    }
+
+    /**
+     * Test if the connection to the given node has reached its timeout
+     * @param id the connection to fetch the state for
+     * @param now the current time in ms
+     */
+    public boolean isConnectionSetupTimeout(String id, long now) {
+        return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);

Review comment:
       Are you sure that using `lastConnectAttemptMs` is correct here?
`lastConnectAttemptMs` is recorded when a connection is disconnected and, since
we respect `reconnectBackoffMs` before reconnecting, the connection timeout
computed here will also include the current `reconnectBackoffMs`. Is this what
we want? It may be better to record the time in `connecting()`.
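
To make the concern concrete: assuming, as the comment says, that `lastConnectAttemptMs` is recorded on disconnect, a timeout measured from it also counts the reconnect backoff, while a timestamp taken when the node enters CONNECTING does not. The sketch below uses a simplified, hypothetical node state (`connectingStartedMs` is not a field in the PR) and an 8-second backoff to show the difference.

```java
// Hypothetical, simplified per-node state; not the actual NodeConnectionState.
final class NodeStateSketch {
    long lastConnectAttemptMs;   // recorded on disconnect, as described in the review
    long connectingStartedMs;    // hypothetical: recorded when the node enters CONNECTING
    final long connectionSetupTimeoutMs;

    NodeStateSketch(long connectionSetupTimeoutMs) {
        this.connectionSetupTimeoutMs = connectionSetupTimeoutMs;
    }

    void disconnected(long now) {
        lastConnectAttemptMs = now;
    }

    void connecting(long now) {
        connectingStartedMs = now;
    }

    // Measured from the disconnect timestamp: includes the reconnect backoff spent waiting.
    boolean timedOutSinceLastAttempt(long now) {
        return now - lastConnectAttemptMs > connectionSetupTimeoutMs;
    }

    // Measured from when the connection attempt actually started.
    boolean timedOutSinceConnecting(long now) {
        return now - connectingStartedMs > connectionSetupTimeoutMs;
    }

    public static void main(String[] args) {
        NodeStateSketch s = new NodeStateSketch(10_000L);
        s.disconnected(0L);      // connection lost at t = 0
        s.connecting(8_000L);    // reconnect starts after an 8 s backoff
        long now = 12_000L;      // only 4 s into the new attempt
        System.out.println(s.timedOutSinceLastAttempt(now)); // true: 12 s counted, backoff included
        System.out.println(s.timedOutSinceConnecting(now));  // false: 4 s counted
    }
}
```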

##########
File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##########
@@ -103,6 +103,12 @@
         Utils.join(SecurityProtocol.names(), ", ") + ".";
     public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms";
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The amount of time the client will wait for the initial socket connection to be built. If the connection is not built before the timeout elapses the network client will close the socket channel. The default value will be 10 seconds.";

Review comment:
       nit: I would use `to be established` instead of `to be built`. I also
think that we should avoid putting default values in the documentation here:
first, the default can be changed on a per-client basis, and second, the
default will be documented based on the default value provided in the config
definition.
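
With both suggestions applied, the constants might read as follows. The wording is only a suggestion; the documented default would come from the value passed in the config definition.

```java
// Suggested wording only: "established" instead of "built", and no default in the prose.
final class ConnectionSetupDocsSketch {
    static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms";
    static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC =
            "The amount of time the client will wait for the initial socket connection to be established. "
            + "If the connection is not established before the timeout elapses, "
            + "the network client will close the socket channel.";

    public static void main(String[] args) {
        System.out.println(SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG + ": " + SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC);
    }
}
```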

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {
+    private final int ratio;
+    private final double expMax;
+    private final long scaleFactor;
+    private final double jitter;
+
+    public GeometricProgression(long scaleFactor, int ratio, long termMax, double jitter) {
+        this.scaleFactor = scaleFactor;
+        this.ratio = ratio;
+        this.jitter = jitter;
+        this.expMax = termMax > scaleFactor ?
+                Math.log(termMax / (double) Math.max(scaleFactor, 1)) / Math.log(ratio) : 0;
+    }
+
+    public long term(long n) {

Review comment:
       nit: What about using `retries` instead of `n`? It may help readers
understand that the exponential value is computed from the number of tries or
retries.
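
Written with `retries` as the exponent, the class javadoc's formula reads as below. The cap on the exponent is inferred from the constructor's `expMax` computation, since the body of `term` is not part of this hunk, so treat it as an assumption.

```latex
\mathrm{Term}(\mathit{retries}) = U(1 - \mathit{jitter},\ 1 + \mathit{jitter}) \cdot \mathit{scaleFactor} \cdot \mathit{ratio}^{\min(\mathit{retries},\ \mathit{expMax})}

\mathit{expMax} =
\begin{cases}
  \dfrac{\ln\left(\mathit{termMax} / \max(\mathit{scaleFactor}, 1)\right)}{\ln \mathit{ratio}} & \text{if } \mathit{termMax} > \mathit{scaleFactor} \\
  0 & \text{otherwise}
\end{cases}
```

With the values from `GeometricProgressionTest` (scaleFactor = 100, ratio = 2, termMax = 2000), expMax = ln 20 / ln 2 ≈ 4.32, so the pre-jitter terms for retries = 0, 1, 2, 3, 4 are 100, 200, 400, 800, 1600, and every retries ≥ 5 gives 100 · 2^4.32 ≈ 2000 = termMax.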

##########
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##########
@@ -786,6 +808,29 @@ private void handleAbortedSends(List<ClientResponse> responses) {
         abortedSends.clear();
     }
 
+    /**
+     * Handle socket channel connection timeout. The timeout will hit iff a connection
+     * stays at the ConnectionState.CONNECTING state longer than the timeout value,
+     * as indicated by ClusterConnectionStates.NodeConnectionState.
+     *
+     * @param responses The list of responses to update
+     * @param now The current time
+     */
+    private void handleTimeoutConnections(List<ClientResponse> responses, long now) {
+        Set<String> connectingNodes = connectionStates.connectingNodes();
+        for (String nodeId: connectingNodes) {

Review comment:
       nit: We usually put a space before and after the `:`.
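
A simplified sketch of the check the javadoc above describes, written with the usual spacing (`for (String nodeId : connectingNodes)`). It collects the timed-out ids first and removes them after the loop, so the set is not modified while it is being iterated. The start-time map and the removal step are hypothetical simplifications, not the PR's code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified version of the connection setup timeout check.
final class ConnectionTimeoutSketch {
    final Set<String> connectingNodes = new LinkedHashSet<>();
    final Map<String, Long> connectStartMs = new HashMap<>();
    final long connectionSetupTimeoutMs;

    ConnectionTimeoutSketch(long connectionSetupTimeoutMs) {
        this.connectionSetupTimeoutMs = connectionSetupTimeoutMs;
    }

    void connecting(String nodeId, long now) {
        connectingNodes.add(nodeId);
        connectStartMs.put(nodeId, now);
    }

    /** Returns and removes the nodes that stayed in CONNECTING longer than the timeout. */
    List<String> handleTimeoutConnections(long now) {
        List<String> timedOut = new ArrayList<>();
        for (String nodeId : connectingNodes) {   // note the spaces around ':'
            if (now - connectStartMs.get(nodeId) > connectionSetupTimeoutMs) {
                timedOut.add(nodeId);
            }
        }
        // Remove after the loop so the set is not modified during iteration.
        for (String nodeId : timedOut) {
            connectingNodes.remove(nodeId);
            connectStartMs.remove(nodeId);
        }
        return timedOut;
    }
}
```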

##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    public long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;

Review comment:
       Should we ensure that `nodeState` is not `null` here?
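
One possible answer, assuming a missing entry indicates a programming error: fail fast with a descriptive exception rather than a bare `NullPointerException`. Returning a default, as `lastConnectAttemptMs` does with 0, seems riskier here, because with `isConnectionSetupTimeout` as written a 0 ms timeout would expire as soon as any time has passed. The class below is a simplified stand-in, not the actual `ClusterConnectionStates`.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for ClusterConnectionStates; only the lookup logic is shown.
final class ConnectionStatesSketch {
    static final class NodeConnectionState {
        final long connectionSetupTimeoutMs;

        NodeConnectionState(long connectionSetupTimeoutMs) {
            this.connectionSetupTimeoutMs = connectionSetupTimeoutMs;
        }
    }

    final Map<String, NodeConnectionState> nodeState = new HashMap<>();

    long connectionSetupTimeoutMs(String id) {
        NodeConnectionState state = nodeState.get(id);
        if (state == null) {
            // Fail fast with a clear message instead of a NullPointerException.
            throw new IllegalStateException("No entry found for connection " + id);
        }
        return state.connectionSetupTimeoutMs;
    }
}
```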

##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -300,30 +323,48 @@ public AuthenticationException authenticationException(String id) {
      * Resets the failure count for a node and sets the reconnect backoff to the base
      * value configured via reconnect.backoff.ms
      *
-     * @param nodeState The node state object to update
+     * @param nodeState nodeState The node state object to update
      */
     private void resetReconnectBackoff(NodeConnectionState nodeState) {
         nodeState.failedAttempts = 0;
-        nodeState.reconnectBackoffMs = this.reconnectBackoffInitMs;
+        nodeState.reconnectBackoffMs = reconnectBackoff.term(0);
+    }
+
+    /**
+     * Resets the failure count for a node and sets the connection setup timeout to the base
+     * value configured via socket.connection.setup.timeout.ms
+     *
+     * @param nodeState nodeState The node state object to update
+     */
+    private void resetConnectionSetupTimeout(NodeConnectionState nodeState) {
+        nodeState.failedConnectAttempts = 0;
+        nodeState.connectionSetupTimeoutMs = connectionSetupTimeout.term(0);
     }
 
     /**
-     * Update the node reconnect backoff exponentially.
+     * Increment the failure counter, update the node reconnect backoff exponentially,
+     * and record the current timestamp.
      * The delay is reconnect.backoff.ms * 2**(failures - 1) * (+/- 20% random jitter)
      * Up to a (pre-jitter) maximum of reconnect.backoff.max.ms
      *
      * @param nodeState The node state object to update
      */
-    private void updateReconnectBackoff(NodeConnectionState nodeState) {
-        if (this.reconnectBackoffMaxMs > this.reconnectBackoffInitMs) {
-            nodeState.failedAttempts += 1;
-            double backoffExp = Math.min(nodeState.failedAttempts - 1, this.reconnectBackoffMaxExp);
-            double backoffFactor = Math.pow(RECONNECT_BACKOFF_EXP_BASE, backoffExp);
-            long reconnectBackoffMs = (long) (this.reconnectBackoffInitMs * backoffFactor);
-            // Actual backoff is randomized to avoid connection storms.
-            double randomFactor = ThreadLocalRandom.current().nextDouble(0.8, 1.2);
-            nodeState.reconnectBackoffMs = (long) (randomFactor * reconnectBackoffMs);
-        }
+    private void incrementReconnectBackoff(NodeConnectionState nodeState, long now) {
+        nodeState.reconnectBackoffMs = reconnectBackoff.term(nodeState.failedAttempts);
+        nodeState.failedAttempts++;

Review comment:
       Shouldn't we increment before computing the new reconnect backoff?
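
The two orderings differ by one step in the exponent. The deterministic sketch below leaves out jitter and uses illustrative values; `term` is a hypothetical stand-in for `reconnectBackoff.term` (base * 2^n, capped). Computing before incrementing, as in the hunk, gives `base` on the first failure, which matches the removed `updateReconnectBackoff` and the javadoc's `2**(failures - 1)`; incrementing first doubles every value. Whichever is intended, the code and the javadoc should agree.

```java
import java.util.Arrays;

// Deterministic sketch (no jitter) comparing the two orderings discussed above.
final class BackoffOrderingSketch {
    static final long BASE_MS = 50L;
    static final long MAX_MS = 1_000L;

    // Hypothetical stand-in for reconnectBackoff.term(n): BASE_MS * 2^n, capped at MAX_MS.
    static long term(long n) {
        return (long) Math.min(BASE_MS * Math.pow(2, n), MAX_MS);
    }

    /** Compute with the current count, then increment (as in the hunk). */
    static long[] computeThenIncrement(int failures) {
        long[] backoffs = new long[failures];
        int failedAttempts = 0;
        for (int i = 0; i < failures; i++) {
            backoffs[i] = term(failedAttempts);
            failedAttempts++;
        }
        return backoffs;
    }

    /** Increment first, then compute (as suggested in the review). */
    static long[] incrementThenCompute(int failures) {
        long[] backoffs = new long[failures];
        int failedAttempts = 0;
        for (int i = 0; i < failures; i++) {
            failedAttempts++;
            backoffs[i] = term(failedAttempts);
        }
        return backoffs;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(computeThenIncrement(4))); // [50, 100, 200, 400]
        System.out.println(Arrays.toString(incrementThenCompute(4))); // [100, 200, 400, 800]
    }
}
```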

##########
File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##########
@@ -103,6 +103,12 @@
         Utils.join(SecurityProtocol.names(), ", ") + ".";
     public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms";
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The amount of time the client will wait for the initial socket connection to be built. If the connection is not built before the timeout elapses the network client will close the socket channel. The default value will be 10 seconds.";
+
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG = "socket.connection.setup.timeout.max.ms";
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC = "The maximum amount of time the client will wait for the initial socket connection to be built. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between 20% below and 20% above the computed value. The default value will be 127 seconds.";

Review comment:
       Same comment as above.

##########
File path: clients/src/test/java/org/apache/kafka/common/utils/GeometricProgressionTest.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class GeometricProgressionTest {
+    @Test
+    public void testGeometricProgression() {
+        long scaleFactor = 100;
+        int ratio = 2;
+        long termMax = 2000;
+        double jitter = 0.2;
+        GeometricProgression geometricProgression = new GeometricProgression(
+                scaleFactor, ratio, termMax, jitter
+        );
+
+        for (int i = 0; i <= 100; i++) {
+            for (int n = 0; n <= 4; n++) {
+                assertEquals(scaleFactor * Math.pow(ratio, n), geometricProgression.term(n),
+                        scaleFactor * Math.pow(ratio, n) * jitter);
+            }
+            System.out.println(geometricProgression.term(5));

Review comment:
       This `println` can be removed, I suppose.
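
If a check on a term past the cap is still wanted in place of the print, a bounds assertion could replace it. This stdlib-only sketch assumes, as the constructor's `expMax` suggests, that `term(5)` is capped at `termMax` before jitter, so it should lie within `termMax * (1 ± jitter)`; in the JUnit test itself this would be an `assertTrue` on the same bounds. The `LongUnaryOperator` stands in for `geometricProgression::term`, and the lambda in `main` is a jitter-free placeholder.

```java
import java.util.function.LongUnaryOperator;

final class CappedTermCheck {
    /** Throws AssertionError unless term(n) lies within termMax * (1 +/- jitter). */
    static void assertCapped(LongUnaryOperator term, long n, long termMax, double jitter) {
        long value = term.applyAsLong(n);
        double lower = termMax * (1 - jitter);
        double upper = termMax * (1 + jitter);
        if (value < lower || value > upper) {
            throw new AssertionError("term(" + n + ") = " + value + " is outside [" + lower + ", " + upper + "]");
        }
    }

    public static void main(String[] args) {
        // Placeholder for geometricProgression::term: 100 * 2^n capped at 2000, no jitter.
        LongUnaryOperator term = n -> Math.min(100L << n, 2000L);
        assertCapped(term, 5, 2000L, 0.2);
        System.out.println("term(5) is within the jittered cap");
    }
}
```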

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {

Review comment:
       I would go with `ExponentialBackoff` even though we also use it to compute
an exponential timeout. I think that people will understand this.
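
A minimal sketch of the class under the suggested name, reusing the constructor logic and the javadoc formula from the hunk with `attempts` as the exponent. The method name `backoff`, the zero-jitter guard, and the exponent cap are assumptions, since the body of `term` is not shown. `main` uses one instance per purpose, which is why the name still reads naturally for the timeout case.

```java
import java.util.concurrent.ThreadLocalRandom;

/**
 * Sketch of GeometricProgression under the suggested name:
 * random(1 - jitter, 1 + jitter) * scaleFactor * ratio ^ min(attempts, expMax).
 */
final class ExponentialBackoff {
    private final long scaleFactor;
    private final int ratio;
    private final double expMax;
    private final double jitter;

    ExponentialBackoff(long scaleFactor, int ratio, long termMax, double jitter) {
        this.scaleFactor = scaleFactor;
        this.ratio = ratio;
        this.jitter = jitter;
        // Same computation as the hunk: largest exponent keeping the pre-jitter value <= termMax.
        this.expMax = termMax > scaleFactor
                ? Math.log(termMax / (double) Math.max(scaleFactor, 1)) / Math.log(ratio)
                : 0;
    }

    /** Value for the given number of consecutive failed attempts (0 for the first). */
    long backoff(long attempts) {
        double exp = Math.min(attempts, expMax);
        double term = scaleFactor * Math.pow(ratio, exp);
        // nextDouble(origin, bound) requires origin < bound, so skip randomization when jitter is 0.
        double randomFactor = jitter < Double.MIN_NORMAL
                ? 1.0
                : ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter);
        return (long) (randomFactor * term);
    }

    public static void main(String[] args) {
        // Hypothetical usage: one instance for reconnect backoff, one for connection setup timeout.
        ExponentialBackoff reconnectBackoff = new ExponentialBackoff(50, 2, 1000, 0.2);
        ExponentialBackoff connectionSetupTimeout = new ExponentialBackoff(10_000, 2, 127_000, 0.2);
        for (int attempts = 0; attempts < 5; attempts++) {
            System.out.println(attempts + ": backoff=" + reconnectBackoff.backoff(attempts)
                    + " ms, setup timeout=" + connectionSetupTimeout.backoff(attempts) + " ms");
        }
    }
}
```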

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
##########
@@ -149,6 +155,16 @@
                                         atLeast(0),
                                         Importance.MEDIUM,
                                         REQUEST_TIMEOUT_MS_DOC)
+                                .define(SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_CONFIG,
+                                        Type.LONG,
+                                        10 * 1000,
+                                        Importance.MEDIUM,
+                                        CommonClientConfigs.SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_DOC)
+                                .define(SOCKET_CONNECTIONS_SETUP_TIMEOUT_MAX_MS_CONFIG,
+                                        Type.LONG,
+                                        127 * 1000,

Review comment:
       While I also recognize that we are not consistent about this, I would do
it as Rajini suggested. The defaults are the same everywhere, so it makes sense
to define them centrally for now.
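
Defining the defaults centrally might look like the following, with hypothetical `DEFAULT_...` constant names and the config names from the `CommonClientConfigs` hunk. Each client's `.define(...)` call would then pass the shared constant instead of the literal `10 * 1000` or `127 * 1000`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical central definitions; every client config would refer to these constants.
final class CommonClientConfigsSketch {
    static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms";
    static final long DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS = 10 * 1000L;

    static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG = "socket.connection.setup.timeout.max.ms";
    static final long DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS = 127 * 1000L;

    /** The defaults as each client's config definition would register them. */
    static Map<String, Long> connectionSetupDefaults() {
        Map<String, Long> defaults = new LinkedHashMap<>();
        defaults.put(SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS);
        defaults.put(SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS);
        return defaults;
    }

    public static void main(String[] args) {
        System.out.println(connectionSetupDefaults());
    }
}
```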




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
