[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-25 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r445911915



##
File path: 
clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An utility class for keeping the parameters and providing the value of exponential
+ * retry backoff, exponential reconnect backoff, exponential timeout, etc.
+ * The formula is

Review comment:
   : added





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-25 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r445911638



##
File path: 
clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An utility class for keeping the parameters and providing the value of exponential
+ * retry backoff, exponential reconnect backoff, exponential timeout, etc.
+ * The formula is
+ * Backoff(attempts) = random(1 - jitter, 1 + jitter) * initialInterval * multiplier ^ attempts
+ * If scaleFactor is greater or equal than termMax, a constant backoff of will be provided

Review comment:
   Replaced.
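
For readers following the thread, the formula quoted in the Javadoc above can be sketched in a few lines of Java. This is only an illustration under stated assumptions: the class name `BackoffSketch` is hypothetical, the parameter names `initialInterval`, `multiplier`, `maxInterval`, and `jitter` are borrowed from the review discussion, and the capping logic mirrors the `expMax` computation shown in the earlier revision of the diff. It is not the class that was merged.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of the quoted formula; not the class under review.
final class BackoffSketch {
    private final long initialInterval;
    private final int multiplier;
    private final double jitter;
    private final double expMax;

    BackoffSketch(long initialInterval, int multiplier, long maxInterval, double jitter) {
        this.initialInterval = initialInterval;
        this.multiplier = multiplier;
        this.jitter = jitter;
        // Largest exponent for which the un-jittered value stays at or below maxInterval.
        // If maxInterval <= initialInterval the exponent is pinned to 0 (constant backoff).
        this.expMax = maxInterval > initialInterval
            ? Math.log(maxInterval / (double) Math.max(initialInterval, 1)) / Math.log(multiplier)
            : 0;
    }

    long backoff(long attempts) {
        // Cap the exponent so the un-jittered term never exceeds maxInterval.
        double exp = Math.min(attempts, expMax);
        double term = initialInterval * Math.pow(multiplier, exp);
        // nextDouble(origin, bound) requires origin < bound, so skip it when jitter is 0.
        double randomFactor = jitter > 0
            ? ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter)
            : 1.0;
        return (long) (randomFactor * term);
    }
}
```

With `new BackoffSketch(100, 2, 2000, 0.2)`, successive attempts grow around 100, 200, 400, and so on, each within 20% of the un-jittered value, and large attempt counts stay at or below `2000 * 1.2`.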









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-25 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r445911349



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -357,6 +402,46 @@ private NodeConnectionState nodeState(String id) {
 return state;
 }
 
+/**
+ * Get the id set of nodes which are in CONNECTING state
+ */
+public Set<String> connectingNodes() {
+return this.connectingNodes;
+}
+
+/**
+ * Get the timestamp of the latest connection attempt of a given node
+ * @param id the connection to fetch the state for
+ */
+public long lastConnectAttemptMs(String id) {
+NodeConnectionState nodeState = this.nodeState.get(id);
+return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+}
+
+/**
+ * Get the current socket connection setup timeout of the given node.
+ * The base value is defined via socket.connection.setup.timeout.
+ * @param id the connection to fetch the state for
+ */
+public long connectionSetupTimeoutMs(String id) {
+NodeConnectionState nodeState = this.nodeState.get(id);
+if (nodeState == null)
+throw new IllegalStateException("Connection to node " + id + " hasn't been initialized");

Review comment:
   Good catch. Refactored.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -357,6 +402,46 @@ private NodeConnectionState nodeState(String id) {
 return state;
 }
 
+/**
+ * Get the id set of nodes which are in CONNECTING state
+ */
+public Set<String> connectingNodes() {
+return this.connectingNodes;
+}
+
+/**
+ * Get the timestamp of the latest connection attempt of a given node
+ * @param id the connection to fetch the state for
+ */
+public long lastConnectAttemptMs(String id) {
+NodeConnectionState nodeState = this.nodeState.get(id);
+return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+}
+
+/**
+ * Get the current socket connection setup timeout of the given node.
+ * The base value is defined via socket.connection.setup.timeout.
+ * @param id the connection to fetch the state for
+ */
+public long connectionSetupTimeoutMs(String id) {
+NodeConnectionState nodeState = this.nodeState.get(id);
+if (nodeState == null)
+throw new IllegalStateException("Connection to node " + id + " hasn't been initialized");
+return nodeState.connectionSetupTimeoutMs;
+}
+
+/**
+ * Test if the connection to the given node has reached its timeout
+ * @param id the connection to fetch the state for
+ * @param now the current time in ms
+ */
+public boolean isConnectionSetupTimeout(String id, long now) {
+NodeConnectionState nodeState = this.nodeState.get(id);

Review comment:
   Good catch. Refactored.









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-24 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r445062773



##
File path: 
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##
@@ -103,6 +103,14 @@
 Utils.join(SecurityProtocol.names(), ", ") + ".";
 public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
+public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms";
+public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The amount of time the client will wait for the socket connection to be established. If the connection is not built before the timeout elapses, clients will close the socket channel.";
+public static final Long SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DEFAULT = 10 * 1000L;
+
+public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG = "socket.connection.setup.timeout.max.ms";
+public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC = "The maximum amount of time the client will wait for the socket connection to be established. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. To avoid connection storms, a randomization factor of 0.2 will be applied to the timeout resulting in a random range between 20% below and 20% above the computed value.";
+public static final Long SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DEFAULT = 127 * 1000L;

Review comment:
   Yes.
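
As an illustration of how the two settings quoted above might be supplied, here is a minimal sketch using plain `java.util.Properties`. The two keys and their default values are taken from the diff; the class name `SetupTimeoutConfigSketch` and the `localhost:9092` bootstrap address are placeholders, and whether a given client version accepts these keys depends on this change being present.

```java
import java.util.Properties;

// Hypothetical helper; only the two socket.connection.setup.* keys come from the diff.
final class SetupTimeoutConfigSketch {
    static Properties clientProps() {
        Properties props = new Properties();
        // Placeholder broker address for illustration.
        props.put("bootstrap.servers", "localhost:9092");
        // Initial connection setup timeout: 10 * 1000 ms, the default in the diff.
        props.put("socket.connection.setup.timeout.ms", "10000");
        // Upper bound for the exponentially growing timeout: 127 * 1000 ms.
        props.put("socket.connection.setup.timeout.max.ms", "127000");
        return props;
    }
}
```

Per the documentation string in the diff, the effective timeout is randomized by a factor of 0.2, so the first attempt would time out somewhere between 20% below and 20% above the configured 10 seconds.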









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-24 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r445054740



##
File path: core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
##
@@ -205,6 +205,8 @@ object BrokerApiVersionsCommand {
   private object AdminClient {
 val DefaultConnectionMaxIdleMs = 9 * 60 * 1000
 val DefaultRequestTimeoutMs = 5000
+val DefaultSocketConnectionSetupMs = 10 * 1000
+val DefaultSocketConnectionSetupMaxMs = 127 * 1000

Review comment:
   Oh, that's true. Removed them.









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-24 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r445054383



##
File path: 
clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##
@@ -40,7 +40,12 @@
 private final MockTime time = new MockTime();
 private final long reconnectBackoffMs = 10 * 1000;
 private final long reconnectBackoffMax = 60 * 1000;
-private final double reconnectBackoffJitter = 0.2;
+private final long connectionSetupTimeoutMs = 10 * 1000;
+private final long connectionSetupTimeoutMaxMs = 127 * 1000;
+private final static int RECONNECT_BACKOFF_EXP_BASE = 2;
+private final static double RECONNECT_BACKOFF_JITTER = 0.2;
+private final static int CONNECTION_SETUP_TIMEOUT_EXP_BASE = 2;
+private final static double CONNECTION_SETUP_TIMEOUT_JITTER = 0.2;

Review comment:
   Makes sense. I've made them package-private and lower-cased the constants.









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-24 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r445048690



##
File path: 
clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An utility class for exponential backoff, timeout, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class ExponentialBackoff {
+private final int ratio;
+private final double expMax;
+private final long scaleFactor;
+private final double jitter;
+
+public ExponentialBackoff(long scaleFactor, int ratio, long termMax, double jitter) {

Review comment:
   Sure. Let's use this proposal.









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-24 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r445048384



##
File path: 
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##
@@ -103,6 +103,14 @@
 Utils.join(SecurityProtocol.names(), ", ") + ".";
 public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
+public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms";
+public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The amount of time the client will wait for the socket connection to be established. If the connection is not built before the timeout elapses, clients will close the socket channel.";
+public static final Long SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DEFAULT = 10 * 1000L;

Review comment:
   Sure. Refactored.









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-24 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r445042974



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
 return state;
 }
 
+/**
+ * Get the id set of nodes which are in CONNECTING state
+ */
+public Set<String> connectingNodes() {
+return this.connectingNodes;
+}
+
+/**
+ * Get the timestamp of the latest connection attempt of a given node
+ * @param id the connection to fetch the state for
+ */
+public long lastConnectAttemptMs(String id) {
+NodeConnectionState nodeState = this.nodeState.get(id);
+return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+}
+
+public long connectionSetupTimeoutMs(String id) {
+NodeConnectionState nodeState = this.nodeState.get(id);
+return nodeState.connectionSetupTimeoutMs;

Review comment:
   Sure. Now `isConnectionSetupTimeout` is also using this checker.
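
The idea of routing both accessors through one null-checking lookup can be sketched as follows. Everything here is hypothetical and simplified: the class `ConnectionStatesSketch`, the nested `NodeState`, the three-argument `connecting` method, the exception message, and the timeout comparison (`now - lastConnectAttemptMs > connectionSetupTimeoutMs`) are assumptions made for illustration, not the code in `ClusterConnectionStates`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the per-node connection state bookkeeping.
final class ConnectionStatesSketch {
    static final class NodeState {
        long lastConnectAttemptMs;
        long connectionSetupTimeoutMs;
    }

    private final Map<String, NodeState> nodeState = new HashMap<>();

    // Single place that rejects unknown node ids, shared by every accessor below.
    private NodeState nodeState(String id) {
        NodeState state = nodeState.get(id);
        if (state == null)
            throw new IllegalStateException("No entry found for connection " + id);
        return state;
    }

    // Records a connection attempt; the timeout is passed in directly for simplicity.
    void connecting(String id, long now, long timeoutMs) {
        NodeState state = nodeState.computeIfAbsent(id, k -> new NodeState());
        state.lastConnectAttemptMs = now;
        state.connectionSetupTimeoutMs = timeoutMs;
    }

    long connectionSetupTimeoutMs(String id) {
        return nodeState(id).connectionSetupTimeoutMs;
    }

    // Assumed semantics: timed out once more than the timeout has elapsed since the attempt.
    boolean isConnectionSetupTimeout(String id, long now) {
        NodeState state = nodeState(id);
        return now - state.lastConnectAttemptMs > state.connectionSetupTimeoutMs;
    }
}
```

Centralizing the check means a caller that asks about a node that was never registered gets an `IllegalStateException` with a descriptive message rather than a bare `NullPointerException`.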









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-22 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443970898



##
File path: 
clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An utility class for exponential backoff, timeout, etc...

Review comment:
   Sure. How does it look now?









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-22 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443935192



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -88,6 +88,8 @@ object Defaults {
   val MaxConnections: Int = Int.MaxValue
   val ConnectionsMaxIdleMs = 10 * 60 * 1000L
   val RequestTimeoutMs = 3
+  val ConnectionSetupTimeoutMs = 10 * 1000L
+  val ConnectionSetupTimeoutMaxMs = 127 * 1000L

Review comment:
   Yes. Refactored.









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-22 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443934101



##
File path: core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
##
@@ -205,6 +205,8 @@ object BrokerApiVersionsCommand {
   private object AdminClient {
 val DefaultConnectionMaxIdleMs = 9 * 60 * 1000
 val DefaultRequestTimeoutMs = 5000
+val DefaultSocketConnectionSetupMs = 10 * 1000
+val DefaultSocketConnectionSetupMaxMs = 127 * 1000

Review comment:
   Yes. Removed.









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-22 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443933957



##
File path: 
clients/src/test/java/org/apache/kafka/common/utils/ExponentialBackoffTest.java
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ExponentialBackoffTest {
+@Test
+public void testExponentialBackoff() {
+long scaleFactor = 100;
+int ratio = 2;
+long termMax = 2000;
+double jitter = 0.2;
+ExponentialBackoff exponentialBackoff = new ExponentialBackoff(
+scaleFactor, ratio, termMax, jitter
+);
+
+for (int i = 0; i <= 100; i++) {
+for (int n = 0; n <= 4; n++) {
+assertEquals(scaleFactor * Math.pow(ratio, n), exponentialBackoff.term(n),
+scaleFactor * Math.pow(ratio, n) * jitter);
+}
+assertTrue(exponentialBackoff.term(1000) <= termMax * (1 + jitter));

Review comment:
   Thanks for the suggestion. I've adopted this in my test.









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-22 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443891445



##
File path: 
clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##
@@ -321,4 +325,37 @@ public void testIsPreparingConnection() {
 connectionStates.disconnected(nodeId1, time.milliseconds());
 assertFalse(connectionStates.isPreparingConnection(nodeId1));
 }
+
+@Test
+public void testExponentialConnectionSetupTimeout() {
+assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
+
+// Check the exponential timeout growth
+for (int n = 0; n <= Math.log((double) connectionSetupTimeoutMaxMs / connectionSetupTimeoutMs) / Math.log(2); n++) {

Review comment:
   Yes. I added static class properties to make the test robust.
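
The loop bound in the quoted test is the number of times the timeout can be multiplied by the base before it would exceed the maximum. A small sketch of that arithmetic, assuming the values from the test fixture (10 * 1000 ms initial, 127 * 1000 ms maximum, base 2); the class and method names are hypothetical:

```java
// Hypothetical helper illustrating the loop bound used in the quoted test.
final class TimeoutGrowthSketch {
    // Largest n such that initialMs * base^n does not exceed maxMs.
    static int maxExponent(long initialMs, long maxMs, int base) {
        return (int) Math.floor(Math.log((double) maxMs / initialMs) / Math.log(base));
    }

    // Un-jittered timeout expected after n consecutive failures, before the cap applies.
    static long timeoutAt(long initialMs, int base, int n) {
        return (long) (initialMs * Math.pow(base, n));
    }

    public static void main(String[] args) {
        long initialMs = 10 * 1000;
        long maxMs = 127 * 1000;
        int base = 2;
        for (int n = 0; n <= maxExponent(initialMs, maxMs, base); n++) {
            System.out.println("attempt " + n + ": " + timeoutAt(initialMs, base, n) + " ms");
        }
    }
}
```

With these values the bound evaluates to 3, so the growth check covers 10 s, 20 s, 40 s, and 80 s before the separate upper-bound assertion takes over. Naming the base and jitter as constants, as the reply describes, keeps the test valid if those parameters change.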









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-22 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443903361



##
File path: 
clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##
@@ -321,4 +325,37 @@ public void testIsPreparingConnection() {
 connectionStates.disconnected(nodeId1, time.milliseconds());
 assertFalse(connectionStates.isPreparingConnection(nodeId1));
 }
+
+@Test
+public void testExponentialConnectionSetupTimeout() {
+assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
+
+// Check the exponential timeout growth
+for (int n = 0; n <= Math.log((double) connectionSetupTimeoutMaxMs / connectionSetupTimeoutMs) / Math.log(2); n++) {
+connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+assertTrue(connectionStates.connectingNodes().contains(nodeId1));
+assertEquals(connectionSetupTimeoutMs * Math.pow(2, n),
+connectionStates.connectionSetupTimeoutMs(nodeId1),
+connectionSetupTimeoutMs * Math.pow(2, n) * 0.2);
+connectionStates.disconnected(nodeId1, time.milliseconds());
+assertFalse(connectionStates.connectingNodes().contains(nodeId1));
+}
+
+// Check the timeout value upper bound
+connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+assertEquals(connectionStates.connectionSetupTimeoutMs(nodeId1), connectionSetupTimeoutMaxMs, connectionSetupTimeoutMaxMs * 0.2);

Review comment:
   Good catch. The `expected` should be what you suggested. Refactored.
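
For context, JUnit 4's `assertEquals(double expected, double actual, double delta)` takes the expected value first, whereas the quoted upper-bound assertion passes the measured timeout in that position. The plain-Java sketch below mimics that argument order without depending on JUnit; `DeltaAssertSketch` and `assertWithin` are hypothetical names, and the failure message format is only an approximation.

```java
// Hypothetical stand-in for a delta-based equality assertion (expected first, then actual).
final class DeltaAssertSketch {
    static void assertWithin(double expected, double actual, double delta) {
        if (Math.abs(expected - actual) > delta) {
            throw new AssertionError("expected:<" + expected + "> but was:<" + actual + ">");
        }
    }

    public static void main(String[] args) {
        double maxMs = 127 * 1000;
        double jitter = 0.2;
        // Expected value is the configured maximum; the tolerance is the jitter range around it.
        assertWithin(maxMs, 130_000, maxMs * jitter);
    }
}
```

Because the comparison is symmetric, swapping the first two arguments does not change whether the assertion passes; it only makes the failure message misleading, which is why the order is still worth fixing.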









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-22 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443903133



##
File path: 
clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##
@@ -321,4 +325,37 @@ public void testIsPreparingConnection() {
 connectionStates.disconnected(nodeId1, time.milliseconds());
 assertFalse(connectionStates.isPreparingConnection(nodeId1));
 }
+
+@Test
+public void testExponentialConnectionSetupTimeout() {
+assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
+
+// Check the exponential timeout growth
+for (int n = 0; n <= Math.log((double) connectionSetupTimeoutMaxMs / connectionSetupTimeoutMs) / Math.log(2); n++) {
+connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+assertTrue(connectionStates.connectingNodes().contains(nodeId1));
+assertEquals(connectionSetupTimeoutMs * Math.pow(2, n),
+connectionStates.connectionSetupTimeoutMs(nodeId1),
+connectionSetupTimeoutMs * Math.pow(2, n) * 0.2);

Review comment:
   Yes. I added static class properties for specifying these parameters.









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-22 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443891445



##
File path: 
clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##
@@ -321,4 +325,37 @@ public void testIsPreparingConnection() {
 connectionStates.disconnected(nodeId1, time.milliseconds());
 assertFalse(connectionStates.isPreparingConnection(nodeId1));
 }
+
+@Test
+public void testExponentialConnectionSetupTimeout() {
+assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
+
+// Check the exponential timeout growth
+for (int n = 0; n <= Math.log((double) connectionSetupTimeoutMaxMs / connectionSetupTimeoutMs) / Math.log(2); n++) {

Review comment:
   Yes. I'll create a constant to make the test robust.









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-22 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443890115



##
File path: 
clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An utility class for exponential backoff, timeout, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class ExponentialBackoff {
+private final int ratio;
+private final double expMax;
+private final long scaleFactor;
+private final double jitter;
+
+public ExponentialBackoff(long scaleFactor, int ratio, long termMax, double jitter) {
+this.scaleFactor = scaleFactor;
+this.ratio = ratio;
+this.jitter = jitter;
+this.expMax = termMax > scaleFactor ?
+Math.log(termMax / (double) Math.max(scaleFactor, 1)) / Math.log(ratio) : 0;
+}
+
+public long term(long n) {

Review comment:
   Thanks for the suggestion. Let's use the signature `backoff(long attempts)` for now.









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-22 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443888648



##
File path: 
clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An utility class for exponential backoff, timeout, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class ExponentialBackoff {
+private final int ratio;
+private final double expMax;
+private final long scaleFactor;
+private final double jitter;
+
+public ExponentialBackoff(long scaleFactor, int ratio, long termMax, 
double jitter) {

Review comment:
   I was using the naming for geometric sequences on Wikipedia. I think
your suggestion also makes sense. @rajinisivaram Do you think we can use
`initialInterval`, `multiplier`, and `maxInterval`?
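
For readers following the naming discussion, here is a minimal sketch of what the helper could look like with the proposed names. It is not the code from the pull request: the class name `BackoffSketch` is hypothetical, the constructor mirrors the `expMax` computation quoted in the diff with the parameters renamed, and the body of `backoff` is an assumption based on the formula in the Javadoc.

```java
import java.util.concurrent.ThreadLocalRandom;

// Illustrative sketch only. BackoffSketch is a hypothetical name, and the body of
// backoff() is an assumption derived from the Javadoc formula, not the PR's code.
final class BackoffSketch {
    private final long initialInterval;
    private final int multiplier;
    private final double expMax;
    private final double jitter;

    BackoffSketch(long initialInterval, int multiplier, long maxInterval, double jitter) {
        this.initialInterval = initialInterval;
        this.multiplier = multiplier;
        this.jitter = jitter;
        // Largest exponent allowed before the un-jittered value would exceed maxInterval.
        this.expMax = maxInterval > initialInterval
            ? Math.log(maxInterval / (double) Math.max(initialInterval, 1)) / Math.log(multiplier)
            : 0;
    }

    // backoff(attempts) = random(1 - jitter, 1 + jitter) * initialInterval * multiplier ^ attempts,
    // with the exponent clamped to expMax.
    long backoff(long attempts) {
        double exp = Math.min(attempts, expMax);
        double term = initialInterval * Math.pow(multiplier, exp);
        double randomFactor = jitter == 0.0
            ? 1.0
            : ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter);
        return (long) (randomFactor * term);
    }

    public static void main(String[] args) {
        BackoffSketch backoff = new BackoffSketch(100, 2, 2000, 0.2);
        for (long attempts = 0; attempts <= 6; attempts++) {
            System.out.println("attempts=" + attempts + " -> " + backoff.backoff(attempts) + " ms");
        }
    }
}
```

With `jitter` set to 0 the sequence is deterministic, which is convenient in tests; with a non-zero `jitter` each call may return a different value, so assertions need a tolerance as in the test quoted elsewhere in this thread.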









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-22 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443807553



##
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##
@@ -771,7 +794,7 @@ private void processDisconnection(List 
responses,
  * @param responses The list of responses to update
  * @param now The current time
  */
-private void handleTimedOutRequests(List responses, long 
now) {
+private void handleTimeoutRequests(List responses, long 
now) {

Review comment:
   Sure. Refactored.
   









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-22 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443806341



##
File path: 
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##
@@ -185,4 +191,9 @@ public static void 
warnIfDeprecatedDnsLookupValue(AbstractConfig config) {
 CLIENT_DNS_LOOKUP_CONFIG, ClientDnsLookup.DEFAULT,
 ClientDnsLookup.USE_ALL_DNS_IPS);
 }
+
+public class Defaults {
+public static final long SocketConnectionSetupTimeoutMs = 10 * 1000;
+public static final long SocketConnectionSetupTimeoutMaxMs = 127 * 
1000;

Review comment:
   I see. Refactored.









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-22 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443794598



##
File path: 
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##
@@ -103,6 +103,12 @@
 Utils.join(SecurityProtocol.names(), ", ") + ".";
 public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
+public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = 
"socket.connection.setup.timeout.ms";
+public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The 
amount of time the client will wait for the initial socket connection to be 
established. If the connection is not built before the timeout elapses the 
network client will close the socket channel.";
+
+public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG = 
"socket.connection.setup.timeout.max.ms";
+public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC = 
"The maximum amount of time the client will wait for the initial socket 
connection to be established. The connection setup timeout will increase 
exponentially for each consecutive connection failure up to this maximum. To 
avoid connection storms, a randomization factor of 0.2 will be applied to the 
backoff resulting in a random range between 20% below and 20% above the 
computed value.";

Review comment:
   Yes. Refactored.
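
As a worked example of the behaviour the doc string describes, the sketch below prints the jittered range for each consecutive failure. It assumes a multiplier of 2 (as in the `connection_timeout = constant * 2 ^ (attempts)` formula mentioned elsewhere in this thread), assumes the cap is applied before the 0.2 randomization factor, and uses the 10 s and 127 s values that appear in the diff. The class and method names are hypothetical.

```java
// Worked arithmetic only. SetupTimeoutExample and jitterRange are hypothetical names;
// the multiplier of 2 and capping before jitter are assumptions for illustration.
final class SetupTimeoutExample {
    // Returns {lowest, highest} possible timeout in ms after the given number of failures.
    static long[] jitterRange(long baseTimeoutMs, int failures, long maxTimeoutMs, double jitter) {
        double unjittered = Math.min(baseTimeoutMs * Math.pow(2, failures), maxTimeoutMs);
        return new long[] {
            Math.round(unjittered * (1 - jitter)),
            Math.round(unjittered * (1 + jitter))
        };
    }

    public static void main(String[] args) {
        for (int failures = 0; failures <= 4; failures++) {
            long[] range = jitterRange(10_000, failures, 127_000, 0.2);
            System.out.println(failures + " failure(s): " + range[0] + " ms to " + range[1] + " ms");
        }
    }
}
```

Under these assumptions the ranges are 8 to 12 s, 16 to 24 s, 32 to 48 s, 64 to 96 s, and then 101.6 to 152.4 s once the 127 s maximum is reached.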









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-22 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443793290



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -357,6 +345,37 @@ private NodeConnectionState nodeState(String id) {
 return state;
 }
 
+/**
+ * Get the id set of nodes which are in CONNECTING state
+ */
+public Set connectingNodes() {
+return this.connectingNodes;
+}
+
+/**
+ * Get the timestamp of the latest connection attempt of a given node
+ * @param id the connection to fetch the state for
+ */
+public long lastConnectAttemptMs(String id) {
+NodeConnectionState nodeState = this.nodeState.get(id);
+return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+}
+
+// Visible for testing
+long connectionSetupTimeoutMs(String id) {
+NodeConnectionState nodeState = this.nodeState.get(id);
+return nodeState.connectionSetupTimeoutMs;
+}
+
+/**
+ * Test if the connection to the given node has reached its timeout
+ * @param id the connection to fetch the state for
+ * @param now the current time in ms
+ */
+public boolean isConnectionSetupTimeout(String id, long now) {
+return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);

Review comment:
   Makes sense. Check added.









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-22 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443785134



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
 return state;
 }
 
+/**
+ * Get the id set of nodes which are in CONNECTING state
+ */
+public Set connectingNodes() {
+return this.connectingNodes;
+}
+
+/**
+ * Get the timestamp of the latest connection attempt of a given node
+ * @param id the connection to fetch the state for
+ */
+public long lastConnectAttemptMs(String id) {
+NodeConnectionState nodeState = this.nodeState.get(id);
+return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+}
+
+public long connectionSetupTimeoutMs(String id) {
+NodeConnectionState nodeState = this.nodeState.get(id);
+return nodeState.connectionSetupTimeoutMs;

Review comment:
   When `NetworkClient` initiates a connection to a given node
(`NetworkClient::initiateConnect`), the `nodeState` is guaranteed to be
initialized and won't be `null`. I don't think it's reasonable for a caller to
ask for the connection timeout of a node before the connection is initialized,
which is why I prevent that kind of call by throwing an exception.
   
   However, it might be reasonable for a caller to get the
`lastConnectAttemptMs` before initializing the connection. For example, the
node provider may want to provide the node with the least recent connection
attempt. For nodes that haven't been connected yet, the `NodeConnectionState`
does not exist. That implies the node has the highest priority, so we may
assume its `lastConnectAttemptMs` is 0.
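
The asymmetry described in this comment can be sketched as follows. This is a simplified stand-in rather than the real `ClusterConnectionStates`: the class name, the `connecting` helper, and the choice of `IllegalStateException` are assumptions made for illustration; only the two accessor names come from the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch. ConnectionStatesSketch, NodeState and connecting() are hypothetical,
// and IllegalStateException is an assumed choice of exception type.
final class ConnectionStatesSketch {
    private static final class NodeState {
        final long lastConnectAttemptMs;
        final long connectionSetupTimeoutMs;

        NodeState(long lastConnectAttemptMs, long connectionSetupTimeoutMs) {
            this.lastConnectAttemptMs = lastConnectAttemptMs;
            this.connectionSetupTimeoutMs = connectionSetupTimeoutMs;
        }
    }

    private final Map<String, NodeState> nodeState = new HashMap<>();

    // Stand-in for the point where a connection attempt creates the per-node state.
    void connecting(String id, long now, long setupTimeoutMs) {
        nodeState.put(id, new NodeState(now, setupTimeoutMs));
    }

    // Unknown nodes report 0, so a least-recently-attempted selection prefers them.
    long lastConnectAttemptMs(String id) {
        NodeState state = nodeState.get(id);
        return state == null ? 0 : state.lastConnectAttemptMs;
    }

    // Asking for a timeout before any connection attempt is treated as a caller bug.
    long connectionSetupTimeoutMs(String id) {
        NodeState state = nodeState.get(id);
        if (state == null)
            throw new IllegalStateException("No entry found for connection " + id);
        return state.connectionSetupTimeoutMs;
    }

    public static void main(String[] args) {
        ConnectionStatesSketch states = new ConnectionStatesSketch();
        System.out.println("never attempted: " + states.lastConnectAttemptMs("node-1"));
        states.connecting("node-1", 5_000, 10_000);
        System.out.println("after attempt: " + states.lastConnectAttemptMs("node-1")
            + ", timeout " + states.connectionSetupTimeoutMs("node-1"));
    }
}
```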













[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-22 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443774588



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -357,6 +399,38 @@ private NodeConnectionState nodeState(String id) {
 return state;
 }
 
+/**
+ * Get the id set of nodes which are in CONNECTING state
+ */
+public Set connectingNodes() {
+return this.connectingNodes;
+}
+
+/**
+ * Get the timestamp of the latest connection attempt of a given node
+ * @param id the connection to fetch the state for
+ */
+public long lastConnectAttemptMs(String id) {
+NodeConnectionState nodeState = this.nodeState.get(id);
+return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+}
+
+public long connectionSetupTimeoutMs(String id) {

Review comment:
   Yes. Added.









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-22 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443769844



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -158,9 +175,15 @@ public InetAddress currentAddress(String id) throws 
UnknownHostException {
  */
 public void disconnected(String id, long now) {
 NodeConnectionState nodeState = nodeState(id);
-nodeState.state = ConnectionState.DISCONNECTED;
 nodeState.lastConnectAttemptMs = now;
-updateReconnectBackoff(nodeState);
+incrementReconnectBackoff(nodeState);
+if (nodeState.state == ConnectionState.CONNECTING) {
+incrementConnectionSetupTimeout(nodeState);

Review comment:
   Refactored









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-22 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443770204



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -300,30 +325,47 @@ public AuthenticationException 
authenticationException(String id) {
  * Resets the failure count for a node and sets the reconnect backoff to 
the base
  * value configured via reconnect.backoff.ms
  *
- * @param nodeState The node state object to update
+ * @param nodeState nodeState The node state object to update

Review comment:
   Removed.









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-22 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443769782



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -158,9 +175,15 @@ public InetAddress currentAddress(String id) throws 
UnknownHostException {
  */
 public void disconnected(String id, long now) {
 NodeConnectionState nodeState = nodeState(id);
-nodeState.state = ConnectionState.DISCONNECTED;
 nodeState.lastConnectAttemptMs = now;
-updateReconnectBackoff(nodeState);
+incrementReconnectBackoff(nodeState);

Review comment:
   Makes sense. Refactored.









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442394058



##
File path: 
clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * 
(ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be 
provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {
+private final int ratio;
+private final double expMax;
+private final long scaleFactor;
+private final double jitter;
+
+public GeometricProgression(long scaleFactor, int ratio, long termMax, 
double jitter) {
+this.scaleFactor = scaleFactor;
+this.ratio = ratio;
+this.jitter = jitter;
+this.expMax = termMax > scaleFactor ?
+Math.log(termMax / (double) Math.max(scaleFactor, 1)) / 
Math.log(ratio) : 0;
+}
+
+public long term(long n) {

Review comment:
   As we noticed in your earlier comments, the same value of `attempts` may 
correspond to different terms.
   connection_timeout = constant * 2 ^ (attempts)
   reconnect_backoff = constant * 2 ^ (attempts - 1)
   (in KIP-580) retry_backoff = constant * 2 ^ (attempts - 1) 
   So I think using `retries` or `attempts` instead of `n` might also confuse 
people. Shall we think of another naming? 
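
To make the off-by-one concern concrete, the sketch below evaluates the first two formulas from this comment for the same `attempts` value. The class name, method names, and the constant of 100 are hypothetical and chosen only for illustration.

```java
// Illustrative arithmetic. ExponentOffsets and the constant 100 are hypothetical;
// the two formulas are the ones quoted in the review comment.
final class ExponentOffsets {
    // connection_timeout = constant * 2 ^ (attempts)
    static long connectionTimeout(long constant, int attempts) {
        return (long) (constant * Math.pow(2, attempts));
    }

    // reconnect_backoff = constant * 2 ^ (attempts - 1)
    static long reconnectBackoff(long constant, int attempts) {
        return (long) (constant * Math.pow(2, attempts - 1));
    }

    public static void main(String[] args) {
        for (int attempts = 1; attempts <= 3; attempts++) {
            System.out.println("attempts=" + attempts
                + " connection_timeout=" + connectionTimeout(100, attempts)
                + " reconnect_backoff=" + reconnectBackoff(100, attempts));
        }
    }
}
```

For the same `attempts` the two results differ by a factor of 2, so a parameter named `attempts` would map to different terms of the sequence depending on the caller.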







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442389711



##
File path: 
clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * 
(ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be 
provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {

Review comment:
   Good idea. Will go for `ExponentialBackoff`









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442387660



##
File path: 
clients/src/test/java/org/apache/kafka/common/utils/GeometricProgressionTest.java
##
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class GeometricProgressionTest {
+@Test
+public void testGeometricProgression() {
+long scaleFactor = 100;
+int ratio = 2;
+long termMax = 2000;
+double jitter = 0.2;
+GeometricProgression geometricProgression = new GeometricProgression(
+scaleFactor, ratio, termMax, jitter
+);
+
+for (int i = 0; i <= 100; i++) {
+for (int n = 0; n <= 4; n++) {
+assertEquals(scaleFactor * Math.pow(ratio, n), 
geometricProgression.term(n),
+scaleFactor * Math.pow(ratio, n) * jitter);
+}
+System.out.println(geometricProgression.term(5));

Review comment:
   Oh, right. I missed removing it.










[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442382471



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
##
@@ -149,6 +155,16 @@
 atLeast(0),
 Importance.MEDIUM,
 REQUEST_TIMEOUT_MS_DOC)
+
.define(SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_CONFIG,
+Type.LONG,
+10 * 1000,
+Importance.MEDIUM,
+
CommonClientConfigs.SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_DOC)
+
.define(SOCKET_CONNECTIONS_SETUP_TIMEOUT_MAX_MS_CONFIG,
+Type.LONG,
+127 * 1000,

Review comment:
   Sounds good. Will refactor.









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442375424



##
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##
@@ -786,6 +808,29 @@ private void handleAbortedSends(List<ClientResponse> responses) {
 abortedSends.clear();
 }
 
+/**
+ * Handle socket channel connection timeout. The timeout will hit iff a connection
+ * stays at the ConnectionState.CONNECTING state longer than the timeout value,
+ * as indicated by ClusterConnectionStates.NodeConnectionState.
+ *
+ * @param responses The list of responses to update
+ * @param now The current time
+ */
+private void handleTimeoutConnections(List<ClientResponse> responses, long now) {
+Set<String> connectingNodes = connectionStates.connectingNodes();
+for (String nodeId: connectingNodes) {

Review comment:
   Refactored









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442372873



##
File path: 
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##
@@ -103,6 +103,12 @@
 Utils.join(SecurityProtocol.names(), ", ") + ".";
 public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
+public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms";
+public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The amount of time the client will wait for the initial socket connection to be built. If the connection is not built before the timeout elapses the network client will close the socket channel. The default value will be 10 seconds.";
+
+public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG = "socket.connection.setup.timeout.max.ms";
+public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC = "The maximum amount of time the client will wait for the initial socket connection to be built. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between 20% below and 20% above the computed value. The default value will be 127 seconds.";

Review comment:
   Refactored









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442363632



##
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##
@@ -678,7 +696,11 @@ public Node leastLoadedNode(long now) {
 } else if (connectionStates.isPreparingConnection(node.idString())) {
 foundConnecting = node;
 } else if (canConnect(node, now)) {
-foundCanConnect = node;
+if (foundCanConnect == null ||
+this.connectionStates.lastConnectAttemptMs(foundCanConnect.idString()) >
+this.connectionStates.lastConnectAttemptMs(node.idString())) {
+foundCanConnect = node;
+}

Review comment:
   Yes









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442362673



##
File path: 
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##
@@ -103,6 +103,12 @@
 Utils.join(SecurityProtocol.names(), ", ") + ".";
 public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
+public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms";
+public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The amount of time the client will wait for the initial socket connection to be built. If the connection is not built before the timeout elapses the network client will close the socket channel. The default value will be 10 seconds.";

Review comment:
   Makes sense. I'll change the description and remove the defaults from the 
doc.









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442355123



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
 return state;
 }
 
+/**
+ * Get the id set of nodes which are in CONNECTING state
+ */
+public Set<String> connectingNodes() {
+return this.connectingNodes;
+}
+
+/**
+ * Get the timestamp of the latest connection attempt of a given node
+ * @param id the connection to fetch the state for
+ */
+public long lastConnectAttemptMs(String id) {
+NodeConnectionState nodeState = this.nodeState.get(id);
+return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+}
+
+public long connectionSetupTimeoutMs(String id) {
+NodeConnectionState nodeState = this.nodeState.get(id);
+return nodeState.connectionSetupTimeoutMs;
+}
+
+/**
+ * Test if the connection to the given node has reached its timeout
+ * @param id the connection to fetch the state for
+ * @param now the current time in ms
+ */
+public boolean isConnectionSetupTimeout(String id, long now) {
+return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);

Review comment:
   I think so. The `lastConnectAttemptMs` is updated in both `connecting` 
(Line 145 & Line 157) and `disconnected`. 









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442355123



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
 return state;
 }
 
+/**
+ * Get the id set of nodes which are in CONNECTING state
+ */
+public Set<String> connectingNodes() {
+return this.connectingNodes;
+}
+
+/**
+ * Get the timestamp of the latest connection attempt of a given node
+ * @param id the connection to fetch the state for
+ */
+public long lastConnectAttemptMs(String id) {
+NodeConnectionState nodeState = this.nodeState.get(id);
+return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+}
+
+public long connectionSetupTimeoutMs(String id) {
+NodeConnectionState nodeState = this.nodeState.get(id);
+return nodeState.connectionSetupTimeoutMs;
+}
+
+/**
+ * Test if the connection to the given node has reached its timeout
+ * @param id the connection to fetch the state for
+ * @param now the current time in ms
+ */
+public boolean isConnectionSetupTimeout(String id, long now) {
+return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);

Review comment:
   I think so. The `lastConnectAttemptMs` is updated in both `connecting` 
and `disconnected`. (Line 145 & Line 157)









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442355123



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
 return state;
 }
 
+/**
+ * Get the id set of nodes which are in CONNECTING state
+ */
+public Set<String> connectingNodes() {
+return this.connectingNodes;
+}
+
+/**
+ * Get the timestamp of the latest connection attempt of a given node
+ * @param id the connection to fetch the state for
+ */
+public long lastConnectAttemptMs(String id) {
+NodeConnectionState nodeState = this.nodeState.get(id);
+return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+}
+
+public long connectionSetupTimeoutMs(String id) {
+NodeConnectionState nodeState = this.nodeState.get(id);
+return nodeState.connectionSetupTimeoutMs;
+}
+
+/**
+ * Test if the connection to the given node has reached its timeout
+ * @param id the connection to fetch the state for
+ * @param now the current time in ms
+ */
+public boolean isConnectionSetupTimeout(String id, long now) {
+return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);

Review comment:
   I think so. The `lastConnectAttemptMs` is updated in both `connecting` 
and `disconnected`.









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442355123



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
 return state;
 }
 
+/**
+ * Get the id set of nodes which are in CONNECTING state
+ */
+public Set<String> connectingNodes() {
+return this.connectingNodes;
+}
+
+/**
+ * Get the timestamp of the latest connection attempt of a given node
+ * @param id the connection to fetch the state for
+ */
+public long lastConnectAttemptMs(String id) {
+NodeConnectionState nodeState = this.nodeState.get(id);
+return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+}
+
+public long connectionSetupTimeoutMs(String id) {
+NodeConnectionState nodeState = this.nodeState.get(id);
+return nodeState.connectionSetupTimeoutMs;
+}
+
+/**
+ * Test if the connection to the given node has reached its timeout
+ * @param id the connection to fetch the state for
+ * @param now the current time in ms
+ */
+public boolean isConnectionSetupTimeout(String id, long now) {
+return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);

Review comment:
   Good catch. I'll make the logic record it in both `connecting` and 
`disconnected`.
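
   A minimal sketch of that bookkeeping, assuming a simplified per-node state 
object. `NodeStateSketch` and its fields are hypothetical stand-ins, not the 
PR's `NodeConnectionState`. Both transitions stamp `lastConnectAttemptMs`, so 
the elapsed time in the timeout test is measured from the latest transition.

```java
// Hypothetical, simplified stand-in for the per-node state discussed above.
class NodeStateSketch {
    enum State { CONNECTING, DISCONNECTED }

    State state = State.DISCONNECTED;
    long lastConnectAttemptMs = 0L;
    final long connectionSetupTimeoutMs;

    NodeStateSketch(long connectionSetupTimeoutMs) {
        this.connectionSetupTimeoutMs = connectionSetupTimeoutMs;
    }

    // Entering CONNECTING records when the attempt started.
    void connecting(long now) {
        state = State.CONNECTING;
        lastConnectAttemptMs = now;
    }

    // Entering DISCONNECTED records the timestamp as well.
    void disconnected(long now) {
        state = State.DISCONNECTED;
        lastConnectAttemptMs = now;
    }

    // Same comparison as in the diff: strictly greater than the timeout.
    boolean isConnectionSetupTimeout(long now) {
        return now - lastConnectAttemptMs > connectionSetupTimeoutMs;
    }
}
```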









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442354307



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
 return state;
 }
 
+/**
+ * Get the id set of nodes which are in CONNECTING state
+ */
+public Set<String> connectingNodes() {
+return this.connectingNodes;
+}
+
+/**
+ * Get the timestamp of the latest connection attempt of a given node
+ * @param id the connection to fetch the state for
+ */
+public long lastConnectAttemptMs(String id) {
+NodeConnectionState nodeState = this.nodeState.get(id);
+return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+}
+
+public long connectionSetupTimeoutMs(String id) {
+NodeConnectionState nodeState = this.nodeState.get(id);
+return nodeState.connectionSetupTimeoutMs;

Review comment:
   No. The caller will ensure that the node is in the connecting state. 
I'll add an IllegalStateException here.
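
   One way the guard could look, as a sketch only. It assumes the exception 
fires when no state is tracked for the node; the class name, the map, and the 
message text are hypothetical and not taken from the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; names only loosely mirror the diff above.
class SetupTimeoutLookupSketch {
    private final Map<String, Long> connectionSetupTimeoutMsById = new HashMap<>();

    // Register a node together with its current connection setup timeout.
    void track(String id, long timeoutMs) {
        connectionSetupTimeoutMsById.put(id, timeoutMs);
    }

    // Fail fast with IllegalStateException instead of a NullPointerException
    // when the caller asks about a node that has no tracked state.
    long connectionSetupTimeoutMs(String id) {
        Long timeoutMs = connectionSetupTimeoutMsById.get(id);
        if (timeoutMs == null)
            throw new IllegalStateException("No connection state tracked for node " + id);
        return timeoutMs;
    }
}
```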









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-17 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r441915398



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -300,30 +323,48 @@ public AuthenticationException authenticationException(String id) {
  * Resets the failure count for a node and sets the reconnect backoff to the base
  * value configured via reconnect.backoff.ms
  *
- * @param nodeState The node state object to update
+ * @param nodeState nodeState The node state object to update
  */
 private void resetReconnectBackoff(NodeConnectionState nodeState) {
 nodeState.failedAttempts = 0;
-nodeState.reconnectBackoffMs = this.reconnectBackoffInitMs;
+nodeState.reconnectBackoffMs = reconnectBackoff.term(0);
+}
+
+/**
+ * Resets the failure count for a node and sets the connection setup timeout to the base
+ * value configured via socket.connection.setup.timeout.ms
+ *
+ * @param nodeState nodeState The node state object to update
+ */
+private void resetConnectionSetupTimeout(NodeConnectionState nodeState) {
+nodeState.failedConnectAttempts = 0;
+nodeState.connectionSetupTimeoutMs = connectionSetupTimeout.term(0);
 }
 
 /**
- * Update the node reconnect backoff exponentially.
+ * Increment the failure counter, update the node reconnect backoff exponentially,
+ * and record the current timestamp.
  * The delay is reconnect.backoff.ms * 2**(failures - 1) * (+/- 20% random jitter)
  * Up to a (pre-jitter) maximum of reconnect.backoff.max.ms
  *
  * @param nodeState The node state object to update
  */
-private void updateReconnectBackoff(NodeConnectionState nodeState) {
-if (this.reconnectBackoffMaxMs > this.reconnectBackoffInitMs) {
-nodeState.failedAttempts += 1;
-double backoffExp = Math.min(nodeState.failedAttempts - 1, this.reconnectBackoffMaxExp);
-double backoffFactor = Math.pow(RECONNECT_BACKOFF_EXP_BASE, backoffExp);
-long reconnectBackoffMs = (long) (this.reconnectBackoffInitMs * backoffFactor);
-// Actual backoff is randomized to avoid connection storms.
-double randomFactor = ThreadLocalRandom.current().nextDouble(0.8, 1.2);
-nodeState.reconnectBackoffMs = (long) (randomFactor * reconnectBackoffMs);
-}
+private void incrementReconnectBackoff(NodeConnectionState nodeState, long now) {
+nodeState.reconnectBackoffMs = reconnectBackoff.term(nodeState.failedAttempts);
+nodeState.failedAttempts++;

Review comment:
   `reconnectBackoff.term(0)` returns ${reconnect.backoff.ms} * 2 ^ 0 * jitter
   `reconnectBackoff.term(1)` returns ${reconnect.backoff.ms} * 2 ^ 1 * jitter
   
   The difference between the reconnect backoff and the connection timeout here 
is that, after the first failed attempt, the connection timeout is the 1st term 
of the randomized geometric sequence, whereas the reconnect backoff is the 0th 
term. So we should use (failedAttempts - 1) to fetch the reconnect backoff and 
(failedAttempts) to fetch the connection timeout.
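
   The indexing can be sketched as follows. This is an illustration under 
stated assumptions, not the PR's `GeometricProgression` class: 
`JitteredGeometricSketch`, `baseTerm`, and the two index helpers are 
hypothetical names, and the cap is applied before jitter, as the Javadoc in 
the diff describes for `reconnect.backoff.max.ms`.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * ratio ^ n.
class JitteredGeometricSketch {
    private final long scaleFactor;
    private final int ratio;
    private final long termMax;
    private final double jitter;

    JitteredGeometricSketch(long scaleFactor, int ratio, long termMax, double jitter) {
        this.scaleFactor = scaleFactor;
        this.ratio = ratio;
        this.termMax = termMax;
        this.jitter = jitter;
    }

    // Un-jittered n-th term, capped at termMax (pre-jitter maximum).
    double baseTerm(int n) {
        return Math.min(scaleFactor * Math.pow(ratio, n), termMax);
    }

    // Jittered n-th term; with jitter == 0 the random range would be empty, so skip it.
    long term(int n) {
        double randomFactor = jitter == 0
            ? 1.0
            : ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter);
        return (long) (randomFactor * baseTerm(n));
    }

    // After the first failure the reconnect backoff is still the 0th term.
    static int reconnectBackoffIndex(int failedAttempts) {
        return Math.max(failedAttempts - 1, 0);
    }

    // After the first failure the connection timeout is already the 1st term.
    static int connectionTimeoutIndex(int failedAttempts) {
        return failedAttempts;
    }
}
```

   With `scaleFactor = 100`, `ratio = 2`, and `jitter = 0.2`, one failed attempt 
gives a reconnect backoff drawn around 100 and a connection timeout drawn 
around 200.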









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-17 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r441915398



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -300,30 +323,48 @@ public AuthenticationException authenticationException(String id) {
  * Resets the failure count for a node and sets the reconnect backoff to the base
  * value configured via reconnect.backoff.ms
  *
- * @param nodeState The node state object to update
+ * @param nodeState nodeState The node state object to update
  */
 private void resetReconnectBackoff(NodeConnectionState nodeState) {
 nodeState.failedAttempts = 0;
-nodeState.reconnectBackoffMs = this.reconnectBackoffInitMs;
+nodeState.reconnectBackoffMs = reconnectBackoff.term(0);
+}
+
+/**
+ * Resets the failure count for a node and sets the connection setup timeout to the base
+ * value configured via socket.connection.setup.timeout.ms
+ *
+ * @param nodeState nodeState The node state object to update
+ */
+private void resetConnectionSetupTimeout(NodeConnectionState nodeState) {
+nodeState.failedConnectAttempts = 0;
+nodeState.connectionSetupTimeoutMs = connectionSetupTimeout.term(0);
 }
 
 /**
- * Update the node reconnect backoff exponentially.
+ * Increment the failure counter, update the node reconnect backoff exponentially,
+ * and record the current timestamp.
  * The delay is reconnect.backoff.ms * 2**(failures - 1) * (+/- 20% random jitter)
  * Up to a (pre-jitter) maximum of reconnect.backoff.max.ms
  *
  * @param nodeState The node state object to update
  */
-private void updateReconnectBackoff(NodeConnectionState nodeState) {
-if (this.reconnectBackoffMaxMs > this.reconnectBackoffInitMs) {
-nodeState.failedAttempts += 1;
-double backoffExp = Math.min(nodeState.failedAttempts - 1, this.reconnectBackoffMaxExp);
-double backoffFactor = Math.pow(RECONNECT_BACKOFF_EXP_BASE, backoffExp);
-long reconnectBackoffMs = (long) (this.reconnectBackoffInitMs * backoffFactor);
-// Actual backoff is randomized to avoid connection storms.
-double randomFactor = ThreadLocalRandom.current().nextDouble(0.8, 1.2);
-nodeState.reconnectBackoffMs = (long) (randomFactor * reconnectBackoffMs);
-}
+private void incrementReconnectBackoff(NodeConnectionState nodeState, long now) {
+nodeState.reconnectBackoffMs = reconnectBackoff.term(nodeState.failedAttempts);
+nodeState.failedAttempts++;

Review comment:
   `reconnectBackoff.term(0)` returns ${reconnect.backoff.ms} * 2 ^ 0 * jitter
   `reconnectBackoff.term(1)` returns ${reconnect.backoff.ms} * 2 ^ 1 * jitter
   
   The difference between the reconnect backoff and the connection timeout here 
is that, after the first failed attempt, the connection timeout is the 1st term 
of the randomized geometric sequence, whereas the reconnect backoff is the 0th 
term. So we should use (failedAttempts - 1) to fetch the reconnect backoff and 
(failedAttempts) to fetch the connection timeout.









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-13 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r439756327



##
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##
@@ -554,6 +571,7 @@ private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long
 handleDisconnections(responses, updatedNow);
 handleConnections();
 handleInitiateApiVersionRequests(updatedNow);
+handleTimeOutConnections(responses, updatedNow);

Review comment:
   Do you mean TimeOut => Timeout? Refactored.









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-13 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r439752541



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -296,36 +314,6 @@ public AuthenticationException authenticationException(String id) {
 return state != null ? state.authenticationException : null;
 }
 
-/**
- * Resets the failure count for a node and sets the reconnect backoff to the base
- * value configured via reconnect.backoff.ms
- *
- * @param nodeState The node state object to update
- */
-private void resetReconnectBackoff(NodeConnectionState nodeState) {

Review comment:
   Including all types of reset together is probably not a good choice 
because the reset of failed attempts and the reset of the connection timeout 
may happen in different places. 
   
   However, I agree we should have some abstraction on the update and reset 
logic. I'll put the logic in new class methods.









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-13 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r439751792



##
File path: 
clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {

Review comment:
   Naming is hard here because the exponential reconnect backoff, the 
exponential retry backoff, and the exponential timeout all share this util 
class. Any suggestions?









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-13 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r439751751



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
##
@@ -149,6 +155,16 @@
 atLeast(0),
 Importance.MEDIUM,
 REQUEST_TIMEOUT_MS_DOC)
+.define(SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_CONFIG,
+Type.LONG,
+10 * 1000,
+Importance.MEDIUM,
+CommonClientConfigs.SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_DOC)
+.define(SOCKET_CONNECTIONS_SETUP_TIMEOUT_MAX_MS_CONFIG,
+Type.LONG,
+127 * 1000,

Review comment:
   I think the current practice is to hard code the defaults in each 
client's config definition. Do we still want to define the defaults in 
CommonClientConfigs?









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-12 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r439663059



##
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##
@@ -786,6 +808,26 @@ private void handleAbortedSends(List<ClientResponse> responses) {
 abortedSends.clear();
 }
 
+/**
+ * Handle socket channel connection timeout. The timeout will hit iff a connection
+ * stays at the ConnectionState.CONNECTING state longer than the timeout value,
+ * as indicated by ClusterConnectionStates.NodeConnectionState.
+ *
+ * @param responses The list of responses to update
+ * @param now The current time
+ */
+private void handleTimeOutConnections(List<ClientResponse> responses, long now) {
+Set<String> connectingNodes = connectionStates.connectingNodes();
+for (String nodeId: connectingNodes) {
+if (connectionStates.isConnectionSetupTimeout(nodeId, now)) {
+// close connection to the node
+this.selector.close(nodeId);
+log.debug("Disconnecting from node {} due to socket connection setup timeout.", nodeId);

Review comment:
   Added timeout value to the log.









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-12 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r439647104



##
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##
@@ -786,6 +808,26 @@ private void handleAbortedSends(List<ClientResponse> responses) {
 abortedSends.clear();
 }
 
+/**
+ * Handle socket channel connection timeout. The timeout will hit iff a connection
+ * stays at the ConnectionState.CONNECTING state longer than the timeout value,
+ * as indicated by ClusterConnectionStates.NodeConnectionState.
+ *
+ * @param responses The list of responses to update
+ * @param now The current time
+ */
+private void handleTimeOutConnections(List<ClientResponse> responses, long now) {
+Set<String> connectingNodes = connectionStates.connectingNodes();
+for (String nodeId: connectingNodes) {
+if (connectionStates.isConnectionSetupTimeout(nodeId, now)) {
+// close connection to the node

Review comment:
   Line deleted.









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-12 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r439646696



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -357,6 +345,37 @@ private NodeConnectionState nodeState(String id) {
 return state;
 }
 
+/**
+ * Get the id set of nodes which are in CONNECTING state
+ */
+public Set<String> connectingNodes() {
+return this.connectingNodes;
+}
+
+/**
+ * Get the timestamp of the latest connection attempt of a given node
+ * @param id the connection to fetch the state for
+ */
+public long lastConnectAttemptMs(String id) {
+NodeConnectionState nodeState = this.nodeState.get(id);
+return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+}
+
+// Visible for testing
+long connectionSetupTimeoutMs(String id) {
+NodeConnectionState nodeState = this.nodeState.get(id);
+return nodeState.connectionSetupTimeoutMs;
+}
+
+/**
+ * Test if the connection to the given node has reached its timeout
+ * @param id the connection to fetch the state for
+ * @param now the current time in ms
+ */
+public boolean isConnectionSetupTimeout(String id, long now) {
+return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);

Review comment:
   I think we don't need to check if the node is in connecting state 
because the caller is only applying this test to all the nodes in the 
connecting state.
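
   A sketch of that calling pattern, assuming the caller keeps its own map of 
nodes that are currently connecting. `TimeoutSweepSketch` and its methods are 
hypothetical and are not the PR's `NetworkClient` code. Timed-out ids are 
collected first and removed afterwards, so the map is never modified while it 
is being iterated.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: only nodes that are currently connecting are ever tested.
class TimeoutSweepSketch {
    // nodeId -> time the current connection attempt started
    private final Map<String, Long> connectingSince = new HashMap<>();
    private final long connectionSetupTimeoutMs;

    TimeoutSweepSketch(long connectionSetupTimeoutMs) {
        this.connectionSetupTimeoutMs = connectionSetupTimeoutMs;
    }

    void connecting(String id, long now) {
        connectingSince.put(id, now);
    }

    // A node that finished connecting leaves the set and is no longer tested.
    void connected(String id) {
        connectingSince.remove(id);
    }

    // Returns the ids whose setup exceeded the timeout and stops tracking them.
    List<String> closeTimedOut(long now) {
        List<String> timedOut = new ArrayList<>();
        for (Map.Entry<String, Long> entry : connectingSince.entrySet()) {
            if (now - entry.getValue() > connectionSetupTimeoutMs)
                timedOut.add(entry.getKey());
        }
        for (String id : timedOut)
            connectingSince.remove(id);
        return timedOut;
    }
}
```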









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-12 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r439646696



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -357,6 +345,37 @@ private NodeConnectionState nodeState(String id) {
 return state;
 }
 
+/**
+ * Get the id set of nodes which are in CONNECTING state
+ */
+public Set<String> connectingNodes() {
+return this.connectingNodes;
+}
+
+/**
+ * Get the timestamp of the latest connection attempt of a given node
+ * @param id the connection to fetch the state for
+ */
+public long lastConnectAttemptMs(String id) {
+NodeConnectionState nodeState = this.nodeState.get(id);
+return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+}
+
+// Visible for testing
+long connectionSetupTimeoutMs(String id) {
+NodeConnectionState nodeState = this.nodeState.get(id);
+return nodeState.connectionSetupTimeoutMs;
+}
+
+/**
+ * Test if the connection to the given node has reached its timeout
+ * @param id the connection to fetch the state for
+ * @param now the current time in ms
+ */
+public boolean isConnectionSetupTimeout(String id, long now) {
+return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);

Review comment:
   I don't think we need to check whether the node is in the connecting state, 
since the caller only applies this test to nodes that are already in the 
connecting state.
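
   The call pattern described above can be sketched as follows. This is a 
minimal illustration, not the PR's code: `ConnectionTimeoutSketch`, 
`NodeState`, and `timedOutNodes` are hypothetical names, and the sketch 
assumes the caller walks only the set of connecting node ids.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for ClusterConnectionStates; not the PR's code.
class ConnectionTimeoutSketch {
    // The two per-node values the timeout test reads.
    static final class NodeState {
        final long lastConnectAttemptMs;
        final long connectionSetupTimeoutMs;

        NodeState(long lastConnectAttemptMs, long connectionSetupTimeoutMs) {
            this.lastConnectAttemptMs = lastConnectAttemptMs;
            this.connectionSetupTimeoutMs = connectionSetupTimeoutMs;
        }
    }

    private final Map<String, NodeState> nodeState = new HashMap<>();
    private final Set<String> connectingNodes = new HashSet<>();

    // Record a connection attempt and mark the node as connecting.
    void connecting(String id, long now, long timeoutMs) {
        nodeState.put(id, new NodeState(now, timeoutMs));
        connectingNodes.add(id);
    }

    // The node left the connecting state (connected or disconnected).
    void noLongerConnecting(String id) {
        connectingNodes.remove(id);
    }

    // Same comparison as in the diff; no state check here.
    boolean isConnectionSetupTimeout(String id, long now) {
        NodeState state = nodeState.get(id);
        return now - state.lastConnectAttemptMs > state.connectionSetupTimeoutMs;
    }

    // The caller only ever asks about ids taken from connectingNodes,
    // so every id it passes is known to be in the connecting state.
    List<String> timedOutNodes(long now) {
        List<String> timedOut = new ArrayList<>();
        for (String id : connectingNodes) {
            if (isConnectionSetupTimeout(id, now)) {
                timedOut.add(id);
            }
        }
        return timedOut;
    }
}
```

   The trade-off is that `isConnectionSetupTimeout` relies on its caller: 
if it were ever called with an id outside the connecting set, it would 
compare against stale values or fail on a missing entry.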









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-12 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r439632475



##
File path: 
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##
@@ -103,6 +103,12 @@
 Utils.join(SecurityProtocol.names(), ", ") + ".";
 public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
+public static final String SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_CONFIG = "socket.connections.setup.timeout.ms";

Review comment:
   I think I used `socket.connections.setup.timeout.ms` in the KIP.
   Since @cmccabe also suggests `socket.connection.setup.timeout.ms`, I'll 
change the KIP proposal to use `socket.connection.setup.timeout.ms`.
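
   For illustration, a client would set the option roughly as shown below 
once the name is settled. The key is the one proposed in this thread and 
should be treated as provisional; the class `SetupTimeoutConfigSketch`, the 
helper `clientProperties`, and the 10 second value are assumptions made for 
the example.

```java
import java.util.Properties;

// Hypothetical helper; only the key string comes from this thread.
class SetupTimeoutConfigSketch {
    // Singular "connection", as proposed in the review comment.
    static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG =
        "socket.connection.setup.timeout.ms";

    // Build client properties carrying the setup timeout in milliseconds.
    static Properties clientProperties(long timeoutMs) {
        Properties props = new Properties();
        props.setProperty(SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG,
            Long.toString(timeoutMs));
        return props;
    }
}
```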









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-12 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r439621170



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -34,19 +36,22 @@
  *
  */
 final class ClusterConnectionStates {
-private final long reconnectBackoffInitMs;
-private final long reconnectBackoffMaxMs;
-private final static int RECONNECT_BACKOFF_EXP_BASE = 2;

Review comment:
   I had assumed that the existing code hardcoded the jitter. I added the 
constant back and also added a constant for the jitter.
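
   As a rough sketch of what the two constants control: the base sets how 
fast the backoff grows per failure, and the jitter sets how far each value 
is randomized around it. `RECONNECT_BACKOFF_EXP_BASE` is the name from the 
diff; `RECONNECT_BACKOFF_JITTER`, the class `BackoffSketch`, and both method 
names are hypothetical, and the value 0.2 is taken from the 
`GeometricProgression` call quoted elsewhere in this thread.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of exponential backoff with named constants.
class BackoffSketch {
    static final int RECONNECT_BACKOFF_EXP_BASE = 2;
    // Assumed name; +/- 20% randomization around the computed backoff.
    static final double RECONNECT_BACKOFF_JITTER = 0.2;

    // Deterministic part: initMs * base^failures, capped at maxMs.
    static long baseBackoffMs(long initMs, long maxMs, int failures) {
        double term = initMs * Math.pow(RECONNECT_BACKOFF_EXP_BASE, Math.max(failures, 0));
        return (long) Math.min(term, (double) maxMs);
    }

    // Randomized part: scale by a factor in [1 - jitter, 1 + jitter).
    static long backoffMs(long initMs, long maxMs, int failures) {
        double factor = ThreadLocalRandom.current()
            .nextDouble(1 - RECONNECT_BACKOFF_JITTER, 1 + RECONNECT_BACKOFF_JITTER);
        return (long) (baseBackoffMs(initMs, maxMs, failures) * factor);
    }
}
```

   Naming both values keeps them in one place, so the reconnect backoff and 
any other progression built on the same formula cannot drift apart silently.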









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-12 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r439611544



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -34,19 +36,22 @@
  *
  */
 final class ClusterConnectionStates {
-private final long reconnectBackoffInitMs;
-private final long reconnectBackoffMaxMs;
-private final static int RECONNECT_BACKOFF_EXP_BASE = 2;
-private final double reconnectBackoffMaxExp;
 private final Map<String, NodeConnectionState> nodeState;
 private final Logger log;
+private Set<String> connectingNodes;
+private GeometricProgression reconnectBackoff;
+private GeometricProgression connectionSetupTimeout;
 
-public ClusterConnectionStates(long reconnectBackoffMs, long reconnectBackoffMaxMs, LogContext logContext) {
+public ClusterConnectionStates(long reconnectBackoffMs, long reconnectBackoffMaxMs,
+   long connectionSetupTimeoutMs, long connectionSetupTimeoutMaxMs,
+   LogContext logContext) {
 this.log = logContext.logger(ClusterConnectionStates.class);
-this.reconnectBackoffInitMs = reconnectBackoffMs;
-this.reconnectBackoffMaxMs = reconnectBackoffMaxMs;
-this.reconnectBackoffMaxExp = Math.log(this.reconnectBackoffMaxMs / (double) Math.max(reconnectBackoffMs, 1)) / Math.log(RECONNECT_BACKOFF_EXP_BASE);
+this.reconnectBackoff = new GeometricProgression(
+reconnectBackoffMs, 2, reconnectBackoffMaxMs, 0.2);

Review comment:
   Yes, we are reusing the code.









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-12 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r439609952



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -158,9 +165,16 @@ public InetAddress currentAddress(String id) throws 
UnknownHostException {
  */
 public void disconnected(String id, long now) {
 NodeConnectionState nodeState = nodeState(id);
-nodeState.state = ConnectionState.DISCONNECTED;
 nodeState.lastConnectAttemptMs = now;
-updateReconnectBackoff(nodeState);
+nodeState.failedAttempts++;
+nodeState.reconnectBackoffMs = reconnectBackoff.term(nodeState.failedAttempts - 1);
+if (nodeState.state == ConnectionState.CONNECTING) {
+nodeState.connectionSetupTimeoutMs = connectionSetupTimeout.term(nodeState.failedAttempts);

Review comment:
   Correct. I'll add a new class property to record connection failures 
separately.
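
   One possible shape for that change, as a minimal sketch: keep one counter 
for every disconnect and a second one that only advances when the disconnect 
happens during connection setup. `failedConnectAttempts` and the class 
`NodeFailureCountersSketch` are hypothetical names, and resetting both 
counters on a successful connection is an assumption, not something stated 
in this thread.

```java
// Hypothetical per-node state; field and class names are illustrative only.
class NodeFailureCountersSketch {
    enum ConnectionState { DISCONNECTED, CONNECTING, READY }

    ConnectionState state = ConnectionState.DISCONNECTED;
    // Counts every disconnect; would drive the reconnect backoff.
    long failedAttempts;
    // Counts only disconnects seen while CONNECTING; would drive the setup timeout.
    long failedConnectAttempts;

    void connecting() {
        state = ConnectionState.CONNECTING;
    }

    // Assumption: a successful connection resets both counters.
    void ready() {
        state = ConnectionState.READY;
        failedAttempts = 0;
        failedConnectAttempts = 0;
    }

    void disconnected() {
        failedAttempts++;
        // Check the state before overwriting it, otherwise this branch never fires.
        if (state == ConnectionState.CONNECTING) {
            failedConnectAttempts++;
        }
        state = ConnectionState.DISCONNECTED;
    }
}
```

   With separate counters, a node that connects successfully and later drops 
would lengthen its reconnect backoff without also inflating the connection 
setup timeout.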




