rajinisivaram commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1635927735


##########
clients/src/main/java/org/apache/kafka/clients/LeastLoadedNode.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.Node;
+
+public class LeastLoadedNode {
+    private final Node node;
+    private final boolean atLeastOneConnectionReady;
+
+    public LeastLoadedNode(Node node, boolean atLeastOneConnectionReady) {
+        this.node = node;
+        this.atLeastOneConnectionReady = atLeastOneConnectionReady;
+    }
+
+    public Node node() {
+        return node;
+    }
+
+    /**
+     * Indicates if the least loaded node is available or at least a ready connection exists.
+     *
+     * <p>There may be no node available while ready connections to live nodes exist. This may happen when
+     * the connections are overloaded with in-flight requests. This function takes this into account.
+     */
+    public boolean hasNodeAvailableOrConnectionReady() {
+        return node() != null || atLeastOneConnectionReady;

Review Comment:
   This can just use the `node` field.
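
A minimal sketch of what that suggestion might look like, assuming nothing else in the class changes. The `Node` class here is a stand-in for `org.apache.kafka.common.Node` so the snippet compiles on its own; it is not the real Kafka type.

```java
// Stand-in for org.apache.kafka.common.Node (assumption, for self-containment only).
final class Node {
    private final int id;

    Node(int id) {
        this.id = id;
    }

    int id() {
        return id;
    }
}

class LeastLoadedNode {
    private final Node node;
    private final boolean atLeastOneConnectionReady;

    LeastLoadedNode(Node node, boolean atLeastOneConnectionReady) {
        this.node = node;
        this.atLeastOneConnectionReady = atLeastOneConnectionReady;
    }

    Node node() {
        return node;
    }

    boolean hasNodeAvailableOrConnectionReady() {
        // Read the field directly rather than calling the node() accessor.
        return node != null || atLeastOneConnectionReady;
    }
}
```

The behavior is the same as in the diff above; only the read of `node` changes.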



##########
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##########
@@ -1121,13 +1140,25 @@ public long maybeUpdate(long now) {
 
             // Beware that the behavior of this method and the computation of timeouts for poll() are
             // highly dependent on the behavior of leastLoadedNode.
-            Node node = leastLoadedNode(now);
-            if (node == null) {
+            LeastLoadedNode leastLoadedNode = leastLoadedNode(now);
+
+            // Rebootstrap if needed and configured.
+            if (!leastLoadedNode.hasNodeAvailableOrConnectionReady()
+                    && metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP) {
+                for (final Node oldNode : metadata.fetch().nodes()) {
+                    NetworkClient.this.close(oldNode.idString());

Review Comment:
   Is there a chance that there may be connections to old nodes? I think we 
want to rebootstrap only if every node is in backoff state. We don't close 
nodes in the Admin client equivalent below, which is confusing, so it may be 
better to remove this unless it is required. Checking 
`hasNodeAvailableOrConnectionReady` adds to the confusion since it suggests 
there may be nodes in some non-ready state that need to be closed. Should we 
change the `leastLoadedNode()` method to return `hasConnectedOrConnecting` nodes 
instead of `atLeastOneConnectionReady` to make this more obvious?
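
A rough sketch of the idea in this comment, under stated assumptions: rebootstrap only when no node is connected or connecting, that is, when every node is backing off. The `ConnState` enum and the `RebootstrapDecision` class are hypothetical simplifications for illustration and are not Kafka's actual connection-state API; only the name `hasConnectedOrConnecting` comes from the comment.

```java
import java.util.List;

// Hypothetical, simplified connection states (not Kafka's real enum).
enum ConnState { READY, CONNECTING, BACKING_OFF }

class RebootstrapDecision {
    // True if at least one node is connected or has a connection attempt in progress.
    static boolean hasConnectedOrConnecting(List<ConnState> states) {
        for (ConnState state : states) {
            if (state == ConnState.READY || state == ConnState.CONNECTING) {
                return true;
            }
        }
        return false;
    }

    // Rebootstrap only when no node is connected or connecting.
    // In this sketch an empty node list also yields true.
    static boolean shouldRebootstrap(List<ConnState> states) {
        return !hasConnectedOrConnecting(states);
    }
}
```

With a flag like this, there would be no live connections left to close at the point where rebootstrap triggers, which is the clarity the comment is asking for.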



##########
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##########
@@ -1121,13 +1140,25 @@ public long maybeUpdate(long now) {
 
             // Beware that the behavior of this method and the computation of timeouts for poll() are
             // highly dependent on the behavior of leastLoadedNode.
-            Node node = leastLoadedNode(now);
-            if (node == null) {
+            LeastLoadedNode leastLoadedNode = leastLoadedNode(now);
+
+            // Rebootstrap if needed and configured.
+            if (!leastLoadedNode.hasNodeAvailableOrConnectionReady()
+                    && metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP) {

Review Comment:
   Should we swap this around to `if (metadataRecoveryStrategy == 
MetadataRecoveryStrategy.REBOOTSTRAP && 
!leastLoadedNode.hasNodeAvailableOrConnectionReady())` here and in the admin 
client, so that we only use the new method if rebootstrap is configured?
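
To illustrate why the order matters, here is a small sketch that relies on Java's short-circuit `&&`: with the strategy check first, the new method is never invoked unless rebootstrap is configured. `CountingLeastLoadedNode` and `RebootstrapCheck` are hypothetical helpers written for this example, and the `NONE` constant on the stand-in enum is an assumption, since only `REBOOTSTRAP` appears in the diff.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in enum; NONE is assumed here for illustration.
enum MetadataRecoveryStrategy { NONE, REBOOTSTRAP }

// Hypothetical wrapper that counts how often the new method is called.
class CountingLeastLoadedNode {
    final AtomicInteger calls = new AtomicInteger();
    private final boolean available;

    CountingLeastLoadedNode(boolean available) {
        this.available = available;
    }

    boolean hasNodeAvailableOrConnectionReady() {
        calls.incrementAndGet();
        return available;
    }
}

class RebootstrapCheck {
    static boolean shouldRebootstrap(MetadataRecoveryStrategy strategy,
                                     CountingLeastLoadedNode leastLoadedNode) {
        // The strategy comparison runs first; the method call on the right is
        // skipped entirely when the strategy is not REBOOTSTRAP.
        return strategy == MetadataRecoveryStrategy.REBOOTSTRAP
                && !leastLoadedNode.hasNodeAvailableOrConnectionReady();
    }
}
```

The result of the condition is identical in either order; the swap only changes whether the new method runs for clients that have not opted in.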



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
