rajinisivaram commented on code in PR #13277: URL: https://github.com/apache/kafka/pull/13277#discussion_r1635927735
##########
clients/src/main/java/org/apache/kafka/clients/LeastLoadedNode.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.Node;
+
+public class LeastLoadedNode {
+    private final Node node;
+    private final boolean atLeastOneConnectionReady;
+
+    public LeastLoadedNode(Node node, boolean atLeastOneConnectionReady) {
+        this.node = node;
+        this.atLeastOneConnectionReady = atLeastOneConnectionReady;
+    }
+
+    public Node node() {
+        return node;
+    }
+
+    /**
+     * Indicates if the least loaded node is available or at least a ready connection exists.
+     *
+     * <p>There may be no node available while ready connections to live nodes exist. This may happen when
+     * the connections are overloaded with in-flight requests. This function takes this into account.
+     */
+    public boolean hasNodeAvailableOrConnectionReady() {
+        return node() != null || atLeastOneConnectionReady;

Review Comment:
   This can just use `node` field.

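As a rough illustration of the suggestion above, the class could read the `node` field directly instead of going through the `node()` accessor. This is only a sketch: the `Node` class below is a hypothetical stand-in for `org.apache.kafka.common.Node` so that the snippet compiles on its own, and the visibility modifiers are relaxed for the same reason.

```java
// Hypothetical stand-in for org.apache.kafka.common.Node (assumption, not the real class).
final class Node {
    private final int id;

    Node(int id) {
        this.id = id;
    }

    int id() {
        return id;
    }
}

// Sketch of the class from the diff with the review suggestion applied.
final class LeastLoadedNode {
    private final Node node;
    private final boolean atLeastOneConnectionReady;

    LeastLoadedNode(Node node, boolean atLeastOneConnectionReady) {
        this.node = node;
        this.atLeastOneConnectionReady = atLeastOneConnectionReady;
    }

    Node node() {
        return node;
    }

    boolean hasNodeAvailableOrConnectionReady() {
        // Read the field directly; inside the class the accessor adds nothing.
        return node != null || atLeastOneConnectionReady;
    }
}
```

The behavior is unchanged: the method still returns true when a node was selected or when at least one connection is ready.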
##########
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##########
@@ -1121,13 +1140,25 @@ public long maybeUpdate(long now) {
             // Beware that the behavior of this method and the computation of timeouts for poll() are
             // highly dependent on the behavior of leastLoadedNode.
-            Node node = leastLoadedNode(now);
-            if (node == null) {
+            LeastLoadedNode leastLoadedNode = leastLoadedNode(now);
+
+            // Rebootstrap if needed and configured.
+            if (!leastLoadedNode.hasNodeAvailableOrConnectionReady()
+                    && metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP) {
+                for (final Node oldNode : metadata.fetch().nodes()) {
+                    NetworkClient.this.close(oldNode.idString());

Review Comment:
   Is there a chance that there may be connections to old nodes? I think we want to rebootstrap only if every node is in backoff state. We don't close nodes in the Admin client equivalent below, which is confusing, so it may be better to remove this unless it is required. Checking `hasNodeAvailableOrConnectionReady` adds to the confusion since it suggests there may be nodes in some non-ready state that need to be closed. Should we change `leastLoadedNode()` method to return `hasConnectedOrConnecting` nodes instead of `atLeastOneConnectionReady` to make this more obvious?

##########
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##########
@@ -1121,13 +1140,25 @@ public long maybeUpdate(long now) {
             // Beware that the behavior of this method and the computation of timeouts for poll() are
             // highly dependent on the behavior of leastLoadedNode.
-            Node node = leastLoadedNode(now);
-            if (node == null) {
+            LeastLoadedNode leastLoadedNode = leastLoadedNode(now);
+
+            // Rebootstrap if needed and configured.
+            if (!leastLoadedNode.hasNodeAvailableOrConnectionReady()
+                    && metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP) {

Review Comment:
   Should we swap this around to `if (metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP && !leastLoadedNode.hasNodeAvailableOrConnectionReady())` here and in the admin client, so that we only use the new method if rebootstrap is configured?

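The reordering suggested in the last review comment relies on `&&` short-circuiting: with the strategy check first, the availability check is never evaluated unless rebootstrap is configured. A minimal sketch, assuming a stand-in `MetadataRecoveryStrategy` enum and a hypothetical `RebootstrapCheck.shouldRebootstrap` helper (neither is taken from the PR):

```java
import java.util.function.BooleanSupplier;

// Stand-in for the enum referenced in the diff; the two values are assumed for illustration.
enum MetadataRecoveryStrategy { NONE, REBOOTSTRAP }

// Hypothetical helper, used only to isolate the condition under discussion.
final class RebootstrapCheck {
    static boolean shouldRebootstrap(MetadataRecoveryStrategy strategy,
                                     BooleanSupplier hasNodeAvailableOrConnectionReady) {
        // The strategy comparison runs first; when it is false, && skips the
        // right-hand side, so the supplier is never invoked.
        return strategy == MetadataRecoveryStrategy.REBOOTSTRAP
                && !hasNodeAvailableOrConnectionReady.getAsBoolean();
    }
}
```

With `MetadataRecoveryStrategy.NONE` the supplier is not called at all, which is the effect the comment asks for: the new method is used only when rebootstrap is enabled.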