rpuch commented on a change in pull request #534: URL: https://github.com/apache/ignite-3/pull/534#discussion_r780912102
########## File path: modules/api/src/main/java/org/apache/ignite/compute/ComputeTask.java ########## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.compute; + +import java.util.Collection; + +/** + * Compute task. Review comment: Probably more javadoc is needed to explain in detail what it is. Is it planned to be added later? ########## File path: modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java ########## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.compute; + +/** + * Interface that provides methods for executing {@link ComputeTask}s. + */ +public interface IgniteCompute { + /** + * Executes compute task. + * + * @param clazz Task class. + * @param argument Argument object. + * @param <T> Argument type. + * @param <R> Return type. + * @return Computation result. + */ + <T, R, F> F execute(Class<? extends ComputeTask<T, R, F>> clazz, T argument) throws ComputeException; Review comment: On the `ComputeTask`, the type variables are T, I, R, while here they are T, R, F. This could confuse a reader. Is there a reason for naming same things differently? ########## File path: modules/compute/src/main/java/org/apache/ignite/compute/internal/IgniteComputeImpl.java ########## @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.compute.internal; + +import static java.util.stream.Collectors.toList; +import static org.apache.ignite.compute.internal.util.GenericUtils.getTypeArguments; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Type; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.ignite.compute.ComputeException; +import org.apache.ignite.compute.ComputeTask; +import org.apache.ignite.compute.IgniteCompute; +import org.apache.ignite.compute.internal.message.ComputeMessagesFactory; +import org.apache.ignite.compute.internal.message.ComputeMessagesType; +import org.apache.ignite.compute.internal.message.ExecuteMessage; +import org.apache.ignite.compute.internal.message.ResultMessage; +import org.apache.ignite.internal.manager.IgniteComponent; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.ClusterService; +import org.apache.ignite.network.MessagingService; +import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.network.NetworkMessage; +import org.apache.ignite.network.NetworkMessageHandler; +import org.apache.ignite.network.TopologyEventHandler; +import org.jetbrains.annotations.NotNull; + +/** + * Implementation of the compute service. 
+ */ +public class IgniteComputeImpl implements IgniteCompute, IgniteComponent { + private final ClusterService clusterService; + + private final ExecutorService computeExecutor = Executors.newSingleThreadExecutor(); + + private final ComputeMessagesFactory messagesFactory = new ComputeMessagesFactory(); + + private final ConcurrentMap<UUID, Execution<?>> executions = new ConcurrentHashMap<>(); + + public IgniteComputeImpl(ClusterService clusterService) { + this.clusterService = clusterService; + } + + /** {@inheritDoc} */ + @Override + public <T, R, F> F execute(Class<? extends ComputeTask<T, R, F>> clazz, T argument) throws ComputeException { + UUID executionId = UUID.randomUUID(); + + Execution<R> execution = registerExecution(clazz, argument, executionId); + + Collection<ExecutionResult<R>> executionResults = execution.getResult(); + + executions.remove(executionId); + + ComputeTask<T, R, F> computeTask; + try { + Constructor<? extends ComputeTask<T, R, F>> declaredConstructor = clazz.getDeclaredConstructor(); + declaredConstructor.setAccessible(true); + computeTask = declaredConstructor.newInstance(); + } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { + throw new ComputeException("Failed to execute reduce: " + e.getMessage(), e); + } + + return computeTask.reduce(executionResults.stream().map(ExecutionResult::result).collect(toList())); Review comment: Is it ok that `reduce()` is executed on the caller thread? ########## File path: modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java ########## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.compute; + +/** + * Interface that provides methods for executing {@link ComputeTask}s. Review comment: Should we mention 'interface' and 'methods' at all? These are low-level technical terms, while we could describe in high-level terms like 'Compute functionality allowing to execute tasks'. ########## File path: modules/compute/src/integrationTest/java/org/apache/ignite/compute/internal/ItComputeTest.java ########## @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.compute.internal; + +import static java.lang.Thread.sleep; +import static java.util.stream.Collectors.toList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BooleanSupplier; +import org.apache.ignite.compute.ComputeException; +import org.apache.ignite.compute.ComputeTask; +import org.apache.ignite.compute.internal.message.ComputeMessagesSerializationRegistryInitializer; +import org.apache.ignite.network.ClusterService; +import org.apache.ignite.network.MessageSerializationRegistryImpl; +import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.network.StaticNodeFinder; +import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory; +import org.apache.ignite.utils.ClusterServiceTestUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; + +/** + * Integration tests for {@link IgniteComputeImpl}. + */ +public class ItComputeTest { + private static CountDownLatch latch1; + private static CountDownLatch latch2; + + private static AtomicInteger counter; + + private List<ClusterService> cluster; + + @AfterEach + void tearDown() { + cluster.forEach(ClusterService::stop); + + cluster = null; + } + + @AfterAll + static void afterAll() { + latch1 = null; + latch2 = null; + counter = null; + } + + private static class Compute implements ComputeTask<Integer, Integer, Integer> { Review comment: Should this be renamed to avoid confusion with `IgniteCompute`? Like `SimpleTask` or something similar. 
########## File path: modules/api/src/main/java/org/apache/ignite/compute/ComputeTask.java ########## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.compute; + +import java.util.Collection; + +/** + * Compute task. + * + * @param <T> Argument type. + * @param <I> Intermediate result's type. + * @param <R> Reduce result's type. + */ +public interface ComputeTask<T, I, R> { + /** + * Executes the compute task with a given argument. + * + * @param argument Compute task's argument. + * @return Intermediate result of the computation. + */ + I execute(T argument); Review comment: Should we still name it just `execute()`? How about other possibilities, like `executeLocally()` or `executePhase1()` (meaning that there is phase 2 called 'reduce')? I understand that these alternatives seem ugly, but the point is that we should probably think carefully about the terminology and naming here as the current `execute()` seems a bit too abstract for phase-one computation. 
########## File path: modules/compute/src/integrationTest/java/org/apache/ignite/compute/internal/ItComputeTest.java ########## @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.compute.internal; + +import static java.lang.Thread.sleep; +import static java.util.stream.Collectors.toList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BooleanSupplier; +import org.apache.ignite.compute.ComputeException; +import org.apache.ignite.compute.ComputeTask; +import org.apache.ignite.compute.internal.message.ComputeMessagesSerializationRegistryInitializer; +import org.apache.ignite.network.ClusterService; +import org.apache.ignite.network.MessageSerializationRegistryImpl; +import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.network.StaticNodeFinder; +import 
org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory; +import org.apache.ignite.utils.ClusterServiceTestUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; + +/** + * Integration tests for {@link IgniteComputeImpl}. + */ +public class ItComputeTest { + private static CountDownLatch latch1; + private static CountDownLatch latch2; + + private static AtomicInteger counter; + + private List<ClusterService> cluster; + + @AfterEach + void tearDown() { + cluster.forEach(ClusterService::stop); + + cluster = null; + } + + @AfterAll + static void afterAll() { + latch1 = null; + latch2 = null; + counter = null; + } + + private static class Compute implements ComputeTask<Integer, Integer, Integer> { + @Override + public Integer execute(Integer argument) { + return 1 + argument; + } + + @Override + public Integer reduce(Collection<Integer> results) { + return results.stream().reduce(Integer::sum).get(); + } + } + + private static class DisconnectingCompute implements ComputeTask<Integer, Integer, Integer> { + @Override + public Integer execute(Integer argument) { + latch1.countDown(); + + try { + latch2.await(); + } catch (InterruptedException e) { + fail(); + } + + return argument + 1; + } + + @Override + public Integer reduce(Collection<Integer> results) { + return results.stream().reduce(Integer::sum).get(); + } + } + + private static class ExceptionalCompute implements ComputeTask<Integer, Integer, Integer> { + @Override + public Integer execute(Integer argument) { + if (counter.getAndIncrement() == 1) { + throw new RuntimeException("Fail"); + } + + return argument + 1; + } + + @Override + public Integer reduce(Collection<Integer> results) { + return results.stream().reduce(Integer::sum).get(); + } + } + + @Test + public void testSuccess(TestInfo testInfo) { + cluster = createCluster(testInfo, 5); + + IgniteComputeImpl compute = 
createCompute(cluster); + + Integer execute = compute.execute(Compute.class, 1); + + assertEquals(10, execute); + } + + @Test + public void testPartialSuccess(TestInfo testInfo) { + latch1 = new CountDownLatch(5); + latch2 = new CountDownLatch(1); + + cluster = createCluster(testInfo, 5); + + IgniteComputeImpl compute = createCompute(cluster); + + Thread t = new Thread(() -> { + try { + latch1.await(); + } catch (InterruptedException e) { + fail(); + } + + ClusterService clusterService = cluster.get(1); + clusterService.stop(); + latch2.countDown(); + }); + + t.start(); + + Integer execute = compute.execute(DisconnectingCompute.class, 1); + + assertEquals(8, execute); Review comment: Why is it 8? ########## File path: modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java ########## @@ -108,6 +109,12 @@ public IgniteTransactions transactions() { return transactions; } + /** {@inheritDoc} */ + @Override + public IgniteCompute compute() { + throw new UnsupportedOperationException(); Review comment: Is it a temporary exception, or is there a reason not to support it at all? ########## File path: modules/compute/src/integrationTest/java/org/apache/ignite/compute/internal/ItComputeTest.java ########## @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.compute.internal; + +import static java.lang.Thread.sleep; +import static java.util.stream.Collectors.toList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BooleanSupplier; +import org.apache.ignite.compute.ComputeException; +import org.apache.ignite.compute.ComputeTask; +import org.apache.ignite.compute.internal.message.ComputeMessagesSerializationRegistryInitializer; +import org.apache.ignite.network.ClusterService; +import org.apache.ignite.network.MessageSerializationRegistryImpl; +import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.network.StaticNodeFinder; +import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory; +import org.apache.ignite.utils.ClusterServiceTestUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; + +/** + * Integration tests for {@link IgniteComputeImpl}. 
+ */ +public class ItComputeTest { + private static CountDownLatch latch1; + private static CountDownLatch latch2; + + private static AtomicInteger counter; + + private List<ClusterService> cluster; + + @AfterEach + void tearDown() { + cluster.forEach(ClusterService::stop); + + cluster = null; + } + + @AfterAll + static void afterAll() { + latch1 = null; + latch2 = null; + counter = null; + } + + private static class Compute implements ComputeTask<Integer, Integer, Integer> { + @Override + public Integer execute(Integer argument) { + return 1 + argument; + } + + @Override + public Integer reduce(Collection<Integer> results) { + return results.stream().reduce(Integer::sum).get(); + } + } + + private static class DisconnectingCompute implements ComputeTask<Integer, Integer, Integer> { + @Override + public Integer execute(Integer argument) { + latch1.countDown(); + + try { + latch2.await(); + } catch (InterruptedException e) { + fail(); + } + + return argument + 1; + } + + @Override + public Integer reduce(Collection<Integer> results) { + return results.stream().reduce(Integer::sum).get(); + } + } + + private static class ExceptionalCompute implements ComputeTask<Integer, Integer, Integer> { + @Override + public Integer execute(Integer argument) { + if (counter.getAndIncrement() == 1) { + throw new RuntimeException("Fail"); + } + + return argument + 1; + } + + @Override + public Integer reduce(Collection<Integer> results) { + return results.stream().reduce(Integer::sum).get(); + } + } + + @Test + public void testSuccess(TestInfo testInfo) { + cluster = createCluster(testInfo, 5); + + IgniteComputeImpl compute = createCompute(cluster); + + Integer execute = compute.execute(Compute.class, 1); Review comment: `execute` -> `result`? 
########## File path: modules/compute/src/integrationTest/java/org/apache/ignite/compute/internal/ItComputeTest.java ########## @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.compute.internal; + +import static java.lang.Thread.sleep; +import static java.util.stream.Collectors.toList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BooleanSupplier; +import org.apache.ignite.compute.ComputeException; +import org.apache.ignite.compute.ComputeTask; +import org.apache.ignite.compute.internal.message.ComputeMessagesSerializationRegistryInitializer; +import org.apache.ignite.network.ClusterService; +import org.apache.ignite.network.MessageSerializationRegistryImpl; +import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.network.StaticNodeFinder; +import 
org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory; +import org.apache.ignite.utils.ClusterServiceTestUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; + +/** + * Integration tests for {@link IgniteComputeImpl}. + */ +public class ItComputeTest { + private static CountDownLatch latch1; + private static CountDownLatch latch2; + + private static AtomicInteger counter; + + private List<ClusterService> cluster; + + @AfterEach + void tearDown() { + cluster.forEach(ClusterService::stop); + + cluster = null; + } + + @AfterAll + static void afterAll() { + latch1 = null; + latch2 = null; + counter = null; + } + + private static class Compute implements ComputeTask<Integer, Integer, Integer> { + @Override + public Integer execute(Integer argument) { + return 1 + argument; + } + + @Override + public Integer reduce(Collection<Integer> results) { + return results.stream().reduce(Integer::sum).get(); + } + } + + private static class DisconnectingCompute implements ComputeTask<Integer, Integer, Integer> { + @Override + public Integer execute(Integer argument) { + latch1.countDown(); + + try { + latch2.await(); + } catch (InterruptedException e) { + fail(); + } + + return argument + 1; + } + + @Override + public Integer reduce(Collection<Integer> results) { + return results.stream().reduce(Integer::sum).get(); + } + } + + private static class ExceptionalCompute implements ComputeTask<Integer, Integer, Integer> { + @Override + public Integer execute(Integer argument) { + if (counter.getAndIncrement() == 1) { + throw new RuntimeException("Fail"); + } + + return argument + 1; + } + + @Override + public Integer reduce(Collection<Integer> results) { + return results.stream().reduce(Integer::sum).get(); + } + } + + @Test + public void testSuccess(TestInfo testInfo) { + cluster = createCluster(testInfo, 5); + + IgniteComputeImpl compute = 
createCompute(cluster); + + Integer execute = compute.execute(Compute.class, 1); + + assertEquals(10, execute); + } + + @Test + public void testPartialSuccess(TestInfo testInfo) { + latch1 = new CountDownLatch(5); + latch2 = new CountDownLatch(1); + + cluster = createCluster(testInfo, 5); + + IgniteComputeImpl compute = createCompute(cluster); + + Thread t = new Thread(() -> { + try { + latch1.await(); + } catch (InterruptedException e) { + fail(); + } + + ClusterService clusterService = cluster.get(1); + clusterService.stop(); + latch2.countDown(); + }); + + t.start(); + + Integer execute = compute.execute(DisconnectingCompute.class, 1); + + assertEquals(8, execute); + } + + @Test + public void testException(TestInfo testInfo) { + counter = new AtomicInteger(); + + cluster = createCluster(testInfo, 5); + + IgniteComputeImpl compute = createCompute(cluster); + + assertThrows(ComputeException.class, () -> compute.execute(ExceptionalCompute.class, 1)); + } + + private IgniteComputeImpl createCompute(List<ClusterService> cluster) { + List<IgniteComputeImpl> computes = cluster.stream().map(IgniteComputeImpl::new).collect(toList()); + + computes.forEach(IgniteComputeImpl::start); + + return computes.get(0); + } + + private List<ClusterService> createCluster(TestInfo testInfo, int count) { + int port = 3344; + + List<ClusterService> services = new ArrayList<>(); + + for (int i = 0; i < count; i++) { + ClusterService service = clusterService(testInfo, port + i); + service.start(); + + services.add(service); + } + + for (ClusterService service : services) { + assertTrue(waitForTopology(service, count, 1000)); + } + + return services; + } + + private ClusterService clusterService(TestInfo testInfo, int port) { + MessageSerializationRegistryImpl registry = new MessageSerializationRegistryImpl(); + ComputeMessagesSerializationRegistryInitializer.registerFactories(registry); + + return ClusterServiceTestUtils.clusterService( + testInfo, + port, + new 
StaticNodeFinder(List.of(new NetworkAddress("localhost", 3344))), + new TestScaleCubeClusterServiceFactory(), + registry + ); + } + + private static boolean waitForTopology(ClusterService cluster, int expected, int timeout) { + return waitForCondition(() -> cluster.topologyService().allMembers().size() >= expected, timeout); + } + + @SuppressWarnings("BusyWait") + private static boolean waitForCondition(BooleanSupplier cond, long timeout) { Review comment: I suggest to extract it to a utility method so that everyone can use it ########## File path: modules/compute/src/integrationTest/java/org/apache/ignite/compute/internal/ItComputeTest.java ########## @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.compute.internal; + +import static java.lang.Thread.sleep; +import static java.util.stream.Collectors.toList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BooleanSupplier; +import org.apache.ignite.compute.ComputeException; +import org.apache.ignite.compute.ComputeTask; +import org.apache.ignite.compute.internal.message.ComputeMessagesSerializationRegistryInitializer; +import org.apache.ignite.network.ClusterService; +import org.apache.ignite.network.MessageSerializationRegistryImpl; +import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.network.StaticNodeFinder; +import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory; +import org.apache.ignite.utils.ClusterServiceTestUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; + +/** + * Integration tests for {@link IgniteComputeImpl}. 
+ */ +public class ItComputeTest { + private static CountDownLatch latch1; + private static CountDownLatch latch2; + + private static AtomicInteger counter; + + private List<ClusterService> cluster; + + @AfterEach + void tearDown() { + cluster.forEach(ClusterService::stop); + + cluster = null; + } + + @AfterAll + static void afterAll() { + latch1 = null; + latch2 = null; + counter = null; + } + + private static class Compute implements ComputeTask<Integer, Integer, Integer> { + @Override + public Integer execute(Integer argument) { + return 1 + argument; + } + + @Override + public Integer reduce(Collection<Integer> results) { + return results.stream().reduce(Integer::sum).get(); + } + } + + private static class DisconnectingCompute implements ComputeTask<Integer, Integer, Integer> { + @Override + public Integer execute(Integer argument) { + latch1.countDown(); + + try { + latch2.await(); + } catch (InterruptedException e) { + fail(); + } + + return argument + 1; + } + + @Override + public Integer reduce(Collection<Integer> results) { + return results.stream().reduce(Integer::sum).get(); + } + } + + private static class ExceptionalCompute implements ComputeTask<Integer, Integer, Integer> { + @Override + public Integer execute(Integer argument) { + if (counter.getAndIncrement() == 1) { + throw new RuntimeException("Fail"); + } + + return argument + 1; + } + + @Override + public Integer reduce(Collection<Integer> results) { + return results.stream().reduce(Integer::sum).get(); + } + } + + @Test + public void testSuccess(TestInfo testInfo) { + cluster = createCluster(testInfo, 5); + + IgniteComputeImpl compute = createCompute(cluster); + + Integer execute = compute.execute(Compute.class, 1); + + assertEquals(10, execute); + } + + @Test + public void testPartialSuccess(TestInfo testInfo) { + latch1 = new CountDownLatch(5); + latch2 = new CountDownLatch(1); + + cluster = createCluster(testInfo, 5); + + IgniteComputeImpl compute = createCompute(cluster); + + Thread t = 
new Thread(() -> { + try { + latch1.await(); + } catch (InterruptedException e) { + fail(); + } + + ClusterService clusterService = cluster.get(1); + clusterService.stop(); + latch2.countDown(); + }); + + t.start(); + + Integer execute = compute.execute(DisconnectingCompute.class, 1); + + assertEquals(8, execute); + } + + @Test + public void testException(TestInfo testInfo) { + counter = new AtomicInteger(); + + cluster = createCluster(testInfo, 5); + + IgniteComputeImpl compute = createCompute(cluster); + + assertThrows(ComputeException.class, () -> compute.execute(ExceptionalCompute.class, 1)); Review comment: Let's also inspect the thrown exception to make sure that it's what was thrown by a task (and not something unrelated) ########## File path: modules/compute/src/main/java/org/apache/ignite/compute/internal/Execution.java ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.compute.internal; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import org.apache.ignite.compute.ComputeException; +import org.apache.ignite.compute.ComputeTask; +import org.apache.ignite.network.ClusterNode; + +/** + * {@link ComputeTask}'s execution. + * + * @param <T> Intermediate result's type. + */ +public class Execution<T> { + /** Map of the compute task's results. */ + private final ConcurrentMap<String, ExecutionResult<T>> results = new ConcurrentHashMap<>(); + + /** Set of the nodes that are required to send back the results. */ + private final Set<String> requiredNodes; + + /** Future that is resolved when the all of the results are acquired or exception has been caught during task's execution. */ + private final CompletableFuture<Collection<ExecutionResult<T>>> future = new CompletableFuture<>(); + + public Execution(Collection<String> requiredNodes) { + this.requiredNodes = ConcurrentHashMap.newKeySet(requiredNodes.size()); + this.requiredNodes.addAll(requiredNodes); + } + + /** + * Waits and returns the result of the execution of the compute task on every required node. + * + * @return Collection of execution results. + * @throws ComputeException If execution failed on any node or if the thread was interrupted. + */ + Collection<ExecutionResult<T>> getResult() throws ComputeException { + try { + return future.get(); + } catch (InterruptedException e) { + throw new ComputeException("Failed to wait for compute result: " + e.getMessage(), e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + + assert cause instanceof ComputeException; + + throw (ComputeException) cause; + } + } + + /** + * Handles receiving of the result from a node. + * + * @param nodeId Which node sent the result. 
+ * @param result Result ({@code null} if task failed on that node. + * @param exception Task error ({@code null} if task finished successfully). + */ + void onResultReceived(String nodeId, T result, Throwable exception) { + if (exception != null) { + // We shouldn't always fail the future, different fail-over modes should be introduced + assert exception instanceof ComputeException; + + future.completeExceptionally(exception); + return; + } + + requiredNodes.remove(nodeId); + results.put(nodeId, new ExecutionResult<>(nodeId, result, exception)); Review comment: May a result from the same node be received twice? ########## File path: modules/compute/src/main/java/org/apache/ignite/compute/internal/IgniteComputeImpl.java ########## @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.compute.internal; + +import static java.util.stream.Collectors.toList; +import static org.apache.ignite.compute.internal.util.GenericUtils.getTypeArguments; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Type; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.ignite.compute.ComputeException; +import org.apache.ignite.compute.ComputeTask; +import org.apache.ignite.compute.IgniteCompute; +import org.apache.ignite.compute.internal.message.ComputeMessagesFactory; +import org.apache.ignite.compute.internal.message.ComputeMessagesType; +import org.apache.ignite.compute.internal.message.ExecuteMessage; +import org.apache.ignite.compute.internal.message.ResultMessage; +import org.apache.ignite.internal.manager.IgniteComponent; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.ClusterService; +import org.apache.ignite.network.MessagingService; +import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.network.NetworkMessage; +import org.apache.ignite.network.NetworkMessageHandler; +import org.apache.ignite.network.TopologyEventHandler; +import org.jetbrains.annotations.NotNull; + +/** + * Implementation of the compute service. 
+ */ +public class IgniteComputeImpl implements IgniteCompute, IgniteComponent { + private final ClusterService clusterService; + + private final ExecutorService computeExecutor = Executors.newSingleThreadExecutor(); + + private final ComputeMessagesFactory messagesFactory = new ComputeMessagesFactory(); + + private final ConcurrentMap<UUID, Execution<?>> executions = new ConcurrentHashMap<>(); + + public IgniteComputeImpl(ClusterService clusterService) { + this.clusterService = clusterService; + } + + /** {@inheritDoc} */ + @Override + public <T, R, F> F execute(Class<? extends ComputeTask<T, R, F>> clazz, T argument) throws ComputeException { + UUID executionId = UUID.randomUUID(); + + Execution<R> execution = registerExecution(clazz, argument, executionId); + + Collection<ExecutionResult<R>> executionResults = execution.getResult(); + + executions.remove(executionId); + + ComputeTask<T, R, F> computeTask; + try { + Constructor<? extends ComputeTask<T, R, F>> declaredConstructor = clazz.getDeclaredConstructor(); + declaredConstructor.setAccessible(true); + computeTask = declaredConstructor.newInstance(); + } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { + throw new ComputeException("Failed to execute reduce: " + e.getMessage(), e); + } + + return computeTask.reduce(executionResults.stream().map(ExecutionResult::result).collect(toList())); + } + + /** {@inheritDoc} */ + @Override + public void start() { + clusterService.messagingService().addMessageHandler(ComputeMessagesType.class, new ComputeNetworkHandler()); + + clusterService.topologyService().addEventHandler(new TopologyHandler()); + } + + /** {@inheritDoc} */ + @Override + public void stop() throws Exception { + IgniteUtils.shutdownAndAwaitTermination(computeExecutor, 5, TimeUnit.SECONDS); + } + + /** + * Creates new execution based on a task, an argument and an id. + * + * @param clazz Task's class. + * @param argument Task's argument. 
+ * @param executionId Execution id. + * @param <T> Argument's type. + * @param <R> Intermediate result's type. + * @param <F> Result's type. + * @return Execution. + */ + @NotNull + private <T, R, F> Execution<R> registerExecution(Class<? extends ComputeTask<T, R, F>> clazz, T argument, UUID executionId) { + Collection<ClusterNode> currentTopology = this.clusterService.topologyService().allMembers(); + + List<String> nodeIds = currentTopology.stream().map(ClusterNode::id).collect(toList()); + + var execution = new Execution<R>(nodeIds); + + Execution<?> prev = executions.putIfAbsent(executionId, execution); + assert prev == null; + + MessagingService messagingService = this.clusterService.messagingService(); + + ExecuteMessage executeMessage = messagesFactory.executeMessage() + .executionId(executionId).className(clazz.getName()).argument(argument).build(); Review comment: Let's add line breaks just before `.argument(` and `.build()` so that the structure becomes more obvious ########## File path: modules/compute/src/main/java/org/apache/ignite/compute/internal/IgniteComputeImpl.java ########## @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.compute.internal; + +import static java.util.stream.Collectors.toList; +import static org.apache.ignite.compute.internal.util.GenericUtils.getTypeArguments; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Type; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.ignite.compute.ComputeException; +import org.apache.ignite.compute.ComputeTask; +import org.apache.ignite.compute.IgniteCompute; +import org.apache.ignite.compute.internal.message.ComputeMessagesFactory; +import org.apache.ignite.compute.internal.message.ComputeMessagesType; +import org.apache.ignite.compute.internal.message.ExecuteMessage; +import org.apache.ignite.compute.internal.message.ResultMessage; +import org.apache.ignite.internal.manager.IgniteComponent; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.ClusterService; +import org.apache.ignite.network.MessagingService; +import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.network.NetworkMessage; +import org.apache.ignite.network.NetworkMessageHandler; +import org.apache.ignite.network.TopologyEventHandler; +import org.jetbrains.annotations.NotNull; + +/** + * Implementation of the compute service. 
+ */ +public class IgniteComputeImpl implements IgniteCompute, IgniteComponent { + private final ClusterService clusterService; + + private final ExecutorService computeExecutor = Executors.newSingleThreadExecutor(); + + private final ComputeMessagesFactory messagesFactory = new ComputeMessagesFactory(); + + private final ConcurrentMap<UUID, Execution<?>> executions = new ConcurrentHashMap<>(); + + public IgniteComputeImpl(ClusterService clusterService) { + this.clusterService = clusterService; + } + + /** {@inheritDoc} */ + @Override + public <T, R, F> F execute(Class<? extends ComputeTask<T, R, F>> clazz, T argument) throws ComputeException { + UUID executionId = UUID.randomUUID(); + + Execution<R> execution = registerExecution(clazz, argument, executionId); + + Collection<ExecutionResult<R>> executionResults = execution.getResult(); + + executions.remove(executionId); + + ComputeTask<T, R, F> computeTask; Review comment: Let's extract task instantiation into a method to avoid cluttering the code with low-level details. ########## File path: modules/compute/src/main/java/org/apache/ignite/compute/internal/IgniteComputeImpl.java ########## @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.compute.internal; + +import static java.util.stream.Collectors.toList; +import static org.apache.ignite.compute.internal.util.GenericUtils.getTypeArguments; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Type; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.ignite.compute.ComputeException; +import org.apache.ignite.compute.ComputeTask; +import org.apache.ignite.compute.IgniteCompute; +import org.apache.ignite.compute.internal.message.ComputeMessagesFactory; +import org.apache.ignite.compute.internal.message.ComputeMessagesType; +import org.apache.ignite.compute.internal.message.ExecuteMessage; +import org.apache.ignite.compute.internal.message.ResultMessage; +import org.apache.ignite.internal.manager.IgniteComponent; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.ClusterService; +import org.apache.ignite.network.MessagingService; +import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.network.NetworkMessage; +import org.apache.ignite.network.NetworkMessageHandler; +import org.apache.ignite.network.TopologyEventHandler; +import org.jetbrains.annotations.NotNull; + +/** + * Implementation of the compute service. 
+ */ +public class IgniteComputeImpl implements IgniteCompute, IgniteComponent { + private final ClusterService clusterService; + + private final ExecutorService computeExecutor = Executors.newSingleThreadExecutor(); + + private final ComputeMessagesFactory messagesFactory = new ComputeMessagesFactory(); + + private final ConcurrentMap<UUID, Execution<?>> executions = new ConcurrentHashMap<>(); + + public IgniteComputeImpl(ClusterService clusterService) { + this.clusterService = clusterService; + } + + /** {@inheritDoc} */ + @Override + public <T, R, F> F execute(Class<? extends ComputeTask<T, R, F>> clazz, T argument) throws ComputeException { + UUID executionId = UUID.randomUUID(); + + Execution<R> execution = registerExecution(clazz, argument, executionId); + + Collection<ExecutionResult<R>> executionResults = execution.getResult(); + + executions.remove(executionId); + + ComputeTask<T, R, F> computeTask; + try { + Constructor<? extends ComputeTask<T, R, F>> declaredConstructor = clazz.getDeclaredConstructor(); + declaredConstructor.setAccessible(true); + computeTask = declaredConstructor.newInstance(); + } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { + throw new ComputeException("Failed to execute reduce: " + e.getMessage(), e); + } + + return computeTask.reduce(executionResults.stream().map(ExecutionResult::result).collect(toList())); + } + + /** {@inheritDoc} */ + @Override + public void start() { + clusterService.messagingService().addMessageHandler(ComputeMessagesType.class, new ComputeNetworkHandler()); + + clusterService.topologyService().addEventHandler(new TopologyHandler()); + } + + /** {@inheritDoc} */ + @Override + public void stop() throws Exception { + IgniteUtils.shutdownAndAwaitTermination(computeExecutor, 5, TimeUnit.SECONDS); + } + + /** + * Creates new execution based on a task, an argument and an id. + * + * @param clazz Task's class. + * @param argument Task's argument. 
+ * @param executionId Execution id. + * @param <T> Argument's type. + * @param <R> Intermediate result's type. + * @param <F> Result's type. + * @return Execution. + */ + @NotNull + private <T, R, F> Execution<R> registerExecution(Class<? extends ComputeTask<T, R, F>> clazz, T argument, UUID executionId) { + Collection<ClusterNode> currentTopology = this.clusterService.topologyService().allMembers(); + + List<String> nodeIds = currentTopology.stream().map(ClusterNode::id).collect(toList()); + + var execution = new Execution<R>(nodeIds); + + Execution<?> prev = executions.putIfAbsent(executionId, execution); + assert prev == null; + + MessagingService messagingService = this.clusterService.messagingService(); + + ExecuteMessage executeMessage = messagesFactory.executeMessage() + .executionId(executionId).className(clazz.getName()).argument(argument).build(); + + currentTopology.forEach(clusterNode -> { + try { + messagingService.send(clusterNode, executeMessage).get(3, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { Review comment: What if a different exception is thrown here? Will the execution get stuck forever? ########## File path: modules/compute/src/main/java/org/apache/ignite/compute/internal/Execution.java ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.compute.internal; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import org.apache.ignite.compute.ComputeException; +import org.apache.ignite.compute.ComputeTask; +import org.apache.ignite.network.ClusterNode; + +/** + * {@link ComputeTask}'s execution. + * + * @param <T> Intermediate result's type. + */ +public class Execution<T> { + /** Map of the compute task's results. */ + private final ConcurrentMap<String, ExecutionResult<T>> results = new ConcurrentHashMap<>(); + + /** Set of the nodes that are required to send back the results. */ + private final Set<String> requiredNodes; + + /** Future that is resolved when the all of the results are acquired or exception has been caught during task's execution. */ Review comment: 'the all' -> 'all' ########## File path: modules/compute/src/main/java/org/apache/ignite/compute/internal/Execution.java ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.compute.internal; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import org.apache.ignite.compute.ComputeException; +import org.apache.ignite.compute.ComputeTask; +import org.apache.ignite.network.ClusterNode; + +/** + * {@link ComputeTask}'s execution. + * + * @param <T> Intermediate result's type. + */ +public class Execution<T> { + /** Map of the compute task's results. */ + private final ConcurrentMap<String, ExecutionResult<T>> results = new ConcurrentHashMap<>(); + + /** Set of the nodes that are required to send back the results. */ + private final Set<String> requiredNodes; + + /** Future that is resolved when the all of the results are acquired or exception has been caught during task's execution. */ + private final CompletableFuture<Collection<ExecutionResult<T>>> future = new CompletableFuture<>(); + + public Execution(Collection<String> requiredNodes) { + this.requiredNodes = ConcurrentHashMap.newKeySet(requiredNodes.size()); + this.requiredNodes.addAll(requiredNodes); + } + + /** + * Waits and returns the result of the execution of the compute task on every required node. + * + * @return Collection of execution results. + * @throws ComputeException If execution failed on any node or if the thread was interrupted. 
+ */ + Collection<ExecutionResult<T>> getResult() throws ComputeException { + try { + return future.get(); + } catch (InterruptedException e) { + throw new ComputeException("Failed to wait for compute result: " + e.getMessage(), e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + + assert cause instanceof ComputeException; + + throw (ComputeException) cause; + } + } + + /** + * Handles receiving of the result from a node. + * + * @param nodeId Which node sent the result. + * @param result Result ({@code null} if task failed on that node. + * @param exception Task error ({@code null} if task finished successfully). + */ + void onResultReceived(String nodeId, T result, Throwable exception) { + if (exception != null) { + // We shouldn't always fail the future, different fail-over modes should be introduced + assert exception instanceof ComputeException; + + future.completeExceptionally(exception); + return; + } + + requiredNodes.remove(nodeId); + results.put(nodeId, new ExecutionResult<>(nodeId, result, exception)); Review comment: Also, why do we need an exception inside `ExecutionResult` if we always complete exceptionally on an exception? Is this needed for the future when we have those different fail-over modes? ########## File path: modules/compute/src/test/java/org/apache/ignite/compute/internal/IgniteComputeImplTest.java ########## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.compute.internal; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.Mockito.mock; + +import java.util.Collection; +import java.util.stream.Stream; +import org.apache.ignite.compute.ComputeException; +import org.apache.ignite.compute.ComputeTask; +import org.apache.ignite.network.ClusterService; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +/** + * Unit tests for compute implementation. + */ +class IgniteComputeImplTest { Review comment: These tests actually test the `execute0()` method, so only the bare execution logic is tested, without network/coordination/pools. It probably makes sense to note this in comments and class names, like `IgniteComputeImplBareExecutionTest`. ########## File path: modules/compute/src/main/java/org/apache/ignite/compute/internal/IgniteComputeImpl.java ########## @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.compute.internal; + +import static java.util.stream.Collectors.toList; +import static org.apache.ignite.compute.internal.util.GenericUtils.getTypeArguments; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Type; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.ignite.compute.ComputeException; +import org.apache.ignite.compute.ComputeTask; +import org.apache.ignite.compute.IgniteCompute; +import org.apache.ignite.compute.internal.message.ComputeMessagesFactory; +import org.apache.ignite.compute.internal.message.ComputeMessagesType; +import org.apache.ignite.compute.internal.message.ExecuteMessage; +import org.apache.ignite.compute.internal.message.ResultMessage; +import org.apache.ignite.internal.manager.IgniteComponent; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.ClusterService; +import org.apache.ignite.network.MessagingService; +import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.network.NetworkMessage; +import 
org.apache.ignite.network.NetworkMessageHandler; +import org.apache.ignite.network.TopologyEventHandler; +import org.jetbrains.annotations.NotNull; + +/** + * Implementation of the compute service. + */ +public class IgniteComputeImpl implements IgniteCompute, IgniteComponent { + private final ClusterService clusterService; + + private final ExecutorService computeExecutor = Executors.newSingleThreadExecutor(); + + private final ComputeMessagesFactory messagesFactory = new ComputeMessagesFactory(); + + private final ConcurrentMap<UUID, Execution<?>> executions = new ConcurrentHashMap<>(); + + public IgniteComputeImpl(ClusterService clusterService) { + this.clusterService = clusterService; + } + + /** {@inheritDoc} */ + @Override + public <T, R, F> F execute(Class<? extends ComputeTask<T, R, F>> clazz, T argument) throws ComputeException { + UUID executionId = UUID.randomUUID(); + + Execution<R> execution = registerExecution(clazz, argument, executionId); + + Collection<ExecutionResult<R>> executionResults = execution.getResult(); + + executions.remove(executionId); + + ComputeTask<T, R, F> computeTask; + try { + Constructor<? 
extends ComputeTask<T, R, F>> declaredConstructor = clazz.getDeclaredConstructor(); + declaredConstructor.setAccessible(true); + computeTask = declaredConstructor.newInstance(); + } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { + throw new ComputeException("Failed to execute reduce: " + e.getMessage(), e); + } + + return computeTask.reduce(executionResults.stream().map(ExecutionResult::result).collect(toList())); + } + + /** {@inheritDoc} */ + @Override + public void start() { + clusterService.messagingService().addMessageHandler(ComputeMessagesType.class, new ComputeNetworkHandler()); + + clusterService.topologyService().addEventHandler(new TopologyHandler()); + } + + /** {@inheritDoc} */ + @Override + public void stop() throws Exception { + IgniteUtils.shutdownAndAwaitTermination(computeExecutor, 5, TimeUnit.SECONDS); + } + + /** + * Creates new execution based on a task, an argument and an id. + * + * @param clazz Task's class. + * @param argument Task's argument. + * @param executionId Execution id. + * @param <T> Argument's type. + * @param <R> Intermediate result's type. + * @param <F> Result's type. + * @return Execution. + */ + @NotNull + private <T, R, F> Execution<R> registerExecution(Class<? 
extends ComputeTask<T, R, F>> clazz, T argument, UUID executionId) { + Collection<ClusterNode> currentTopology = this.clusterService.topologyService().allMembers(); + + List<String> nodeIds = currentTopology.stream().map(ClusterNode::id).collect(toList()); + + var execution = new Execution<R>(nodeIds); + + Execution<?> prev = executions.putIfAbsent(executionId, execution); + assert prev == null; + + MessagingService messagingService = this.clusterService.messagingService(); + + ExecuteMessage executeMessage = messagesFactory.executeMessage() + .executionId(executionId).className(clazz.getName()).argument(argument).build(); + + currentTopology.forEach(clusterNode -> { + try { + messagingService.send(clusterNode, executeMessage).get(3, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + executions.remove(executionId); + + throw new ComputeException("Failed to send compute message: " + e.getMessage(), e); + } + }); + return execution; + } + + /** + * Notifies executions of node disappearances. + */ + private class TopologyHandler implements TopologyEventHandler { + @Override + public void onAppeared(ClusterNode member) { + // No-op. + } + + @Override + public void onDisappeared(ClusterNode member) { + executions.values().forEach(execution -> execution.notifyNodeDisappeared(member)); + } + } + + /** Handles execute commands and execution results. 
*/ + private class ComputeNetworkHandler implements NetworkMessageHandler { + /** {@inheritDoc} */ + @Override + public void onReceived(NetworkMessage message, NetworkAddress senderAddr, String correlationId) { + short messageType = message.messageType(); + + switch (messageType) { + case ComputeMessagesType.EXECUTE_MESSAGE: + assert message instanceof ExecuteMessage; + + handleExecuteCommand((ExecuteMessage) message, senderAddr); + break; + + case ComputeMessagesType.RESULT_MESSAGE: + assert message instanceof ResultMessage; + + handleResult((ResultMessage) message, senderAddr); + break; + + default: + throw new IllegalArgumentException("Unknown message type: " + messageType); + } + } + } + + /** + * Handles an intermediate result sent by a node. + * + * @param message Message with the intermediate result. + * @param senderAddr Node's address. + */ + private void handleResult(ResultMessage message, NetworkAddress senderAddr) { + UUID executionId = message.executionId(); + + @SuppressWarnings("rawtypes") Execution execution = executions.get(executionId); + + ClusterNode clusterNode = clusterService.topologyService().getByAddress(senderAddr); + + if (clusterNode == null) { + // Node left the topology, this will be handled by the TopologyHandler + return; + } + + //noinspection unchecked + execution.onResultReceived(clusterNode.id(), message.result(), message.exception()); + } + + /** + * Handles the execution command sent by a node. + * + * @param msg Message with the execution command. + * @param senderAddr Node's address. 
+ */ + private void handleExecuteCommand(ExecuteMessage msg, NetworkAddress senderAddr) { + String className = msg.className(); + Object argument = msg.argument(); + + executeOnExecutor(className, argument).whenComplete((result, throwable) -> { + if (throwable != null) { + throwable = throwable.getCause(); + + assert throwable instanceof ComputeException; + } + + ResultMessage resultMessage = messagesFactory.resultMessage() + .executionId(msg.executionId()).result(result).exception(throwable).build(); + + this.clusterService.messagingService().send(senderAddr, resultMessage, null); + }); + } + + private CompletableFuture<Object> executeOnExecutor(@NotNull String className, @NotNull Object argument) { Review comment: Why is `argument` not null? This does not seem to be checked from `IgniteCompute#execute()` till this point. ########## File path: modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageReaderMethodResolver.java ########## @@ -59,14 +59,14 @@ CodeBlock resolveReadMethod(ExecutableElement getter) { String parameterName = getter.getSimpleName().toString(); - String methodName = methodNameResolver.resolveBaseMethodName(parameterType); - if (getter.getAnnotation(Marshallable.class) != null) { return CodeBlock.builder() - .add("readMarshallable($S)", parameterName) Review comment: Aren't the line continuations equal to 2 indents (i.e. 8 spaces)? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
