ptupitsyn commented on code in PR #3104:
URL: https://github.com/apache/ignite-3/pull/3104#discussion_r1469205439


##########
modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItEmbeddedWorkerShutdownTest.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.internal.app.IgniteImpl;
+
+class ItEmbeddedWorkerShutdownTest extends ItWorkerShutdownTest {
+

Review Comment:
   Redundant blank line.



##########
modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java:
##########
@@ -271,16 +274,12 @@ private CompletableFuture<PayloadInputChannel> executeOnOneNode(
         return ch.serviceAsync(
                 ClientOp.COMPUTE_EXECUTE,
                 w -> {
-                    if (w.clientChannel().protocolContext().clusterNode().name().equals(node.name())) {
-                        w.out().packNil();
-                    } else {
-                        w.out().packString(node.name());
-                    }
-
+                    String[] nodeNames = nodes.stream().map(ClusterNode::name).toArray(String[]::new);
+                    w.out().packObjectArrayAsBinaryTuple(nodeNames);

Review Comment:
   1. Java streams are unnecessary here (and inefficient).
   2. `packObjectArrayAsBinaryTuple` is excessive here: it writes a type code for every element.

   Instead, let's call `packInt` for the size and then `packString` for each node name in a loop.
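The count-then-strings layout suggested above can be sketched as a self-contained round trip. This is a minimal, hypothetical sketch: `NodeNamesCodec` is not part of Ignite, and `DataOutputStream.writeInt`/`writeUTF` only stand in for `packInt`/`packString` so the example runs without the client module; the real MessagePack-based wire format differs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of a count-prefixed list of node names. DataOutputStream and
 * DataInputStream stand in for ClientMessagePacker/ClientMessageUnpacker (assumption).
 */
final class NodeNamesCodec {
    private NodeNamesCodec() {
    }

    /** Writes the element count, then each name; no per-element type code is written. */
    static byte[] pack(List<String> nodeNames) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(nodeNames.size()); // plays the role of packInt(size)
            for (String name : nodeNames) { // plain loop instead of a stream pipeline
                out.writeUTF(name);         // plays the role of packString(name)
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the count first, then exactly that many names. */
    static List<String> unpack(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int size = in.readInt();
            List<String> names = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                names.add(in.readUTF());
            }
            return names;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In the client this would correspond to one `packInt(nodes.size())` followed by one `packString(node.name())` per node, and the handler would read the count first and then that many strings.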



##########
modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java:
##########
@@ -261,8 +263,9 @@ public <R> Map<ClusterNode, JobExecution<R>> broadcastAsync(
         return map;
     }
 
-    private CompletableFuture<PayloadInputChannel> executeOnOneNode(
-            ClusterNode node,
+    private CompletableFuture<PayloadInputChannel> executeOnNodesAsync(
+            ClusterNode coordinatorNode,

Review Comment:
   `Coordinator` does not make sense:
   * For `execute`, it is not needed: we can use any node from the set (and, if possible, we should pick one of the nodes with a direct client connection).
   * For `broadcast`, there is only one node in the set anyway.
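For the `execute` case, the preference for a directly connected node could look like the hypothetical sketch below. It assumes the client can obtain the set of node names it holds direct connections to; `connectedNodes` and `ExecuteTargetSelector` are illustrative names, not an existing client API.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Hypothetical helper; not part of the Ignite client. */
final class ExecuteTargetSelector {
    private ExecuteTargetSelector() {
    }

    /**
     * Returns the first candidate the client is directly connected to. An empty result means
     * no candidate has a direct connection, so the request can go over any connection.
     */
    static Optional<String> preferDirectlyConnected(List<String> candidates, Set<String> connectedNodes) {
        for (String name : candidates) {
            if (connectedNodes.contains(name)) {
                return Optional.of(name);
            }
        }
        return Optional.empty();
    }
}
```

For `broadcast` each request already targets a single node, so a helper like this would only matter for `execute`.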



##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java:
##########
@@ -17,16 +17,45 @@
 
 package org.apache.ignite.internal.compute;
 
+import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.DeploymentUnit;
 import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.compute.JobExecution;
+import org.apache.ignite.compute.JobExecutionOptions;
 import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Internal compute facade.
  */
 public interface IgniteComputeInternal extends IgniteCompute {
+    /**
+     * Executes a {@link ComputeJob} of the given class on a single node. If the node leaves the cluster, it will be restarted on one of the
+     * candidate nodes.
+     *
+     * @param <R> Job result type.
+     * @param targetNode Node to execute the job on.
+     * @param candidates Candidate nodes; In case target node left the cluster, the job will be restarted on one of them.
+     * @param units Deployment units. Can be empty.
+     * @param jobClassName Name of the job class to execute.
+     * @param options Job execution options.
+     * @param args Arguments of the job.
+     * @return CompletableFuture Job result.
+     */
+    <R> JobExecution<R> executeAsyncWithFailover(
+            ClusterNode targetNode,
+            Set<ClusterNode> candidates,

Review Comment:
   Do we really need to pass instances of `Set` around in the internal API? `List` is more efficient and easier.



##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java:
##########
@@ -50,29 +56,82 @@ public class ClientComputeExecuteRequest {
     public static CompletableFuture<Void> process(
             ClientMessageUnpacker in,
             ClientMessagePacker out,
-            IgniteCompute compute,
+            IgniteComputeInternal compute,
             ClusterService cluster,
-            NotificationSender notificationSender) {
-        var nodeName = in.tryUnpackNil() ? null : in.unpackString();
-
-        var node = nodeName == null
-                ? cluster.topologyService().localMember()
-                : cluster.topologyService().getByConsistentId(nodeName);
-
-        if (node == null) {
-            throw new IgniteException("Specified node is not present in the cluster: " + nodeName);
-        }
+            NotificationSender notificationSender
+    ) {
+        Set<ClusterNode> candidates = unpackCandidateNodes(in, cluster);
 
         List<DeploymentUnit> deploymentUnits = unpackDeploymentUnits(in);
         String jobClassName = in.unpackString();
         JobExecutionOptions options = JobExecutionOptions.builder().priority(in.unpackInt()).maxRetries(in.unpackInt()).build();
         Object[] args = unpackArgs(in);
 
-        JobExecution<Object> execution = compute.executeAsync(Set.of(node), deploymentUnits, jobClassName, options, args);
+        ClusterNode localNode = cluster.topologyService().localMember();
+        ClusterNode targetNode = selectTargetNode(candidates, localNode);
+        candidates.remove(targetNode);
+
+        JobExecution<Object> execution = compute.executeAsyncWithFailover(targetNode, candidates,
+                deploymentUnits, jobClassName, options, args);
         sendResultAndStatus(execution, notificationSender);
         return execution.idAsync().thenAccept(out::packUuid);
     }
 
+    private static Set<ClusterNode> unpackCandidateNodes(ClientMessageUnpacker in, ClusterService cluster) {
+        Set<String> candidateIds = Arrays.stream(in.unpackObjectArrayFromBinaryTuple())
+                .map(String.class::cast)
+                .collect(Collectors.toSet());
+
+        Set<ClusterNode> candidates = new HashSet<>();
+        List<String> notFoundNodes = new ArrayList<>();
+        candidateIds.forEach(id -> {
+            ClusterNode node = cluster.topologyService().getByConsistentId(id);
+            if (node == null) {
+                notFoundNodes.add(id);
+            } else {
+                candidates.add(node);
+            }
+        });
+
+        if (!notFoundNodes.isEmpty()) {
+            throw new IgniteException("Specified nodes are not present in the cluster: " + notFoundNodes);
+        }
+        return candidates;
+    }
+
+    /**
+     * Selects a random node from the set of candidates, preferably not a local node.

Review Comment:
   Why do we prefer a non-local node?



##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java:
##########
@@ -50,29 +56,82 @@ public class ClientComputeExecuteRequest {
     public static CompletableFuture<Void> process(
             ClientMessageUnpacker in,
             ClientMessagePacker out,
-            IgniteCompute compute,
+            IgniteComputeInternal compute,
             ClusterService cluster,
-            NotificationSender notificationSender) {
-        var nodeName = in.tryUnpackNil() ? null : in.unpackString();
-
-        var node = nodeName == null
-                ? cluster.topologyService().localMember()
-                : cluster.topologyService().getByConsistentId(nodeName);
-
-        if (node == null) {
-            throw new IgniteException("Specified node is not present in the cluster: " + nodeName);
-        }
+            NotificationSender notificationSender
+    ) {
+        Set<ClusterNode> candidates = unpackCandidateNodes(in, cluster);
 
         List<DeploymentUnit> deploymentUnits = unpackDeploymentUnits(in);
         String jobClassName = in.unpackString();
         JobExecutionOptions options = JobExecutionOptions.builder().priority(in.unpackInt()).maxRetries(in.unpackInt()).build();
         Object[] args = unpackArgs(in);
 
-        JobExecution<Object> execution = compute.executeAsync(Set.of(node), deploymentUnits, jobClassName, options, args);
+        ClusterNode localNode = cluster.topologyService().localMember();
+        ClusterNode targetNode = selectTargetNode(candidates, localNode);
+        candidates.remove(targetNode);
+
+        JobExecution<Object> execution = compute.executeAsyncWithFailover(targetNode, candidates,
+                deploymentUnits, jobClassName, options, args);
         sendResultAndStatus(execution, notificationSender);
         return execution.idAsync().thenAccept(out::packUuid);
     }
 
+    private static Set<ClusterNode> unpackCandidateNodes(ClientMessageUnpacker in, ClusterService cluster) {
+        Set<String> candidateIds = Arrays.stream(in.unpackObjectArrayFromBinaryTuple())
+                .map(String.class::cast)
+                .collect(Collectors.toSet());
+
+        Set<ClusterNode> candidates = new HashSet<>();
+        List<String> notFoundNodes = new ArrayList<>();
+        candidateIds.forEach(id -> {
+            ClusterNode node = cluster.topologyService().getByConsistentId(id);
+            if (node == null) {
+                notFoundNodes.add(id);
+            } else {
+                candidates.add(node);
+            }
+        });
+
+        if (!notFoundNodes.isEmpty()) {
+            throw new IgniteException("Specified nodes are not present in the cluster: " + notFoundNodes);

Review Comment:
   This `IgniteException` constructor is deprecated.



##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java:
##########
@@ -50,29 +56,82 @@ public class ClientComputeExecuteRequest {
     public static CompletableFuture<Void> process(
             ClientMessageUnpacker in,
             ClientMessagePacker out,
-            IgniteCompute compute,
+            IgniteComputeInternal compute,
             ClusterService cluster,
-            NotificationSender notificationSender) {
-        var nodeName = in.tryUnpackNil() ? null : in.unpackString();
-
-        var node = nodeName == null
-                ? cluster.topologyService().localMember()
-                : cluster.topologyService().getByConsistentId(nodeName);
-
-        if (node == null) {
-            throw new IgniteException("Specified node is not present in the cluster: " + nodeName);
-        }
+            NotificationSender notificationSender
+    ) {
+        Set<ClusterNode> candidates = unpackCandidateNodes(in, cluster);
 
         List<DeploymentUnit> deploymentUnits = unpackDeploymentUnits(in);
         String jobClassName = in.unpackString();
         JobExecutionOptions options = JobExecutionOptions.builder().priority(in.unpackInt()).maxRetries(in.unpackInt()).build();
         Object[] args = unpackArgs(in);
 
-        JobExecution<Object> execution = compute.executeAsync(Set.of(node), deploymentUnits, jobClassName, options, args);
+        ClusterNode localNode = cluster.topologyService().localMember();
+        ClusterNode targetNode = selectTargetNode(candidates, localNode);
+        candidates.remove(targetNode);
+
+        JobExecution<Object> execution = compute.executeAsyncWithFailover(targetNode, candidates,
+                deploymentUnits, jobClassName, options, args);
         sendResultAndStatus(execution, notificationSender);
         return execution.idAsync().thenAccept(out::packUuid);
     }
 
+    private static Set<ClusterNode> unpackCandidateNodes(ClientMessageUnpacker in, ClusterService cluster) {
+        Set<String> candidateIds = Arrays.stream(in.unpackObjectArrayFromBinaryTuple())
+                .map(String.class::cast)
+                .collect(Collectors.toSet());
+
+        Set<ClusterNode> candidates = new HashSet<>();
+        List<String> notFoundNodes = new ArrayList<>();
+        candidateIds.forEach(id -> {
+            ClusterNode node = cluster.topologyService().getByConsistentId(id);
+            if (node == null) {
+                notFoundNodes.add(id);
+            } else {
+                candidates.add(node);
+            }
+        });
+
+        if (!notFoundNodes.isEmpty()) {
+            throw new IgniteException("Specified nodes are not present in the cluster: " + notFoundNodes);
+        }
+        return candidates;

Review Comment:
   ```suggestion
       int size = in.unpackInt();
       List<ClusterNode> nodes = new ArrayList<>(size);
       for (int i = 0; i < size; i++) {
           String nodeName = in.unpackString();
           ClusterNode node = cluster.topologyService().getByConsistentId(nodeName);

           if (node == null) {
               // Throw on first issue
           }

           nodes.add(node);
       }
   ```
   
   Keep it simple.



##########
modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItThinClientWorkerShutdownTest.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.junit.jupiter.api.AfterEach;
+
+class ItThinClientWorkerShutdownTest extends ItWorkerShutdownTest {
+
+    private IgniteClient client;
+
+    @AfterEach
+    void cleanup() throws Exception {
+        client.close();
+    }
+
+    @Override
+    IgniteCompute compute(IgniteImpl entryNode) {
+        String address = "127.0.0.1:" + entryNode.clientAddress().port();
+        client = IgniteClient.builder().addresses(address).build();

Review Comment:
   1. A new client is started on every call to this method.
   2. Only the last of these clients is closed in `cleanup()`; the others are never closed, causing extra resource consumption and potential issues in other tests.

   Let's start one client before the test, save it to a field, and close it after the test.
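The lifecycle requested above (one client per test, reused, closed once) can be sketched without a cluster. This is a minimal sketch under assumptions: `FakeClient` is a hypothetical stand-in for `IgniteClient`, `setUp()` and `cleanup()` correspond to JUnit `@BeforeEach` and `@AfterEach` methods, and the entry node's client address is assumed to be known before the test starts.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch: one client per test, created once, reused by compute(), closed once. */
final class SingleClientFixture {
    /** Hypothetical stand-in for IgniteClient that tracks created and still-open instances. */
    static final class FakeClient implements AutoCloseable {
        static final AtomicInteger CREATED = new AtomicInteger();
        static final AtomicInteger OPEN = new AtomicInteger();

        FakeClient() {
            CREATED.incrementAndGet();
            OPEN.incrementAndGet();
        }

        @Override
        public void close() {
            OPEN.decrementAndGet();
        }
    }

    private FakeClient client;

    /** Would be annotated with @BeforeEach: start exactly one client. */
    void setUp() {
        client = new FakeClient();
    }

    /** Mirrors compute(entryNode): reuses the field instead of building a new client. */
    FakeClient compute() {
        return client;
    }

    /** Would be annotated with @AfterEach: close the single client, if it was started. */
    void cleanup() {
        if (client != null) {
            client.close();
            client = null;
        }
    }
}
```

The null check in `cleanup()` also keeps the teardown safe when a test never obtains a compute instance.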



##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java:
##########
@@ -50,29 +56,82 @@ public class ClientComputeExecuteRequest {
     public static CompletableFuture<Void> process(
             ClientMessageUnpacker in,
             ClientMessagePacker out,
-            IgniteCompute compute,
+            IgniteComputeInternal compute,
             ClusterService cluster,
-            NotificationSender notificationSender) {
-        var nodeName = in.tryUnpackNil() ? null : in.unpackString();
-
-        var node = nodeName == null
-                ? cluster.topologyService().localMember()
-                : cluster.topologyService().getByConsistentId(nodeName);
-
-        if (node == null) {
-            throw new IgniteException("Specified node is not present in the cluster: " + nodeName);
-        }
+            NotificationSender notificationSender
+    ) {
+        Set<ClusterNode> candidates = unpackCandidateNodes(in, cluster);
 
         List<DeploymentUnit> deploymentUnits = unpackDeploymentUnits(in);
         String jobClassName = in.unpackString();
         JobExecutionOptions options = JobExecutionOptions.builder().priority(in.unpackInt()).maxRetries(in.unpackInt()).build();
         Object[] args = unpackArgs(in);
 
-        JobExecution<Object> execution = compute.executeAsync(Set.of(node), deploymentUnits, jobClassName, options, args);
+        ClusterNode localNode = cluster.topologyService().localMember();
+        ClusterNode targetNode = selectTargetNode(candidates, localNode);
+        candidates.remove(targetNode);
+
+        JobExecution<Object> execution = compute.executeAsyncWithFailover(targetNode, candidates,
+                deploymentUnits, jobClassName, options, args);
         sendResultAndStatus(execution, notificationSender);
         return execution.idAsync().thenAccept(out::packUuid);
     }
 
+    private static Set<ClusterNode> unpackCandidateNodes(ClientMessageUnpacker in, ClusterService cluster) {
+        Set<String> candidateIds = Arrays.stream(in.unpackObjectArrayFromBinaryTuple())
+                .map(String.class::cast)
+                .collect(Collectors.toSet());
+
+        Set<ClusterNode> candidates = new HashSet<>();
+        List<String> notFoundNodes = new ArrayList<>();
+        candidateIds.forEach(id -> {
+            ClusterNode node = cluster.topologyService().getByConsistentId(id);
+            if (node == null) {
+                notFoundNodes.add(id);
+            } else {
+                candidates.add(node);
+            }
+        });
+
+        if (!notFoundNodes.isEmpty()) {
+            throw new IgniteException("Specified nodes are not present in the cluster: " + notFoundNodes);
+        }
+        return candidates;
+    }
+
+    /**
+     * Selects a random node from the set of candidates, preferably not a local node.
+     *
+     * @param candidates Set of candidate nodes.
+     * @param localNode Local node.
+     *
+     * @return Target node to run a job on.
+     */
+    private static ClusterNode selectTargetNode(Set<ClusterNode> candidates, ClusterNode localNode) {

Review Comment:
   I don't think that node selection logic should be in the client request handler. Can we move this logic to the Compute component?
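A compute-side helper of that kind could look like the hypothetical sketch below. `TargetSelection` and `select` are made-up names, and preferring the local node is only one possible answer to the earlier question about non-local nodes; the PR's own helper prefers a non-local node. The sketch also takes a `List`, in line with the `Set` vs `List` comment, and builds the failover list from a copy instead of removing the target from the caller's collection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Hypothetical compute-side result: the node to run the job on plus the failover candidates.
 * Taking a List and preferring the local node are assumptions made for illustration.
 */
final class TargetSelection<N> {
    final N target;
    final List<N> failoverCandidates;

    private TargetSelection(N target, List<N> failoverCandidates) {
        this.target = target;
        this.failoverCandidates = failoverCandidates;
    }

    static <T> TargetSelection<T> select(List<T> candidates, T localNode) {
        if (candidates.isEmpty()) {
            throw new IllegalArgumentException("No candidate nodes");
        }
        // Run locally when the local node is a candidate (no extra network hop);
        // otherwise pick a random candidate.
        int targetIdx = candidates.indexOf(localNode);
        if (targetIdx < 0) {
            targetIdx = ThreadLocalRandom.current().nextInt(candidates.size());
        }
        // Copy so the caller's list is not mutated; the remaining nodes become the failover list.
        List<T> rest = new ArrayList<>(candidates);
        T target = rest.remove(targetIdx);
        return new TargetSelection<>(target, rest);
    }
}
```

With the selection in the compute component, the request handler would only unpack the node names and pass the resolved list along.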



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
