dajac commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r445654781
##########
File path:
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +402,46 @@ private NodeConnectionState nodeState(String id) {
return state;
}
+ /**
+ * Get the id set of nodes which are in CONNECTING state
+ */
+ public Set<String> connectingNodes() {
+ return this.connectingNodes;
+ }
+
+ /**
+ * Get the timestamp of the latest connection attempt of a given node
+ * @param id the connection to fetch the state for
+ */
+ public long lastConnectAttemptMs(String id) {
+ NodeConnectionState nodeState = this.nodeState.get(id);
+ return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+ }
+
+ /**
+ * Get the current socket connection setup timeout of the given node.
+ * The base value is defined via socket.connection.setup.timeout.
+ * @param id the connection to fetch the state for
+ */
+ public long connectionSetupTimeoutMs(String id) {
+ NodeConnectionState nodeState = this.nodeState.get(id);
+ if (nodeState == null)
+ throw new IllegalStateException("Connection to node " + id + " hasn't been initialized");
+ return nodeState.connectionSetupTimeoutMs;
+ }
+
+ /**
+ * Test if the connection to the given node has reached its timeout
+ * @param id the connection to fetch the state for
+ * @param now the current time in ms
+ */
+ public boolean isConnectionSetupTimeout(String id, long now) {
+ NodeConnectionState nodeState = this.nodeState.get(id);
Review comment:
nit: You can also use `this.nodeState(id)` here.
##########
File path:
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +402,46 @@ private NodeConnectionState nodeState(String id) {
return state;
}
+ /**
+ * Get the id set of nodes which are in CONNECTING state
+ */
+ public Set<String> connectingNodes() {
+ return this.connectingNodes;
+ }
+
+ /**
+ * Get the timestamp of the latest connection attempt of a given node
+ * @param id the connection to fetch the state for
+ */
+ public long lastConnectAttemptMs(String id) {
+ NodeConnectionState nodeState = this.nodeState.get(id);
+ return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+ }
+
+ /**
+ * Get the current socket connection setup timeout of the given node.
+ * The base value is defined via socket.connection.setup.timeout.
+ * @param id the connection to fetch the state for
+ */
+ public long connectionSetupTimeoutMs(String id) {
+ NodeConnectionState nodeState = this.nodeState.get(id);
+ if (nodeState == null)
+ throw new IllegalStateException("Connection to node " + id + " hasn't been initialized");
Review comment:
nit: This can be replaced by `this.nodeState(id)`.
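For illustration, a minimal sketch of what this suggestion could look like. It assumes the private `nodeState(String id)` helper visible in the hunk header throws `IllegalStateException` when no state exists for the id; the helper's body is not shown in this diff, so that behavior and its message are assumptions. `ConnectionStatesSketch`, `NodeState`, and `connecting` are simplified stand-ins, not the actual Kafka code.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for ClusterConnectionStates; not the actual Kafka class.
class ConnectionStatesSketch {
    // Hypothetical, trimmed-down per-node state holding only the timeout.
    static class NodeState {
        final long connectionSetupTimeoutMs;

        NodeState(long connectionSetupTimeoutMs) {
            this.connectionSetupTimeoutMs = connectionSetupTimeoutMs;
        }
    }

    private final Map<String, NodeState> nodeState = new HashMap<>();

    // Hypothetical helper that registers a node so the lookup below can succeed.
    void connecting(String id, long timeoutMs) {
        nodeState.put(id, new NodeState(timeoutMs));
    }

    // Assumed behavior of the existing private helper: look up, fail fast if missing.
    private NodeState nodeState(String id) {
        NodeState state = nodeState.get(id);
        if (state == null)
            throw new IllegalStateException("No entry found for connection " + id);
        return state;
    }

    // With the helper, the accessor no longer needs its own null check.
    long connectionSetupTimeoutMs(String id) {
        return nodeState(id).connectionSetupTimeoutMs;
    }
}
```

One consequence of the change, under the assumption above: the exception message would come from the shared helper rather than from each accessor.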
##########
File path:
clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An utility class for keeping the parameters and providing the value of exponential
+ * retry backoff, exponential reconnect backoff, exponential timeout, etc.
+ * The formula is
+ * Backoff(attempts) = random(1 - jitter, 1 + jitter) * initialInterval * multiplier ^ attempts
+ * If scaleFactor is greater or equal than termMax, a constant backoff of will be provided
Review comment:
Replace `scaleFactor` and `termMax` with the new terms.
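For reference, a rough sketch of the formula quoted in the Javadoc above, `random(1 - jitter, 1 + jitter) * initialInterval * multiplier ^ attempts`. This is not the class under review: `BackoffSketch` and the `maxInterval` cap are hypothetical names chosen for illustration, and applying the cap before the jitter is an assumption, since the rest of the file is not visible in this hunk.

```java
import java.util.concurrent.ThreadLocalRandom;

// Illustrative sketch of the Javadoc formula; not the ExponentialBackoff class under review.
class BackoffSketch {
    private final long initialInterval;
    private final int multiplier;
    private final long maxInterval; // hypothetical cap, named only for this example
    private final double jitter;

    BackoffSketch(long initialInterval, int multiplier, long maxInterval, double jitter) {
        this.initialInterval = initialInterval;
        this.multiplier = multiplier;
        this.maxInterval = maxInterval;
        this.jitter = jitter;
    }

    long backoff(long attempts) {
        // Exponential term: initialInterval * multiplier ^ attempts
        double term = initialInterval * Math.pow(multiplier, attempts);
        // Assumption: once the term reaches the cap, the backoff stays constant (before jitter).
        double capped = Math.min(term, maxInterval);
        // nextDouble(origin, bound) requires origin < bound, so skip it when jitter is 0.
        double randomFactor = jitter == 0.0
            ? 1.0
            : ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter);
        return (long) (randomFactor * capped);
    }
}
```

With `initialInterval = 100`, `multiplier = 2`, `maxInterval = 1000`, and `jitter = 0`, this yields 100, 200, 400, 800 for attempts 0 through 3 and stays at 1000 from attempt 4 onward.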
##########
File path:
clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An utility class for keeping the parameters and providing the value of exponential
+ * retry backoff, exponential reconnect backoff, exponential timeout, etc.
+ * The formula is
Review comment:
nit: `The formula is` -> `The formula is:`