ptupitsyn commented on code in PR #3104:
URL: https://github.com/apache/ignite-3/pull/3104#discussion_r1469780669


##########
modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java:
##########
@@ -271,28 +274,36 @@ private CompletableFuture<PayloadInputChannel> executeOnOneNode(
         return ch.serviceAsync(
                 ClientOp.COMPUTE_EXECUTE,
                 w -> {
-                    if (w.clientChannel().protocolContext().clusterNode().name().equals(node.name())) {
-                        w.out().packNil();
-                    } else {
-                        w.out().packString(node.name());
-                    }
-
+                    packNodeNames(w.out(), nodes);
                     packJob(w.out(), units, jobClassName, options, args);
                 },
                 ch -> ch,
-                node.name(),
+                selectPreferredNodeName(nodes),
                 null,
                 true);
     }
 
-    private static ClusterNode randomNode(Set<ClusterNode> nodes) {
+    private @Nullable String selectPreferredNodeName(Set<ClusterNode> nodes) {
+        Set<String> preferredNodeNames = nodes.stream().map(ClusterNode::name).collect(Collectors.toCollection(HashSet::new));
+        Set<String> connections = ch.connections().stream().map(ClusterNode::name).collect(Collectors.toSet());
+
+        // Select the preferred node from the intersection of the candidate nodes and existing connections
+        preferredNodeNames.retainAll(connections);
+
+        if (preferredNodeNames.isEmpty()) {
+            return null;
+        }
+        return randomNode(preferredNodeNames);
+    }
+
+    private static String randomNode(Set<String> nodes) {

Review Comment:
   ```suggestion
       /**
        * Selects the name of a node to prefer for the request: one of the given candidate nodes
        * whose name is also present in the set returned by {@code ch.connectedNodeNames()}.
        *
        * @param nodes Candidate nodes.
        * @return Name of a candidate node that is also in the connected set, picked at random when
        *     there are several; {@code null} when no candidate is in the connected set.
        */
       private @Nullable String selectPreferredNodeName(Set<ClusterNode> nodes) 
{
           // Sized for the worst case where every candidate is connected.
           List<String> candidateNodeNames = new ArrayList<>(nodes.size());
           Set<String> connectedNodeNames = ch.connectedNodeNames();
   
           // Keep only the candidates whose name appears in the connected set.
           for (ClusterNode node : nodes) {
               if (connectedNodeNames.contains(node.name())) {
                   candidateNodeNames.add(node.name());
               }
           }
   
           // No candidate is in the connected set: there is nothing to prefer.
           if (candidateNodeNames.isEmpty()) {
               return null;
           }
   
           // A single match needs no random pick.
           if (candidateNodeNames.size() == 1) {
               return candidateNodeNames.get(0);
           }
   
           // Several matches: pick one at random.
           int randomIdx = 
ThreadLocalRandom.current().nextInt(candidateNodeNames.size());
           return candidateNodeNames.get(randomIdx);
       }
   ```
   
   * Avoid streams
   * Initialize collections with capacity
   
   
   `ReliableChannel.connectedNodeNames`:
   ```java
       /**
        * Gets connected node names.
        *
        * <p>Builds and returns a new set on every call. A node name is included only when its
        * holder has a channel future, that future yields a channel via
        * {@code ClientFutureUtils.getNowSafe}, and the channel is not closed.
        *
        * @return Set of connected node names.
        */
       public Set<String> connectedNodeNames() {
           // NOTE(review): capacity is taken from `channels`, while the loop below walks
           // `nodeChannelsByName` - confirm the two are expected to be of similar size.
           Set<String> res = new HashSet<>(channels.size());
   
           for (var holder : nodeChannelsByName.values()) {
               var chFut = holder.chFut;
   
               // Holders without a channel future are skipped.
               if (chFut != null) {
                   // Presumably returns null when the future is not completed yet or has failed -
                   // TODO confirm against ClientFutureUtils.getNowSafe.
                   var ch = ClientFutureUtils.getNowSafe(chFut);
   
                   // Only channels that are available and not closed count as connected.
                   if (ch != null && !ch.closed()) {
                       res.add(ch.protocolContext().clusterNode().name());
                   }
               }
           }
   
           return res;
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to