dajac commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442725547
##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +399,38 @@ private NodeConnectionState nodeState(String id) {
return state;
}
+ /**
+ * Get the id set of nodes which are in CONNECTING state
+ */
+ public Set<String> connectingNodes() {
+ return this.connectingNodes;
+ }
+
+ /**
+ * Get the timestamp of the latest connection attempt of a given node
+ * @param id the connection to fetch the state for
+ */
+ public long lastConnectAttemptMs(String id) {
+ NodeConnectionState nodeState = this.nodeState.get(id);
+ return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+ }
+
+ public long connectionSetupTimeoutMs(String id) {
Review comment:
Can we add a javadoc for this method?
##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
return state;
}
+ /**
+ * Get the id set of nodes which are in CONNECTING state
+ */
+ public Set<String> connectingNodes() {
+ return this.connectingNodes;
+ }
+
+ /**
+ * Get the timestamp of the latest connection attempt of a given node
+ * @param id the connection to fetch the state for
+ */
+ public long lastConnectAttemptMs(String id) {
+ NodeConnectionState nodeState = this.nodeState.get(id);
+ return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+ }
+
+ public long connectionSetupTimeoutMs(String id) {
+ NodeConnectionState nodeState = this.nodeState.get(id);
+ return nodeState.connectionSetupTimeoutMs;
+ }
+
+ /**
+ * Test if the connection to the given node has reached its timeout
+ * @param id the connection to fetch the state for
+ * @param now the current time in ms
+ */
+ public boolean isConnectionSetupTimeout(String id, long now) {
+ return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);
Review comment:
Oh right. I missed the ones in `connecting`. Thanks for the
clarification.
##########
File path: clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An utility class for exponential backoff, timeout, etc...
Review comment:
nit: I would rephrase this a bit and also briefly explain how we use it
in AK (e.g. connect timeout, reconnection backoff, etc.). Also, I would suggest
ending sentences with `.`.
##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -300,30 +323,48 @@ public AuthenticationException authenticationException(String id) {
* Resets the failure count for a node and sets the reconnect backoff to the base
* value configured via reconnect.backoff.ms
*
- * @param nodeState The node state object to update
+ * @param nodeState nodeState The node state object to update
*/
private void resetReconnectBackoff(NodeConnectionState nodeState) {
nodeState.failedAttempts = 0;
- nodeState.reconnectBackoffMs = this.reconnectBackoffInitMs;
+ nodeState.reconnectBackoffMs = reconnectBackoff.term(0);
+ }
+
+ /**
+ * Resets the failure count for a node and sets the connection setup timeout to the base
+ * value configured via socket.connection.setup.timeout.ms
+ *
+ * @param nodeState nodeState The node state object to update
+ */
+ private void resetConnectionSetupTimeout(NodeConnectionState nodeState) {
+ nodeState.failedConnectAttempts = 0;
+ nodeState.connectionSetupTimeoutMs = connectionSetupTimeout.term(0);
}
/**
- * Update the node reconnect backoff exponentially.
+ * Increment the failure counter, update the node reconnect backoff exponentially,
+ * and record the current timestamp.
* The delay is reconnect.backoff.ms * 2**(failures - 1) * (+/- 20% random jitter)
* Up to a (pre-jitter) maximum of reconnect.backoff.max.ms
*
* @param nodeState The node state object to update
*/
- private void updateReconnectBackoff(NodeConnectionState nodeState) {
- if (this.reconnectBackoffMaxMs > this.reconnectBackoffInitMs) {
- nodeState.failedAttempts += 1;
- double backoffExp = Math.min(nodeState.failedAttempts - 1, this.reconnectBackoffMaxExp);
- double backoffFactor = Math.pow(RECONNECT_BACKOFF_EXP_BASE, backoffExp);
- long reconnectBackoffMs = (long) (this.reconnectBackoffInitMs * backoffFactor);
- // Actual backoff is randomized to avoid connection storms.
- double randomFactor = ThreadLocalRandom.current().nextDouble(0.8, 1.2);
- nodeState.reconnectBackoffMs = (long) (randomFactor * reconnectBackoffMs);
- }
+ private void incrementReconnectBackoff(NodeConnectionState nodeState, long now) {
+ nodeState.reconnectBackoffMs = reconnectBackoff.term(nodeState.failedAttempts);
+ nodeState.failedAttempts++;
Review comment:
That makes sense. Thanks for the clarification.
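The new `incrementReconnectBackoff` reads the backoff for the current `failedAttempts` before incrementing the counter. The first failure therefore still gets `term(0)`, the base `reconnect.backoff.ms`, just as the old `failedAttempts - 1` exponent did. Below is a minimal sketch of that ordering. It assumes no jitter so the values are deterministic. `BackoffOrderingSketch`, `NodeState`, `termFor`, and the 50/1000 ms values are hypothetical.

```java
// Hypothetical sketch of the "compute, then increment" ordering; jitter omitted.
class BackoffOrderingSketch {
    static final long INITIAL_MS = 50;   // assumed reconnect.backoff.ms
    static final long MAX_MS = 1000;     // assumed reconnect.backoff.max.ms
    static final int MULTIPLIER = 2;

    static final class NodeState {
        long failedAttempts = 0;
        long reconnectBackoffMs = INITIAL_MS;
    }

    // Term(n) = INITIAL_MS * MULTIPLIER^n, capped at MAX_MS.
    static long termFor(long n) {
        double value = INITIAL_MS * Math.pow(MULTIPLIER, n);
        return (long) Math.min(value, MAX_MS);
    }

    // Same order as the diff: use the current failure count, then increment it.
    static void incrementReconnectBackoff(NodeState state) {
        state.reconnectBackoffMs = termFor(state.failedAttempts);
        state.failedAttempts++;
    }

    public static void main(String[] args) {
        NodeState state = new NodeState();
        for (int i = 0; i < 6; i++) {
            incrementReconnectBackoff(state);
            System.out.println("failures=" + state.failedAttempts + " backoffMs=" + state.reconnectBackoffMs);
        }
    }
}
```

Successive failures yield 50, 100, 200, 400, 800 ms, then stay at the 1000 ms cap.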
##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
return state;
}
+ /**
+ * Get the id set of nodes which are in CONNECTING state
+ */
+ public Set<String> connectingNodes() {
+ return this.connectingNodes;
+ }
+
+ /**
+ * Get the timestamp of the latest connection attempt of a given node
+ * @param id the connection to fetch the state for
+ */
+ public long lastConnectAttemptMs(String id) {
+ NodeConnectionState nodeState = this.nodeState.get(id);
+ return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+ }
+
+ public long connectionSetupTimeoutMs(String id) {
+ NodeConnectionState nodeState = this.nodeState.get(id);
+ return nodeState.connectionSetupTimeoutMs;
Review comment:
I would prefer to handle this the same way as in
`lastConnectAttemptMs` to remain consistent: if `nodeState` is `null`,
we can return `0`.
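A minimal sketch of the suggested null handling, with javadoc on both accessors. `ConnectionStatesSketch` is a hypothetical, trimmed-down stand-in for `ClusterConnectionStates`. It models only the two fields read here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ClusterConnectionStates; only the fields used below are modelled.
class ConnectionStatesSketch {
    static final class NodeConnectionState {
        final long lastConnectAttemptMs;
        final long connectionSetupTimeoutMs;

        NodeConnectionState(long lastConnectAttemptMs, long connectionSetupTimeoutMs) {
            this.lastConnectAttemptMs = lastConnectAttemptMs;
            this.connectionSetupTimeoutMs = connectionSetupTimeoutMs;
        }
    }

    private final Map<String, NodeConnectionState> nodeState = new HashMap<>();

    void put(String id, NodeConnectionState state) {
        nodeState.put(id, state);
    }

    /**
     * Get the timestamp of the latest connection attempt of a given node.
     * @param id the connection to fetch the state for
     * @return the timestamp in ms, or 0 if the node is unknown
     */
    public long lastConnectAttemptMs(String id) {
        NodeConnectionState state = nodeState.get(id);
        return state == null ? 0 : state.lastConnectAttemptMs;
    }

    /**
     * Get the current connection setup timeout of a given node.
     * @param id the connection to fetch the state for
     * @return the timeout in ms, or 0 if the node is unknown
     */
    public long connectionSetupTimeoutMs(String id) {
        NodeConnectionState state = nodeState.get(id);
        return state == null ? 0 : state.connectionSetupTimeoutMs;
    }
}
```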
##########
File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##########
@@ -185,4 +191,9 @@ public static void warnIfDeprecatedDnsLookupValue(AbstractConfig config) {
CLIENT_DNS_LOOKUP_CONFIG, ClientDnsLookup.DEFAULT,
ClientDnsLookup.USE_ALL_DNS_IPS);
}
+
+ public class Defaults {
+ public static final long SocketConnectionSetupTimeoutMs = 10 * 1000;
+ public static final long SocketConnectionSetupTimeoutMaxMs = 127 * 1000;
Review comment:
We usually define constants with capital letters and underscores to
separate words. Moreover, we usually put defaults next to the config and the
doc. We could use the following:
- `SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DEFAULT`; and
- `SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DEFAULT`.
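A sketch of the suggested naming, with each default placed next to its config key. The values come from the diff. `socket.connection.setup.timeout.ms` appears in the javadoc above. The `..._MAX_MS_CONFIG` key name is assumed by analogy. `CommonClientConfigsSketch` is a hypothetical stand-in for `CommonClientConfigs`, and the matching `*_DOC` strings are omitted.

```java
// Hypothetical stand-in for CommonClientConfigs showing only the naming convention.
final class CommonClientConfigsSketch {
    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms";
    public static final long SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DEFAULT = 10 * 1000L;

    // Assumed key name, by analogy with the one above.
    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG = "socket.connection.setup.timeout.max.ms";
    public static final long SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DEFAULT = 127 * 1000L;

    private CommonClientConfigsSketch() { }
}
```

With the defaults defined once, `KafkaConfig.scala` and `BrokerApiVersionsCommand.scala` could reference them instead of redefining `10 * 1000` and `127 * 1000`.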
##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +345,37 @@ private NodeConnectionState nodeState(String id) {
return state;
}
+ /**
+ * Get the id set of nodes which are in CONNECTING state
+ */
+ public Set<String> connectingNodes() {
+ return this.connectingNodes;
+ }
+
+ /**
+ * Get the timestamp of the latest connection attempt of a given node
+ * @param id the connection to fetch the state for
+ */
+ public long lastConnectAttemptMs(String id) {
+ NodeConnectionState nodeState = this.nodeState.get(id);
+ return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+ }
+
+ // Visible for testing
+ long connectionSetupTimeoutMs(String id) {
+ NodeConnectionState nodeState = this.nodeState.get(id);
+ return nodeState.connectionSetupTimeoutMs;
+ }
+
+ /**
+ * Test if the connection to the given node has reached its timeout
+ * @param id the connection to fetch the state for
+ * @param now the current time in ms
+ */
+ public boolean isConnectionSetupTimeout(String id, long now) {
+ return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);
Review comment:
That is indeed true today, as the caller only calls it with nodes in
`connectingNodes`, but that may not be true forever. I would add the check
suggested by Rajini here to make the implementation safe.
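Rajini's exact suggestion is not quoted here. Below is a minimal sketch that assumes the check rejects nodes not currently in the connecting set by throwing `IllegalStateException`. `SetupTimeoutCheckSketch` and its maps are hypothetical stand-ins for the state kept by `ClusterConnectionStates`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: the guard fails fast for nodes that are not connecting.
class SetupTimeoutCheckSketch {
    private final Set<String> connectingNodes = new HashSet<>();
    private final Map<String, Long> lastConnectAttemptMs = new HashMap<>();
    private final Map<String, Long> connectionSetupTimeoutMs = new HashMap<>();

    void connecting(String id, long now, long timeoutMs) {
        connectingNodes.add(id);
        lastConnectAttemptMs.put(id, now);
        connectionSetupTimeoutMs.put(id, timeoutMs);
    }

    void connected(String id) {
        connectingNodes.remove(id);
    }

    /**
     * Test if the connection to the given node has reached its setup timeout.
     * @param id the connection to fetch the state for
     * @param now the current time in ms
     * @throws IllegalStateException if the node is not in the CONNECTING state
     */
    public boolean isConnectionSetupTimeout(String id, long now) {
        if (!connectingNodes.contains(id))
            throw new IllegalStateException("Node " + id + " is not in connecting state");
        return now - lastConnectAttemptMs.get(id) > connectionSetupTimeoutMs.get(id);
    }
}
```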
##########
File path: clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##########
@@ -0,0 +1,51 @@
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An utility class for exponential backoff, timeout, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class ExponentialBackoff {
+ private final int ratio;
+ private final double expMax;
+ private final long scaleFactor;
+ private final double jitter;
+
+ public ExponentialBackoff(long scaleFactor, int ratio, long termMax, double jitter) {
Review comment:
I find the terminology used here not intuitive. What about using
something like: `initialInterval`, `multiplier` and `maxInterval`? I think
these are more common when it comes to configuring backoff.
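In the suggested terminology, the javadoc formula and the `expMax` computation from the constructor read as follows. This assumes that `term` caps the exponent at `expMax`, since its body is not shown in these hunks.

```latex
\mathrm{backoff}(n) = r \cdot \mathit{initialInterval} \cdot \mathit{multiplier}^{\min(n,\, e_{\max})},
\qquad r \sim U(1 - \mathit{jitter},\; 1 + \mathit{jitter})

e_{\max} =
\begin{cases}
\dfrac{\ln\left(\mathit{maxInterval} / \max(\mathit{initialInterval}, 1)\right)}{\ln \mathit{multiplier}}
  & \text{if } \mathit{maxInterval} > \mathit{initialInterval} \\
0 & \text{otherwise}
\end{cases}
```

Take the connection setup defaults from this PR: initialInterval = 10000 ms, maxInterval = 127000 ms, and multiplier = 2. Then e_max = log2(12.7) ≈ 3.67. The pre-jitter timeouts are 10, 20, 40, and 80 s for attempts 0 to 3, then 127 s from attempt 4 on.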
##########
File path: clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##########
@@ -0,0 +1,51 @@
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An utility class for exponential backoff, timeout, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class ExponentialBackoff {
+ private final int ratio;
+ private final double expMax;
+ private final long scaleFactor;
+ private final double jitter;
+
+ public ExponentialBackoff(long scaleFactor, int ratio, long termMax, double jitter) {
+ this.scaleFactor = scaleFactor;
+ this.ratio = ratio;
+ this.jitter = jitter;
+ this.expMax = termMax > scaleFactor ?
+ Math.log(termMax / (double) Math.max(scaleFactor, 1)) / Math.log(ratio) : 0;
+ }
+
+ public long term(long n) {
Review comment:
nit: Let me try with another suggestion ;) What about `backoff(long attempts)`?
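A sketch of the class with both naming suggestions applied: `initialInterval`/`multiplier`/`maxInterval` in the constructor and `backoff(long attempts)` as the accessor. `ExponentialBackoffSketch` is hypothetical. The exponent cap is assumed from the `expMax` field, because the original `term` body is not shown. The jitter guard is an addition: `ThreadLocalRandom.nextDouble(origin, bound)` throws when `origin >= bound`, which would happen with a jitter of 0.

```java
import java.util.concurrent.ThreadLocalRandom;

/**
 * A utility class that computes exponentially growing intervals with random jitter.
 * Hypothetical sketch applying the suggested names to the formula in the diff.
 */
class ExponentialBackoffSketch {
    private final long initialInterval;
    private final int multiplier;
    private final double expMax;
    private final double jitter;

    public ExponentialBackoffSketch(long initialInterval, int multiplier, long maxInterval, double jitter) {
        this.initialInterval = initialInterval;
        this.multiplier = multiplier;
        this.jitter = jitter;
        // Exponent at which the pre-jitter value reaches maxInterval; 0 yields a constant interval.
        this.expMax = maxInterval > initialInterval
            ? Math.log(maxInterval / (double) Math.max(initialInterval, 1)) / Math.log(multiplier)
            : 0;
    }

    /** Returns the jittered interval for the given number of failed attempts. */
    public long backoff(long attempts) {
        double exp = Math.min(attempts, expMax);
        double term = initialInterval * Math.pow(multiplier, exp);
        // nextDouble(origin, bound) requires origin < bound, so skip randomization when jitter is 0.
        double randomFactor = jitter <= 0 ? 1.0
            : ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter);
        return (long) (randomFactor * term);
    }
}
```

With `new ExponentialBackoffSketch(100, 2, 2000, 0.0)`, `backoff(0)` through `backoff(4)` return 100, 200, 400, 800, and 1600. Any larger attempt count stays at about 2000.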
##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -88,6 +88,8 @@ object Defaults {
val MaxConnections: Int = Int.MaxValue
val ConnectionsMaxIdleMs = 10 * 60 * 1000L
val RequestTimeoutMs = 30000
+ val ConnectionSetupTimeoutMs = 10 * 1000L
+ val ConnectionSetupTimeoutMaxMs = 127 * 1000L
Review comment:
We can use the ones defined in `CommonClientConfigs` here.
##########
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##########
@@ -771,7 +794,7 @@ private void processDisconnection(List<ClientResponse> responses,
* @param responses The list of responses to update
* @param now The current time
*/
- private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
+ private void handleTimeoutRequests(List<ClientResponse> responses, long now) {
Review comment:
nit: I personally prefer the previous name. I would also rename
`handleTimeoutConnections` to `handleTimedOutConnections`.
##########
File path: clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##########
@@ -321,4 +325,37 @@ public void testIsPreparingConnection() {
connectionStates.disconnected(nodeId1, time.milliseconds());
assertFalse(connectionStates.isPreparingConnection(nodeId1));
}
+
+ @Test
+ public void testExponentialConnectionSetupTimeout() {
+ assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
+
+ // Check the exponential timeout growth
+ for (int n = 0; n <= Math.log((double) connectionSetupTimeoutMaxMs / connectionSetupTimeoutMs) / Math.log(2); n++) {
Review comment:
nit: Could we define a local constant for the base `2` and reuse it
everywhere?
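A sketch of the test's loop bound and expected values with the base pulled into one constant. The constant name `CONNECTION_SETUP_TIMEOUT_EXP_BASE` is hypothetical, chosen to mirror `RECONNECT_BACKOFF_EXP_BASE` in the main code. Because `n` is an integer, `n <= log(max / timeout) / log(base)` is the same bound as `n <= floor(...)`.

```java
// Hypothetical sketch: one named base shared by the loop bound and the expected values.
class SetupTimeoutGrowthSketch {
    static final int CONNECTION_SETUP_TIMEOUT_EXP_BASE = 2;

    // Largest n for which timeoutMs * base^n does not exceed maxMs.
    static int maxUncappedAttempt(long timeoutMs, long maxMs) {
        return (int) Math.floor(Math.log((double) maxMs / timeoutMs) / Math.log(CONNECTION_SETUP_TIMEOUT_EXP_BASE));
    }

    // Expected pre-jitter timeout for attempt n.
    static double expectedTimeoutMs(long timeoutMs, int n) {
        return timeoutMs * Math.pow(CONNECTION_SETUP_TIMEOUT_EXP_BASE, n);
    }

    public static void main(String[] args) {
        long timeoutMs = 10 * 1000L;   // default proposed in this PR
        long maxMs = 127 * 1000L;      // default proposed in this PR
        for (int n = 0; n <= maxUncappedAttempt(timeoutMs, maxMs); n++)
            System.out.println("n=" + n + " expectedMs=" + expectedTimeoutMs(timeoutMs, n));
    }
}
```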
##########
File path: clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##########
@@ -321,4 +325,37 @@ public void testIsPreparingConnection() {
connectionStates.disconnected(nodeId1, time.milliseconds());
assertFalse(connectionStates.isPreparingConnection(nodeId1));
}
+
+ @Test
+ public void testExponentialConnectionSetupTimeout() {
+ assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
+
+ // Check the exponential timeout growth
+ for (int n = 0; n <= Math.log((double) connectionSetupTimeoutMaxMs / connectionSetupTimeoutMs) / Math.log(2); n++) {
+ connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+ assertTrue(connectionStates.connectingNodes().contains(nodeId1));
+ assertEquals(connectionSetupTimeoutMs * Math.pow(2, n),
+ connectionStates.connectionSetupTimeoutMs(nodeId1),
+ connectionSetupTimeoutMs * Math.pow(2, n) * 0.2);
+ connectionStates.disconnected(nodeId1, time.milliseconds());
+ assertFalse(connectionStates.connectingNodes().contains(nodeId1));
+ }
+
+ // Check the timeout value upper bound
+ connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+ assertEquals(connectionStates.connectionSetupTimeoutMs(nodeId1), connectionSetupTimeoutMaxMs, connectionSetupTimeoutMaxMs * 0.2);
Review comment:
nit: I would put `connectionSetupTimeoutMaxMs` first as it is the
expected value. The same applies below.
##########
File path: clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##########
@@ -321,4 +325,37 @@ public void testIsPreparingConnection() {
connectionStates.disconnected(nodeId1, time.milliseconds());
assertFalse(connectionStates.isPreparingConnection(nodeId1));
}
+
+ @Test
+ public void testExponentialConnectionSetupTimeout() {
+ assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
+
+ // Check the exponential timeout growth
+ for (int n = 0; n <= Math.log((double) connectionSetupTimeoutMaxMs / connectionSetupTimeoutMs) / Math.log(2); n++) {
+ connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+ assertTrue(connectionStates.connectingNodes().contains(nodeId1));
+ assertEquals(connectionSetupTimeoutMs * Math.pow(2, n),
+ connectionStates.connectionSetupTimeoutMs(nodeId1),
+ connectionSetupTimeoutMs * Math.pow(2, n) * 0.2);
Review comment:
nit: For the jitter, what about defining a constant like
`reconnectBackoffJitter`?
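A sketch of a single jitter constant feeding every tolerance check. The expected value comes first, matching the JUnit 4 contract `assertEquals(double expected, double actual, double delta)`. To stay self-contained, it reimplements that check in plain Java instead of calling JUnit. `JitterToleranceSketch` and `assertWithinJitter` are hypothetical; the constant name follows the suggestion above.

```java
// Hypothetical sketch: one jitter constant, expected value first in every check.
class JitterToleranceSketch {
    static final double reconnectBackoffJitter = 0.2;

    // Fails when |expected - actual| > expected * jitter, like assertEquals(expected, actual, delta).
    static void assertWithinJitter(double expected, double actual) {
        double delta = expected * reconnectBackoffJitter;
        if (Math.abs(expected - actual) > delta)
            throw new AssertionError("expected " + expected + " +/- " + delta + " but was " + actual);
    }

    public static void main(String[] args) {
        assertWithinJitter(10_000, 11_500);   // within 20% of 10 s
        assertWithinJitter(127_000, 120_000); // within 20% of the 127 s cap
        System.out.println("ok");
    }
}
```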
##########
File path: core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
##########
@@ -205,6 +205,8 @@ object BrokerApiVersionsCommand {
private object AdminClient {
val DefaultConnectionMaxIdleMs = 9 * 60 * 1000
val DefaultRequestTimeoutMs = 5000
+ val DefaultSocketConnectionSetupMs = 10 * 1000
+ val DefaultSocketConnectionSetupMaxMs = 127 * 1000
Review comment:
It seems that we can remove these two and use the ones in
`CommonClientConfigs`.
##########
File path: clients/src/test/java/org/apache/kafka/common/utils/ExponentialBackoffTest.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ExponentialBackoffTest {
+ @Test
+ public void testExponentialBackoff() {
+ long scaleFactor = 100;
+ int ratio = 2;
+ long termMax = 2000;
+ double jitter = 0.2;
+ ExponentialBackoff exponentialBackoff = new ExponentialBackoff(
+ scaleFactor, ratio, termMax, jitter
+ );
+
+ for (int i = 0; i <= 100; i++) {
+ for (int n = 0; n <= 4; n++) {
+ assertEquals(scaleFactor * Math.pow(ratio, n), exponentialBackoff.term(n),
+ scaleFactor * Math.pow(ratio, n) * jitter);
+ }
+ assertTrue(exponentialBackoff.term(1000) <= termMax * (1 + jitter));
Review comment:
I think that it would be good to test with `n=5` as well to ensure that
`termMax` is already applied from that point on. Perhaps we could do the following:
```
for (int n = 0; n <= 100; n++) {
    if (n <= 4)
        assertEquals(scaleFactor * Math.pow(ratio, n),
            exponentialBackoff.term(n),
            scaleFactor * Math.pow(ratio, n) * jitter);
    else
        assertTrue(exponentialBackoff.term(n) <= termMax * (1 + jitter));
}
```