rpuch commented on a change in pull request #534:
URL: https://github.com/apache/ignite-3/pull/534#discussion_r780912102



##########
File path: modules/api/src/main/java/org/apache/ignite/compute/ComputeTask.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute;
+
+import java.util.Collection;
+
+/**
+ * Compute task.

Review comment:
       Probably more javadoc is needed to explain in detail what it is. Is it 
planned to be added later?

##########
File path: 
modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute;
+
+/**
+ * Interface that provides methods for executing {@link ComputeTask}s.
+ */
+public interface IgniteCompute {
+    /**
+     * Executes compute task.
+     *
+     * @param clazz Task class.
+     * @param argument Argument object.
+     * @param <T> Argument type.
+     * @param <R> Return type.
+     * @return Computation result.
+     */
+    <T, R, F> F execute(Class<? extends ComputeTask<T, R, F>> clazz, T 
argument) throws ComputeException;

Review comment:
       On the `ComputeTask`, the type variables are T, I, R, while here they 
are T, R, F. This could confuse a reader. Is there a reason for naming same 
things differently?

##########
File path: 
modules/compute/src/main/java/org/apache/ignite/compute/internal/IgniteComputeImpl.java
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute.internal;
+
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.compute.internal.util.GenericUtils.getTypeArguments;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Type;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.compute.ComputeException;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.compute.internal.message.ComputeMessagesFactory;
+import org.apache.ignite.compute.internal.message.ComputeMessagesType;
+import org.apache.ignite.compute.internal.message.ExecuteMessage;
+import org.apache.ignite.compute.internal.message.ResultMessage;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkMessageHandler;
+import org.apache.ignite.network.TopologyEventHandler;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Implementation of the compute service.
+ */
+public class IgniteComputeImpl implements IgniteCompute, IgniteComponent {
+    private final ClusterService clusterService;
+
+    private final ExecutorService computeExecutor = 
Executors.newSingleThreadExecutor();
+
+    private final ComputeMessagesFactory messagesFactory = new 
ComputeMessagesFactory();
+
+    private final ConcurrentMap<UUID, Execution<?>> executions = new 
ConcurrentHashMap<>();
+
+    public IgniteComputeImpl(ClusterService clusterService) {
+        this.clusterService = clusterService;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <T, R, F> F execute(Class<? extends ComputeTask<T, R, F>> clazz, T 
argument) throws ComputeException {
+        UUID executionId = UUID.randomUUID();
+
+        Execution<R> execution = registerExecution(clazz, argument, 
executionId);
+
+        Collection<ExecutionResult<R>> executionResults = 
execution.getResult();
+
+        executions.remove(executionId);
+
+        ComputeTask<T, R, F> computeTask;
+        try {
+            Constructor<? extends ComputeTask<T, R, F>> declaredConstructor = 
clazz.getDeclaredConstructor();
+            declaredConstructor.setAccessible(true);
+            computeTask = declaredConstructor.newInstance();
+        } catch (InstantiationException | IllegalAccessException | 
NoSuchMethodException | InvocationTargetException e) {
+            throw new ComputeException("Failed to execute reduce: " + 
e.getMessage(), e);
+        }
+
+        return 
computeTask.reduce(executionResults.stream().map(ExecutionResult::result).collect(toList()));

Review comment:
       Is it ok that `reduce()` is executed on the caller thread?

##########
File path: 
modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute;
+
+/**
+ * Interface that provides methods for executing {@link ComputeTask}s.

Review comment:
       Should we mention 'interface' and 'methods' at all? These are low-level 
technical terms, while we could describe in high-level terma like 'Compute 
functionality allowing to execute tasks'.

##########
File path: 
modules/compute/src/integrationTest/java/org/apache/ignite/compute/internal/ItComputeTest.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute.internal;
+
+import static java.lang.Thread.sleep;
+import static java.util.stream.Collectors.toList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.compute.ComputeException;
+import org.apache.ignite.compute.ComputeTask;
+import 
org.apache.ignite.compute.internal.message.ComputeMessagesSerializationRegistryInitializer;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.apache.ignite.utils.ClusterServiceTestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Integration tests for {@link IgniteComputeImpl}.
+ */
+public class ItComputeTest {
+    private static CountDownLatch latch1;
+    private static CountDownLatch latch2;
+
+    private static AtomicInteger counter;
+
+    private List<ClusterService> cluster;
+
+    @AfterEach
+    void tearDown() {
+        cluster.forEach(ClusterService::stop);
+
+        cluster = null;
+    }
+
+    @AfterAll
+    static void afterAll() {
+        latch1 = null;
+        latch2 = null;
+        counter = null;
+    }
+
+    private static class Compute implements ComputeTask<Integer, Integer, 
Integer> {

Review comment:
       Should this be renamed to avoid confusion with `IgniteCompute`? Like 
`SimpleTask` or something similar.

##########
File path: modules/api/src/main/java/org/apache/ignite/compute/ComputeTask.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute;
+
+import java.util.Collection;
+
+/**
+ * Compute task.
+ *
+ * @param <T> Argument type.
+ * @param <I> Intermediate result's type.
+ * @param <R> Reduce result's type.
+ */
+public interface ComputeTask<T, I, R> {
+    /**
+     * Executes the compute task with a given argument.
+     *
+     * @param argument Compute task's argument.
+     * @return Intermediate result of the computation.
+     */
+    I execute(T argument);

Review comment:
       Should we still name it just `execute()`? How about other possibilities, 
like `executeLocally()` or `executePhase1()` (meaning that there is phase 2 
called 'reduce')? I understand that these alternatives seem ugly, but the point 
is that we should probably think carefully about the terminology and naming 
here as the current `execute()` seems a bit too abstract for phase-one 
computation.

##########
File path: 
modules/compute/src/integrationTest/java/org/apache/ignite/compute/internal/ItComputeTest.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute.internal;
+
+import static java.lang.Thread.sleep;
+import static java.util.stream.Collectors.toList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.compute.ComputeException;
+import org.apache.ignite.compute.ComputeTask;
+import 
org.apache.ignite.compute.internal.message.ComputeMessagesSerializationRegistryInitializer;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.apache.ignite.utils.ClusterServiceTestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Integration tests for {@link IgniteComputeImpl}.
+ */
+public class ItComputeTest {
+    private static CountDownLatch latch1;
+    private static CountDownLatch latch2;
+
+    private static AtomicInteger counter;
+
+    private List<ClusterService> cluster;
+
+    @AfterEach
+    void tearDown() {
+        cluster.forEach(ClusterService::stop);
+
+        cluster = null;
+    }
+
+    @AfterAll
+    static void afterAll() {
+        latch1 = null;
+        latch2 = null;
+        counter = null;
+    }
+
+    private static class Compute implements ComputeTask<Integer, Integer, 
Integer> {
+        @Override
+        public Integer execute(Integer argument) {
+            return 1 + argument;
+        }
+
+        @Override
+        public Integer reduce(Collection<Integer> results) {
+            return results.stream().reduce(Integer::sum).get();
+        }
+    }
+
+    private static class DisconnectingCompute implements ComputeTask<Integer, 
Integer, Integer> {
+        @Override
+        public Integer execute(Integer argument) {
+            latch1.countDown();
+
+            try {
+                latch2.await();
+            } catch (InterruptedException e) {
+                fail();
+            }
+
+            return argument + 1;
+        }
+
+        @Override
+        public Integer reduce(Collection<Integer> results) {
+            return results.stream().reduce(Integer::sum).get();
+        }
+    }
+
+    private static class ExceptionalCompute implements ComputeTask<Integer, 
Integer, Integer> {
+        @Override
+        public Integer execute(Integer argument) {
+            if (counter.getAndIncrement() == 1) {
+                throw new RuntimeException("Fail");
+            }
+
+            return argument + 1;
+        }
+
+        @Override
+        public Integer reduce(Collection<Integer> results) {
+            return results.stream().reduce(Integer::sum).get();
+        }
+    }
+
+    @Test
+    public void testSuccess(TestInfo testInfo) {
+        cluster = createCluster(testInfo, 5);
+
+        IgniteComputeImpl compute = createCompute(cluster);
+
+        Integer execute = compute.execute(Compute.class, 1);
+
+        assertEquals(10, execute);
+    }
+
+    @Test
+    public void testPartialSuccess(TestInfo testInfo) {
+        latch1 = new CountDownLatch(5);
+        latch2 = new CountDownLatch(1);
+
+        cluster = createCluster(testInfo, 5);
+
+        IgniteComputeImpl compute = createCompute(cluster);
+
+        Thread t = new Thread(() -> {
+            try {
+                latch1.await();
+            } catch (InterruptedException e) {
+                fail();
+            }
+
+            ClusterService clusterService = cluster.get(1);
+            clusterService.stop();
+            latch2.countDown();
+        });
+
+        t.start();
+
+        Integer execute = compute.execute(DisconnectingCompute.class, 1);
+
+        assertEquals(8, execute);

Review comment:
       Why is it 8?

##########
File path: 
modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
##########
@@ -108,6 +109,12 @@ public IgniteTransactions transactions() {
         return transactions;
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public IgniteCompute compute() {
+        throw new UnsupportedOperationException();

Review comment:
       Is it a temporary exception, or there is a reason to not support it at 
all?

##########
File path: 
modules/compute/src/integrationTest/java/org/apache/ignite/compute/internal/ItComputeTest.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute.internal;
+
+import static java.lang.Thread.sleep;
+import static java.util.stream.Collectors.toList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.compute.ComputeException;
+import org.apache.ignite.compute.ComputeTask;
+import 
org.apache.ignite.compute.internal.message.ComputeMessagesSerializationRegistryInitializer;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.apache.ignite.utils.ClusterServiceTestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Integration tests for {@link IgniteComputeImpl}.
+ */
+public class ItComputeTest {
+    private static CountDownLatch latch1;
+    private static CountDownLatch latch2;
+
+    private static AtomicInteger counter;
+
+    private List<ClusterService> cluster;
+
+    @AfterEach
+    void tearDown() {
+        cluster.forEach(ClusterService::stop);
+
+        cluster = null;
+    }
+
+    @AfterAll
+    static void afterAll() {
+        latch1 = null;
+        latch2 = null;
+        counter = null;
+    }
+
+    private static class Compute implements ComputeTask<Integer, Integer, 
Integer> {
+        @Override
+        public Integer execute(Integer argument) {
+            return 1 + argument;
+        }
+
+        @Override
+        public Integer reduce(Collection<Integer> results) {
+            return results.stream().reduce(Integer::sum).get();
+        }
+    }
+
+    private static class DisconnectingCompute implements ComputeTask<Integer, 
Integer, Integer> {
+        @Override
+        public Integer execute(Integer argument) {
+            latch1.countDown();
+
+            try {
+                latch2.await();
+            } catch (InterruptedException e) {
+                fail();
+            }
+
+            return argument + 1;
+        }
+
+        @Override
+        public Integer reduce(Collection<Integer> results) {
+            return results.stream().reduce(Integer::sum).get();
+        }
+    }
+
+    private static class ExceptionalCompute implements ComputeTask<Integer, 
Integer, Integer> {
+        @Override
+        public Integer execute(Integer argument) {
+            if (counter.getAndIncrement() == 1) {
+                throw new RuntimeException("Fail");
+            }
+
+            return argument + 1;
+        }
+
+        @Override
+        public Integer reduce(Collection<Integer> results) {
+            return results.stream().reduce(Integer::sum).get();
+        }
+    }
+
+    @Test
+    public void testSuccess(TestInfo testInfo) {
+        cluster = createCluster(testInfo, 5);
+
+        IgniteComputeImpl compute = createCompute(cluster);
+
+        Integer execute = compute.execute(Compute.class, 1);

Review comment:
       `execute` -> `result`?

##########
File path: 
modules/compute/src/integrationTest/java/org/apache/ignite/compute/internal/ItComputeTest.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute.internal;
+
+import static java.lang.Thread.sleep;
+import static java.util.stream.Collectors.toList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.compute.ComputeException;
+import org.apache.ignite.compute.ComputeTask;
+import 
org.apache.ignite.compute.internal.message.ComputeMessagesSerializationRegistryInitializer;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.apache.ignite.utils.ClusterServiceTestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Integration tests for {@link IgniteComputeImpl}.
+ */
+public class ItComputeTest {
+    private static CountDownLatch latch1;
+    private static CountDownLatch latch2;
+
+    private static AtomicInteger counter;
+
+    private List<ClusterService> cluster;
+
+    @AfterEach
+    void tearDown() {
+        cluster.forEach(ClusterService::stop);
+
+        cluster = null;
+    }
+
+    @AfterAll
+    static void afterAll() {
+        latch1 = null;
+        latch2 = null;
+        counter = null;
+    }
+
+    private static class Compute implements ComputeTask<Integer, Integer, 
Integer> {
+        @Override
+        public Integer execute(Integer argument) {
+            return 1 + argument;
+        }
+
+        @Override
+        public Integer reduce(Collection<Integer> results) {
+            return results.stream().reduce(Integer::sum).get();
+        }
+    }
+
+    private static class DisconnectingCompute implements ComputeTask<Integer, 
Integer, Integer> {
+        @Override
+        public Integer execute(Integer argument) {
+            latch1.countDown();
+
+            try {
+                latch2.await();
+            } catch (InterruptedException e) {
+                fail();
+            }
+
+            return argument + 1;
+        }
+
+        @Override
+        public Integer reduce(Collection<Integer> results) {
+            return results.stream().reduce(Integer::sum).get();
+        }
+    }
+
+    private static class ExceptionalCompute implements ComputeTask<Integer, 
Integer, Integer> {
+        @Override
+        public Integer execute(Integer argument) {
+            if (counter.getAndIncrement() == 1) {
+                throw new RuntimeException("Fail");
+            }
+
+            return argument + 1;
+        }
+
+        @Override
+        public Integer reduce(Collection<Integer> results) {
+            return results.stream().reduce(Integer::sum).get();
+        }
+    }
+
+    @Test
+    public void testSuccess(TestInfo testInfo) {
+        cluster = createCluster(testInfo, 5);
+
+        IgniteComputeImpl compute = createCompute(cluster);
+
+        Integer execute = compute.execute(Compute.class, 1);
+
+        assertEquals(10, execute);
+    }
+
+    @Test
+    public void testPartialSuccess(TestInfo testInfo) {
+        latch1 = new CountDownLatch(5);
+        latch2 = new CountDownLatch(1);
+
+        cluster = createCluster(testInfo, 5);
+
+        IgniteComputeImpl compute = createCompute(cluster);
+
+        Thread t = new Thread(() -> {
+            try {
+                latch1.await();
+            } catch (InterruptedException e) {
+                fail();
+            }
+
+            ClusterService clusterService = cluster.get(1);
+            clusterService.stop();
+            latch2.countDown();
+        });
+
+        t.start();
+
+        Integer execute = compute.execute(DisconnectingCompute.class, 1);
+
+        assertEquals(8, execute);
+    }
+
+    @Test
+    public void testException(TestInfo testInfo) {
+        counter = new AtomicInteger();
+
+        cluster = createCluster(testInfo, 5);
+
+        IgniteComputeImpl compute = createCompute(cluster);
+
+        assertThrows(ComputeException.class, () -> 
compute.execute(ExceptionalCompute.class, 1));
+    }
+
+    private IgniteComputeImpl createCompute(List<ClusterService> cluster) {
+        List<IgniteComputeImpl> computes = 
cluster.stream().map(IgniteComputeImpl::new).collect(toList());
+
+        computes.forEach(IgniteComputeImpl::start);
+
+        return computes.get(0);
+    }
+
+    private List<ClusterService> createCluster(TestInfo testInfo, int count) {
+        int port = 3344;
+
+        List<ClusterService> services = new ArrayList<>();
+
+        for (int i = 0; i < count; i++) {
+            ClusterService service = clusterService(testInfo, port + i);
+            service.start();
+
+            services.add(service);
+        }
+
+        for (ClusterService service : services) {
+            assertTrue(waitForTopology(service, count, 1000));
+        }
+
+        return services;
+    }
+
+    private ClusterService clusterService(TestInfo testInfo, int port) {
+        MessageSerializationRegistryImpl registry = new 
MessageSerializationRegistryImpl();
+        
ComputeMessagesSerializationRegistryInitializer.registerFactories(registry);
+
+        return ClusterServiceTestUtils.clusterService(
+            testInfo,
+            port,
+            new StaticNodeFinder(List.of(new NetworkAddress("localhost", 
3344))),
+            new TestScaleCubeClusterServiceFactory(),
+            registry
+        );
+    }
+
+    private static boolean waitForTopology(ClusterService cluster, int 
expected, int timeout) {
+        return waitForCondition(() -> 
cluster.topologyService().allMembers().size() >= expected, timeout);
+    }
+
+    @SuppressWarnings("BusyWait")
+    private static boolean waitForCondition(BooleanSupplier cond, long 
timeout) {

Review comment:
       I suggest to extract it to a utility method so that everyone can use it

##########
File path: 
modules/compute/src/integrationTest/java/org/apache/ignite/compute/internal/ItComputeTest.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute.internal;
+
+import static java.lang.Thread.sleep;
+import static java.util.stream.Collectors.toList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.compute.ComputeException;
+import org.apache.ignite.compute.ComputeTask;
+import 
org.apache.ignite.compute.internal.message.ComputeMessagesSerializationRegistryInitializer;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.apache.ignite.utils.ClusterServiceTestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Integration tests for {@link IgniteComputeImpl}.
+ */
+public class ItComputeTest {
+    private static CountDownLatch latch1;
+    private static CountDownLatch latch2;
+
+    private static AtomicInteger counter;
+
+    private List<ClusterService> cluster;
+
+    @AfterEach
+    void tearDown() {
+        cluster.forEach(ClusterService::stop);
+
+        cluster = null;
+    }
+
+    @AfterAll
+    static void afterAll() {
+        latch1 = null;
+        latch2 = null;
+        counter = null;
+    }
+
+    private static class Compute implements ComputeTask<Integer, Integer, 
Integer> {
+        @Override
+        public Integer execute(Integer argument) {
+            return 1 + argument;
+        }
+
+        @Override
+        public Integer reduce(Collection<Integer> results) {
+            return results.stream().reduce(Integer::sum).get();
+        }
+    }
+
+    private static class DisconnectingCompute implements ComputeTask<Integer, 
Integer, Integer> {
+        @Override
+        public Integer execute(Integer argument) {
+            latch1.countDown();
+
+            try {
+                latch2.await();
+            } catch (InterruptedException e) {
+                fail();
+            }
+
+            return argument + 1;
+        }
+
+        @Override
+        public Integer reduce(Collection<Integer> results) {
+            return results.stream().reduce(Integer::sum).get();
+        }
+    }
+
+    private static class ExceptionalCompute implements ComputeTask<Integer, 
Integer, Integer> {
+        @Override
+        public Integer execute(Integer argument) {
+            if (counter.getAndIncrement() == 1) {
+                throw new RuntimeException("Fail");
+            }
+
+            return argument + 1;
+        }
+
+        @Override
+        public Integer reduce(Collection<Integer> results) {
+            return results.stream().reduce(Integer::sum).get();
+        }
+    }
+
+    @Test
+    public void testSuccess(TestInfo testInfo) {
+        cluster = createCluster(testInfo, 5);
+
+        IgniteComputeImpl compute = createCompute(cluster);
+
+        Integer execute = compute.execute(Compute.class, 1);
+
+        assertEquals(10, execute);
+    }
+
+    @Test
+    public void testPartialSuccess(TestInfo testInfo) {
+        latch1 = new CountDownLatch(5);
+        latch2 = new CountDownLatch(1);
+
+        cluster = createCluster(testInfo, 5);
+
+        IgniteComputeImpl compute = createCompute(cluster);
+
+        Thread t = new Thread(() -> {
+            try {
+                latch1.await();
+            } catch (InterruptedException e) {
+                fail();
+            }
+
+            ClusterService clusterService = cluster.get(1);
+            clusterService.stop();
+            latch2.countDown();
+        });
+
+        t.start();
+
+        Integer execute = compute.execute(DisconnectingCompute.class, 1);
+
+        assertEquals(8, execute);
+    }
+
+    @Test
+    public void testException(TestInfo testInfo) {
+        counter = new AtomicInteger();
+
+        cluster = createCluster(testInfo, 5);
+
+        IgniteComputeImpl compute = createCompute(cluster);
+
+        assertThrows(ComputeException.class, () -> 
compute.execute(ExceptionalCompute.class, 1));

Review comment:
       Let's also inspect the thrown exception to make sure that it's what was 
thrown by a task (and not something unrelated)

##########
File path: 
modules/compute/src/main/java/org/apache/ignite/compute/internal/Execution.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute.internal;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import org.apache.ignite.compute.ComputeException;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * {@link ComputeTask}'s execution.
+ *
+ * @param <T> Intermediate result's type.
+ */
+public class Execution<T> {
+    /** Map of the compute task's results. */
+    private final ConcurrentMap<String, ExecutionResult<T>> results = new 
ConcurrentHashMap<>();
+
+    /** Set of the nodes that are required to send back the results. */
+    private final Set<String> requiredNodes;
+
+    /** Future that is resolved when the all of the results are acquired or 
exception has been caught during task's execution. */
+    private final CompletableFuture<Collection<ExecutionResult<T>>> future = 
new CompletableFuture<>();
+
+    public Execution(Collection<String> requiredNodes) {
+        this.requiredNodes = ConcurrentHashMap.newKeySet(requiredNodes.size());
+        this.requiredNodes.addAll(requiredNodes);
+    }
+
+    /**
+     * Waits and returns the result of the execution of the compute task on 
every required node.
+     *
+     * @return Collection of execution results.
+     * @throws ComputeException If execution failed on any node or if the 
thread was interrupted.
+     */
+    Collection<ExecutionResult<T>> getResult() throws ComputeException {
+        try {
+            return future.get();
+        } catch (InterruptedException e) {
+            throw new ComputeException("Failed to wait for compute result: " + 
e.getMessage(), e);
+        } catch (ExecutionException e) {
+            Throwable cause = e.getCause();
+
+            assert cause instanceof ComputeException;
+
+            throw (ComputeException) cause;
+        }
+    }
+
+    /**
+     * Handles receiving of the result from a node.
+     *
+     * @param nodeId Which node sent the result.
+     * @param result Result ({@code null} if task failed on that node.
+     * @param exception Task error ({@code null} if task finished 
successfully).
+     */
+    void onResultReceived(String nodeId, T result, Throwable exception) {
+        if (exception != null) {
+            // We shouldn't always fail the future, different fail-over modes 
should be introduced
+            assert exception instanceof ComputeException;
+
+            future.completeExceptionally(exception);
+            return;
+        }
+
+        requiredNodes.remove(nodeId);
+        results.put(nodeId, new ExecutionResult<>(nodeId, result, exception));

Review comment:
       May a result from same node be received twice?

##########
File path: 
modules/compute/src/main/java/org/apache/ignite/compute/internal/IgniteComputeImpl.java
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute.internal;
+
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.compute.internal.util.GenericUtils.getTypeArguments;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Type;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.compute.ComputeException;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.compute.internal.message.ComputeMessagesFactory;
+import org.apache.ignite.compute.internal.message.ComputeMessagesType;
+import org.apache.ignite.compute.internal.message.ExecuteMessage;
+import org.apache.ignite.compute.internal.message.ResultMessage;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkMessageHandler;
+import org.apache.ignite.network.TopologyEventHandler;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Implementation of the compute service.
+ */
+public class IgniteComputeImpl implements IgniteCompute, IgniteComponent {
+    private final ClusterService clusterService;
+
+    private final ExecutorService computeExecutor = 
Executors.newSingleThreadExecutor();
+
+    private final ComputeMessagesFactory messagesFactory = new 
ComputeMessagesFactory();
+
+    private final ConcurrentMap<UUID, Execution<?>> executions = new 
ConcurrentHashMap<>();
+
+    public IgniteComputeImpl(ClusterService clusterService) {
+        this.clusterService = clusterService;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <T, R, F> F execute(Class<? extends ComputeTask<T, R, F>> clazz, T 
argument) throws ComputeException {
+        UUID executionId = UUID.randomUUID();
+
+        Execution<R> execution = registerExecution(clazz, argument, 
executionId);
+
+        Collection<ExecutionResult<R>> executionResults = 
execution.getResult();
+
+        executions.remove(executionId);
+
+        ComputeTask<T, R, F> computeTask;
+        try {
+            Constructor<? extends ComputeTask<T, R, F>> declaredConstructor = 
clazz.getDeclaredConstructor();
+            declaredConstructor.setAccessible(true);
+            computeTask = declaredConstructor.newInstance();
+        } catch (InstantiationException | IllegalAccessException | 
NoSuchMethodException | InvocationTargetException e) {
+            throw new ComputeException("Failed to execute reduce: " + 
e.getMessage(), e);
+        }
+
+        return 
computeTask.reduce(executionResults.stream().map(ExecutionResult::result).collect(toList()));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void start() {
+        
clusterService.messagingService().addMessageHandler(ComputeMessagesType.class, 
new ComputeNetworkHandler());
+
+        clusterService.topologyService().addEventHandler(new 
TopologyHandler());
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop() throws Exception {
+        IgniteUtils.shutdownAndAwaitTermination(computeExecutor, 5, 
TimeUnit.SECONDS);
+    }
+
+    /**
+     * Creates new execution based on a task, an argument and an id.
+     *
+     * @param clazz Task's class.
+     * @param argument Task's argument.
+     * @param executionId Execution id.
+     * @param <T> Argument's type.
+     * @param <R> Intermediate result's type.
+     * @param <F> Result's type.
+     * @return Execution.
+     */
+    @NotNull
+    private <T, R, F> Execution<R> registerExecution(Class<? extends 
ComputeTask<T, R, F>> clazz, T argument, UUID executionId) {
+        Collection<ClusterNode> currentTopology = 
this.clusterService.topologyService().allMembers();
+
+        List<String> nodeIds = 
currentTopology.stream().map(ClusterNode::id).collect(toList());
+
+        var execution = new Execution<R>(nodeIds);
+
+        Execution<?> prev = executions.putIfAbsent(executionId, execution);
+        assert prev == null;
+
+        MessagingService messagingService = 
this.clusterService.messagingService();
+
+        ExecuteMessage executeMessage = messagesFactory.executeMessage()
+                
.executionId(executionId).className(clazz.getName()).argument(argument).build();

Review comment:
       Let's add line breaks just before `.argument(` and `.build()` so that 
the structure becomes more obvious

##########
File path: 
modules/compute/src/main/java/org/apache/ignite/compute/internal/IgniteComputeImpl.java
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute.internal;
+
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.compute.internal.util.GenericUtils.getTypeArguments;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Type;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.compute.ComputeException;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.compute.internal.message.ComputeMessagesFactory;
+import org.apache.ignite.compute.internal.message.ComputeMessagesType;
+import org.apache.ignite.compute.internal.message.ExecuteMessage;
+import org.apache.ignite.compute.internal.message.ResultMessage;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkMessageHandler;
+import org.apache.ignite.network.TopologyEventHandler;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Implementation of the compute service.
+ */
+public class IgniteComputeImpl implements IgniteCompute, IgniteComponent {
+    private final ClusterService clusterService;
+
+    private final ExecutorService computeExecutor = 
Executors.newSingleThreadExecutor();
+
+    private final ComputeMessagesFactory messagesFactory = new 
ComputeMessagesFactory();
+
+    private final ConcurrentMap<UUID, Execution<?>> executions = new 
ConcurrentHashMap<>();
+
+    public IgniteComputeImpl(ClusterService clusterService) {
+        this.clusterService = clusterService;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <T, R, F> F execute(Class<? extends ComputeTask<T, R, F>> clazz, T 
argument) throws ComputeException {
+        UUID executionId = UUID.randomUUID();
+
+        Execution<R> execution = registerExecution(clazz, argument, 
executionId);
+
+        Collection<ExecutionResult<R>> executionResults = 
execution.getResult();
+
+        executions.remove(executionId);
+
+        ComputeTask<T, R, F> computeTask;

Review comment:
       Let's extract task instantiation in a method to avoid cluttering the 
code with low-level details.

##########
File path: 
modules/compute/src/main/java/org/apache/ignite/compute/internal/IgniteComputeImpl.java
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute.internal;
+
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.compute.internal.util.GenericUtils.getTypeArguments;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Type;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.compute.ComputeException;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.compute.internal.message.ComputeMessagesFactory;
+import org.apache.ignite.compute.internal.message.ComputeMessagesType;
+import org.apache.ignite.compute.internal.message.ExecuteMessage;
+import org.apache.ignite.compute.internal.message.ResultMessage;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkMessageHandler;
+import org.apache.ignite.network.TopologyEventHandler;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Implementation of the compute service.
+ */
+public class IgniteComputeImpl implements IgniteCompute, IgniteComponent {
+    private final ClusterService clusterService;
+
+    private final ExecutorService computeExecutor = 
Executors.newSingleThreadExecutor();
+
+    private final ComputeMessagesFactory messagesFactory = new 
ComputeMessagesFactory();
+
+    private final ConcurrentMap<UUID, Execution<?>> executions = new 
ConcurrentHashMap<>();
+
+    public IgniteComputeImpl(ClusterService clusterService) {
+        this.clusterService = clusterService;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <T, R, F> F execute(Class<? extends ComputeTask<T, R, F>> clazz, T 
argument) throws ComputeException {
+        UUID executionId = UUID.randomUUID();
+
+        Execution<R> execution = registerExecution(clazz, argument, 
executionId);
+
+        Collection<ExecutionResult<R>> executionResults = 
execution.getResult();
+
+        executions.remove(executionId);
+
+        ComputeTask<T, R, F> computeTask;
+        try {
+            Constructor<? extends ComputeTask<T, R, F>> declaredConstructor = 
clazz.getDeclaredConstructor();
+            declaredConstructor.setAccessible(true);
+            computeTask = declaredConstructor.newInstance();
+        } catch (InstantiationException | IllegalAccessException | 
NoSuchMethodException | InvocationTargetException e) {
+            throw new ComputeException("Failed to execute reduce: " + 
e.getMessage(), e);
+        }
+
+        return 
computeTask.reduce(executionResults.stream().map(ExecutionResult::result).collect(toList()));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void start() {
+        
clusterService.messagingService().addMessageHandler(ComputeMessagesType.class, 
new ComputeNetworkHandler());
+
+        clusterService.topologyService().addEventHandler(new 
TopologyHandler());
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop() throws Exception {
+        IgniteUtils.shutdownAndAwaitTermination(computeExecutor, 5, 
TimeUnit.SECONDS);
+    }
+
+    /**
+     * Creates new execution based on a task, an argument and an id.
+     *
+     * @param clazz Task's class.
+     * @param argument Task's argument.
+     * @param executionId Execution id.
+     * @param <T> Argument's type.
+     * @param <R> Intermediate result's type.
+     * @param <F> Result's type.
+     * @return Execution.
+     */
+    @NotNull
+    private <T, R, F> Execution<R> registerExecution(Class<? extends 
ComputeTask<T, R, F>> clazz, T argument, UUID executionId) {
+        Collection<ClusterNode> currentTopology = 
this.clusterService.topologyService().allMembers();
+
+        List<String> nodeIds = 
currentTopology.stream().map(ClusterNode::id).collect(toList());
+
+        var execution = new Execution<R>(nodeIds);
+
+        Execution<?> prev = executions.putIfAbsent(executionId, execution);
+        assert prev == null;
+
+        MessagingService messagingService = 
this.clusterService.messagingService();
+
+        ExecuteMessage executeMessage = messagesFactory.executeMessage()
+                
.executionId(executionId).className(clazz.getName()).argument(argument).build();
+
+        currentTopology.forEach(clusterNode -> {
+            try {
+                messagingService.send(clusterNode, executeMessage).get(3, 
TimeUnit.SECONDS);
+            } catch (InterruptedException | ExecutionException | 
TimeoutException e) {

Review comment:
       What if a different exception is thrown here? Will the execution get 
stuck forever?

##########
File path: 
modules/compute/src/main/java/org/apache/ignite/compute/internal/Execution.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute.internal;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import org.apache.ignite.compute.ComputeException;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * {@link ComputeTask}'s execution.
+ *
+ * @param <T> Intermediate result's type.
+ */
+public class Execution<T> {
+    /** Map of the compute task's results. */
+    private final ConcurrentMap<String, ExecutionResult<T>> results = new 
ConcurrentHashMap<>();
+
+    /** Set of the nodes that are required to send back the results. */
+    private final Set<String> requiredNodes;
+
+    /** Future that is resolved when the all of the results are acquired or 
exception has been caught during task's execution. */

Review comment:
       'the all' -> 'all'

##########
File path: 
modules/compute/src/main/java/org/apache/ignite/compute/internal/Execution.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute.internal;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import org.apache.ignite.compute.ComputeException;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * {@link ComputeTask}'s execution.
+ *
+ * @param <T> Intermediate result's type.
+ */
+public class Execution<T> {
+    /** Map of the compute task's results. */
+    private final ConcurrentMap<String, ExecutionResult<T>> results = new 
ConcurrentHashMap<>();
+
+    /** Set of the nodes that are required to send back the results. */
+    private final Set<String> requiredNodes;
+
+    /** Future that is resolved when the all of the results are acquired or 
exception has been caught during task's execution. */
+    private final CompletableFuture<Collection<ExecutionResult<T>>> future = 
new CompletableFuture<>();
+
+    public Execution(Collection<String> requiredNodes) {
+        this.requiredNodes = ConcurrentHashMap.newKeySet(requiredNodes.size());
+        this.requiredNodes.addAll(requiredNodes);
+    }
+
+    /**
+     * Waits and returns the result of the execution of the compute task on 
every required node.
+     *
+     * @return Collection of execution results.
+     * @throws ComputeException If execution failed on any node or if the 
thread was interrupted.
+     */
+    Collection<ExecutionResult<T>> getResult() throws ComputeException {
+        try {
+            return future.get();
+        } catch (InterruptedException e) {
+            throw new ComputeException("Failed to wait for compute result: " + 
e.getMessage(), e);
+        } catch (ExecutionException e) {
+            Throwable cause = e.getCause();
+
+            assert cause instanceof ComputeException;
+
+            throw (ComputeException) cause;
+        }
+    }
+
+    /**
+     * Handles receiving of the result from a node.
+     *
+     * @param nodeId Which node sent the result.
+     * @param result Result ({@code null} if task failed on that node.
+     * @param exception Task error ({@code null} if task finished 
successfully).
+     */
+    void onResultReceived(String nodeId, T result, Throwable exception) {
+        if (exception != null) {
+            // We shouldn't always fail the future, different fail-over modes 
should be introduced
+            assert exception instanceof ComputeException;
+
+            future.completeExceptionally(exception);
+            return;
+        }
+
+        requiredNodes.remove(nodeId);
+        results.put(nodeId, new ExecutionResult<>(nodeId, result, exception));

Review comment:
       Also, why do we need an exception inside `ExecutionResult` if we always 
complete exceptionally on an exception? Is this needed for future when we have 
those different fail-over modes?

##########
File path: 
modules/compute/src/test/java/org/apache/ignite/compute/internal/IgniteComputeImplTest.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute.internal;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
+
+import java.util.Collection;
+import java.util.stream.Stream;
+import org.apache.ignite.compute.ComputeException;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.network.ClusterService;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Unit tests for compute implementation.
+ */
+class IgniteComputeImplTest {

Review comment:
       These tests actually test `execute0()` method, so only the bare 
execution logic is tested, without network/coordination/pools. It probably 
makes sense to note this in comments and class names, like 
`IgniteComputeImplBareExecutionTest`.

##########
File path: 
modules/compute/src/main/java/org/apache/ignite/compute/internal/IgniteComputeImpl.java
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute.internal;
+
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.compute.internal.util.GenericUtils.getTypeArguments;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Type;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.compute.ComputeException;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.compute.internal.message.ComputeMessagesFactory;
+import org.apache.ignite.compute.internal.message.ComputeMessagesType;
+import org.apache.ignite.compute.internal.message.ExecuteMessage;
+import org.apache.ignite.compute.internal.message.ResultMessage;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkMessageHandler;
+import org.apache.ignite.network.TopologyEventHandler;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Implementation of the compute service.
+ */
+public class IgniteComputeImpl implements IgniteCompute, IgniteComponent {
+    private final ClusterService clusterService;
+
+    private final ExecutorService computeExecutor = 
Executors.newSingleThreadExecutor();
+
+    private final ComputeMessagesFactory messagesFactory = new 
ComputeMessagesFactory();
+
+    private final ConcurrentMap<UUID, Execution<?>> executions = new 
ConcurrentHashMap<>();
+
+    public IgniteComputeImpl(ClusterService clusterService) {
+        this.clusterService = clusterService;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <T, R, F> F execute(Class<? extends ComputeTask<T, R, F>> clazz, T 
argument) throws ComputeException {
+        UUID executionId = UUID.randomUUID();
+
+        Execution<R> execution = registerExecution(clazz, argument, 
executionId);
+
+        Collection<ExecutionResult<R>> executionResults = 
execution.getResult();
+
+        executions.remove(executionId);
+
+        ComputeTask<T, R, F> computeTask;
+        try {
+            Constructor<? extends ComputeTask<T, R, F>> declaredConstructor = 
clazz.getDeclaredConstructor();
+            declaredConstructor.setAccessible(true);
+            computeTask = declaredConstructor.newInstance();
+        } catch (InstantiationException | IllegalAccessException | 
NoSuchMethodException | InvocationTargetException e) {
+            throw new ComputeException("Failed to execute reduce: " + 
e.getMessage(), e);
+        }
+
+        return 
computeTask.reduce(executionResults.stream().map(ExecutionResult::result).collect(toList()));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void start() {
+        
clusterService.messagingService().addMessageHandler(ComputeMessagesType.class, 
new ComputeNetworkHandler());
+
+        clusterService.topologyService().addEventHandler(new 
TopologyHandler());
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop() throws Exception {
+        IgniteUtils.shutdownAndAwaitTermination(computeExecutor, 5, 
TimeUnit.SECONDS);
+    }
+
+    /**
+     * Creates new execution based on a task, an argument and an id.
+     *
+     * @param clazz Task's class.
+     * @param argument Task's argument.
+     * @param executionId Execution id.
+     * @param <T> Argument's type.
+     * @param <R> Intermediate result's type.
+     * @param <F> Result's type.
+     * @return Execution.
+     */
+    @NotNull
+    private <T, R, F> Execution<R> registerExecution(Class<? extends 
ComputeTask<T, R, F>> clazz, T argument, UUID executionId) {
+        Collection<ClusterNode> currentTopology = 
this.clusterService.topologyService().allMembers();
+
+        List<String> nodeIds = 
currentTopology.stream().map(ClusterNode::id).collect(toList());
+
+        var execution = new Execution<R>(nodeIds);
+
+        Execution<?> prev = executions.putIfAbsent(executionId, execution);
+        assert prev == null;
+
+        MessagingService messagingService = 
this.clusterService.messagingService();
+
+        ExecuteMessage executeMessage = messagesFactory.executeMessage()
+                
.executionId(executionId).className(clazz.getName()).argument(argument).build();
+
+        currentTopology.forEach(clusterNode -> {
+            try {
+                messagingService.send(clusterNode, executeMessage).get(3, 
TimeUnit.SECONDS);
+            } catch (InterruptedException | ExecutionException | 
TimeoutException e) {
+                executions.remove(executionId);
+
+                throw new ComputeException("Failed to send compute message: " 
+ e.getMessage(), e);
+            }
+        });
+        return execution;
+    }
+
+    /**
+     * Notifies execution's of node disappearances.
+     */
+    private class TopologyHandler implements TopologyEventHandler {
+        @Override
+        public void onAppeared(ClusterNode member) {
+            // No-op.
+        }
+
+        @Override
+        public void onDisappeared(ClusterNode member) {
+            executions.values().forEach(execution -> 
execution.notifyNodeDisappeared(member));
+        }
+    }
+
+    /** Handles execute commands and execution results. */
+    private class ComputeNetworkHandler implements NetworkMessageHandler {
+        /** {@inheritDoc} */
+        @Override
+        public void onReceived(NetworkMessage message, NetworkAddress 
senderAddr, String correlationId) {
+            short messageType = message.messageType();
+
+            switch (messageType) {
+                case ComputeMessagesType.EXECUTE_MESSAGE:
+                    assert message instanceof ExecuteMessage;
+
+                    handleExecuteCommand((ExecuteMessage) message, senderAddr);
+                    break;
+
+                case ComputeMessagesType.RESULT_MESSAGE:
+                    assert message instanceof ResultMessage;
+
+                    handleResult((ResultMessage) message, senderAddr);
+                    break;
+
+                default:
+                    throw new IllegalArgumentException("Unknown message type: 
" + messageType);
+            }
+        }
+    }
+
+    /**
+     * Handles an intermediate result sent by a node.
+     *
+     * @param message Message with the intermediate result.
+     * @param senderAddr Node's address.
+     */
+    private void handleResult(ResultMessage message, NetworkAddress 
senderAddr) {
+        UUID executionId = message.executionId();
+
+        @SuppressWarnings("rawtypes") Execution execution = 
executions.get(executionId);
+
+        ClusterNode clusterNode = 
clusterService.topologyService().getByAddress(senderAddr);
+
+        if (clusterNode == null) {
+            // Node left the topology, this will be handled by the 
TopologyHandler
+            return;
+        }
+
+        //noinspection unchecked
+        execution.onResultReceived(clusterNode.id(), message.result(), 
message.exception());
+    }
+
+    /**
+     * Handles the execution command sent by a node.
+     *
+     * @param msg Message with the execution command.
+     * @param senderAddr Node's address.
+     */
+    private void handleExecuteCommand(ExecuteMessage msg, NetworkAddress 
senderAddr) {
+        String className = msg.className();
+        Object argument = msg.argument();
+
+        executeOnExecutor(className, argument).whenComplete((result, 
throwable) -> {
+            if (throwable != null) {
+                throwable = throwable.getCause();
+
+                assert throwable instanceof ComputeException;
+            }
+
+            ResultMessage resultMessage = messagesFactory.resultMessage()
+                    
.executionId(msg.executionId()).result(result).exception(throwable).build();
+
+            this.clusterService.messagingService().send(senderAddr, 
resultMessage, null);
+        });
+    }
+
+    private CompletableFuture<Object> executeOnExecutor(@NotNull String 
className, @NotNull Object argument) {

Review comment:
       Why is `argument` not null? This does not seem to be checked from 
`IgniteCompute#execute()` till this point.

##########
File path: 
modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageReaderMethodResolver.java
##########
@@ -59,14 +59,14 @@ CodeBlock resolveReadMethod(ExecutableElement getter) {
 
         String parameterName = getter.getSimpleName().toString();
 
-        String methodName = 
methodNameResolver.resolveBaseMethodName(parameterType);
-
         if (getter.getAnnotation(Marshallable.class) != null) {
             return CodeBlock.builder()
-                    .add("readMarshallable($S)", parameterName)

Review comment:
       Aren't the line continuations equal to 2 indents (i.e. 8 spaces)?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to