ptupitsyn commented on code in PR #3104: URL: https://github.com/apache/ignite-3/pull/3104#discussion_r1469780669
##########
modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java:
##########
@@ -271,28 +274,36 @@ private CompletableFuture<PayloadInputChannel> executeOnOneNode(
         return ch.serviceAsync(
                 ClientOp.COMPUTE_EXECUTE,
                 w -> {
-                    if (w.clientChannel().protocolContext().clusterNode().name().equals(node.name())) {
-                        w.out().packNil();
-                    } else {
-                        w.out().packString(node.name());
-                    }
-
+                    packNodeNames(w.out(), nodes);
                     packJob(w.out(), units, jobClassName, options, args);
                 },
                 ch -> ch,
-                node.name(),
+                selectPreferredNodeName(nodes),
                 null,
                 true);
     }
 
-    private static ClusterNode randomNode(Set<ClusterNode> nodes) {
+    private @Nullable String selectPreferredNodeName(Set<ClusterNode> nodes) {
+        Set<String> preferredNodeNames = nodes.stream().map(ClusterNode::name).collect(Collectors.toCollection(HashSet::new));
+        Set<String> connections = ch.connections().stream().map(ClusterNode::name).collect(Collectors.toSet());
+
+        // Select the preferred node from the intersection of the candidate nodes and existing connections
+        preferredNodeNames.retainAll(connections);
+
+        if (preferredNodeNames.isEmpty()) {
+            return null;
+        }
+        return randomNode(preferredNodeNames);
+    }
+
+    private static String randomNode(Set<String> nodes) {

Review Comment:
```suggestion
    private @Nullable String selectPreferredNodeName(Set<ClusterNode> nodes) {
        List<String> candidateNodeNames = new ArrayList<>(nodes.size());
        Set<String> connectedNodeNames = ch.connectedNodeNames();

        for (ClusterNode node : nodes) {
            if (connectedNodeNames.contains(node.name())) {
                candidateNodeNames.add(node.name());
            }
        }

        if (candidateNodeNames.isEmpty()) {
            return null;
        }

        if (candidateNodeNames.size() == 1) {
            return candidateNodeNames.get(0);
        }

        int randomIdx = ThreadLocalRandom.current().nextInt(candidateNodeNames.size());
        return candidateNodeNames.get(randomIdx);
    }
```

* Avoid streams
* Initialize collections with capacity

`ReliableChannel.connectedNodeNames`:

```java
    /**
     * Gets connected node names.
     *
     * @return Set of connected node names.
     */
    public Set<String> connectedNodeNames() {
        Set<String> res = new HashSet<>(channels.size());

        for (var holder : nodeChannelsByName.values()) {
            var chFut = holder.chFut;

            if (chFut != null) {
                var ch = ClientFutureUtils.getNowSafe(chFut);

                if (ch != null && !ch.closed()) {
                    res.add(ch.protocolContext().clusterNode().name());
                }
            }
        }

        return res;
    }
```

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org