valepakh commented on code in PR #3043: URL: https://github.com/apache/ignite-3/pull/3043#discussion_r1458989911
########## modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java: ########## @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.compute.utils; + +import static org.awaitility.Awaitility.await; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.compute.JobExecutionContext; +import org.apache.ignite.network.ClusterNode; + +/** + * Tests DSL for interactive jobs. "Interactive" means that you can send messages and get responses to/from running jobs. + * + * <p>For example, you can start {@link GlobalInteractiveJob} on some node, get the name of worker node for this job, + * ask this job to complete successfully or throw exception. Also, this class gives useful assertions for job states. + * + * @see org.apache.ignite.internal.compute.ItWorkerShutdownTest + */ +public final class InteractiveJobs { + /** + * ACK for {@link Signal#CONTINUE}. Returned by a job that has received the signal. Used to check that the job is alive. + */ + private static final Object ack = new Object(); + + /** + * Class-wide queue that is used as a communication channel between {@link GlobalInteractiveJob} and test code. You can send a signal to + * the job via this channel and get a response from the job via {@link #GLOBAL_CHANNEL}. + */ + private static final BlockingQueue<Signal> GLOBAL_SIGNALS = new LinkedBlockingQueue<>(); + + /** + * Class-wide queue that is used as a communication channel between {@link GlobalInteractiveJob} and test code. You can send a signal to + * the job via {@link #GLOBAL_SIGNALS} and get a response from the job via this channel. + */ + private static final BlockingQueue<Object> GLOBAL_CHANNEL = new LinkedBlockingQueue<>(); + + /** + * Node-specific queues that are used as a communication channel between {@link InteractiveJob} and test code. The semantics are the + * same as for {@link #GLOBAL_SIGNALS} except that each node has its own queue. So, test code can communicate with a + * {@link InteractiveJob} that is running on specific node. + */ + private static final Map<String, BlockingQueue<Signal>> NODE_SIGNALS = new ConcurrentHashMap<>(); + + /** + * Node-specific queues that are used as a communication channel between {@link InteractiveJob} and test code. The semantics are the + * same as for {@link #GLOBAL_CHANNEL} except that each node has its own queue. So, test code can communicate with a + * {@link InteractiveJob} that is running on specific node. + */ + private static final Map<String, BlockingQueue<Object>> NODE_CHANNELS = new ConcurrentHashMap<>(); + + /** + * Node-specific counters that are used to count how many times {@link InteractiveJob} has been run on specific node. + */ + private static final Map<String, Integer> INTERACTIVE_JOB_RUN_TIMES = new ConcurrentHashMap<>(); + + /** + * Counts for all running interactive jobs. + */ + private static final AtomicInteger RUNNING_INTERACTIVE_JOBS_CNT = new AtomicInteger(0); + + /** + * Counts how many times the global job was called. Cleaned up in {@link #clearState}. + */ + private static final AtomicInteger RUNNING_GLOBAL_JOBS_CNT = new AtomicInteger(0); + + /** + * The timeout in seconds that defines how long should we wait for async calls. Almost all methods use this timeout. + */ + private static final long WAIT_TIMEOUT_SECONDS = 15; + + /** + * Clear global state. Must be called before each testing scenario. + */ + public static void clearState() { + assertThat( + "Interactive job is running. Can not clear global state. Please, stop the job first.", + RUNNING_INTERACTIVE_JOBS_CNT.get(), + equalTo(0) + ); + assertThat( + "Global job is running. Can not clear global state. Please, stop the job first.", + RUNNING_GLOBAL_JOBS_CNT.get(), + equalTo(0) + ); + + GLOBAL_SIGNALS.clear(); + GLOBAL_CHANNEL.clear(); + NODE_SIGNALS.clear(); + NODE_CHANNELS.clear(); + INTERACTIVE_JOB_RUN_TIMES.clear(); + } + + public static String interactiveJobName() { + return InteractiveJob.class.getName(); + } + + /** + * Signals that are sent by test code to the jobs. + */ + private enum Signal { + /** + * Signal to the job to continue running and send ACK as a response. + */ + CONTINUE, + /** + * Ask job to throw an exception. + */ + THROW, + /** + * Ask job to return result. + */ + RETURN, + /** + * Signal to the job to continue running and send current worker name to the response channel. + */ + GET_WORKER_NAME + } + + /** + * Interactive job that communicates via {@link #GLOBAL_CHANNEL} and {@link #GLOBAL_SIGNALS}. + */ + private static class GlobalInteractiveJob implements ComputeJob<String> { + private static Signal listenSignal() { + Signal recievedSignal; Review Comment: Please replace all `recievedSignal` with `receivedSignal` in this file. ########## modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java: ########## @@ -54,16 +61,16 @@ class ComputeJobFailover<T> { /** * The failover listens this event source to know when the {@link #runningWorkerNode} has left the cluster. Review Comment: This comment is now invalid. ########## modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java: ########## @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.compute.utils; + +import static org.awaitility.Awaitility.await; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.compute.JobExecutionContext; +import org.apache.ignite.network.ClusterNode; + +/** + * Tests DSL for interactive jobs. "Interactive" means that you can send messages and get responses to/from running jobs. + * + * <p>For example, you can start {@link GlobalInteractiveJob} on some node, get the name of worker node for this job, + * ask this job to complete successfully or throw exception. Also, this class gives useful assertions for job states. + * + * @see org.apache.ignite.internal.compute.ItWorkerShutdownTest + */ +public final class InteractiveJobs { + /** + * ACK for {@link Signal#CONTINUE}. Returned by a job that has received the signal. Used to check that the job is alive. + */ + private static final Object ack = new Object(); + + /** + * Class-wide queue that is used as a communication channel between {@link GlobalInteractiveJob} and test code. You can send a signal to + * the job via this channel and get a response from the job via {@link #GLOBAL_CHANNEL}. + */ + private static final BlockingQueue<Signal> GLOBAL_SIGNALS = new LinkedBlockingQueue<>(); + + /** + * Class-wide queue that is used as a communication channel between {@link GlobalInteractiveJob} and test code. You can send a signal to + * the job via {@link #GLOBAL_SIGNALS} and get a response from the job via this channel. + */ + private static final BlockingQueue<Object> GLOBAL_CHANNEL = new LinkedBlockingQueue<>(); + + /** + * Node-specific queues that are used as a communication channel between {@link InteractiveJob} and test code. The semantics are the + * same as for {@link #GLOBAL_SIGNALS} except that each node has its own queue. So, test code can communicate with a + * {@link InteractiveJob} that is running on specific node. + */ + private static final Map<String, BlockingQueue<Signal>> NODE_SIGNALS = new ConcurrentHashMap<>(); + + /** + * Node-specific queues that are used as a communication channel between {@link InteractiveJob} and test code. The semantics are the + * same as for {@link #GLOBAL_CHANNEL} except that each node has its own queue. So, test code can communicate with a + * {@link InteractiveJob} that is running on specific node. + */ + private static final Map<String, BlockingQueue<Object>> NODE_CHANNELS = new ConcurrentHashMap<>(); + + /** + * Node-specific counters that are used to count how many times {@link InteractiveJob} has been run on specific node. + */ + private static final Map<String, Integer> INTERACTIVE_JOB_RUN_TIMES = new ConcurrentHashMap<>(); + + /** + * Counts for all running interactive jobs. + */ + private static final AtomicInteger RUNNING_INTERACTIVE_JOBS_CNT = new AtomicInteger(0); + + /** + * Counts how many times the global job was called. Cleaned up in {@link #clearState}. Review Comment: Could you please rephrase that to indicate that this is a current running jobs counter? ########## modules/compute/src/main/java/org/apache/ignite/internal/compute/NextCollocatedWorkerSelector.java: ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.compute; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.placementdriver.PlacementDriver; +import org.apache.ignite.internal.placementdriver.ReplicaMeta; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.table.IgniteTablesInternal; +import org.apache.ignite.internal.table.TableViewInternal; +import org.apache.ignite.lang.TableNotFoundException; +import org.apache.ignite.lang.util.IgniteNameUtils; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.TopologyService; +import org.apache.ignite.table.Tuple; +import org.apache.ignite.table.mapper.Mapper; +import org.jetbrains.annotations.Nullable; + +/** + * Next worker selector that returns primary replica node for next worker. If there is no such node (we lost the majority, for example) the + * {@code CompletableFuture.completedFuture(null)} will be returned. + * + * @param <K> type of the key for the colocated table. + */ +public class NextCollocatedWorkerSelector<K> implements NextWorkerSelector { Review Comment: Should be `colocated`, not `collocated` ########## modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java: ########## @@ -125,23 +149,27 @@ private <R> JobExecution<R> executeOnOneNodeWithFailover( return computeComponent.executeLocally(units, jobClassName, args); } else { return new ComputeJobFailover<R>( - computeComponent, nodeLeftEventsSource, - targetNode, failoverCandidates, units, + computeComponent, logicalTopologyService, + targetNode, nextWorkerSelector, units, jobClassName, args ).failSafeExecute(); } } - private <R> JobExecution<R> executeOnOneNode( - ClusterNode targetNode, - List<DeploymentUnit> units, - String jobClassName, - Object[] args - ) { - if (isLocal(targetNode)) { - return computeComponent.executeLocally(units, jobClassName, args); - } else { - return computeComponent.executeRemotely(targetNode, units, jobClassName, args); + private static class DeqNexWorkerSelector implements NextWorkerSelector { + private final ConcurrentLinkedDeque<ClusterNode> deque; + + private DeqNexWorkerSelector(ConcurrentLinkedDeque<ClusterNode> deque) { + this.deque = deque; + } + + @Override + public CompletableFuture<ClusterNode> next() { + try { + return CompletableFuture.completedFuture(deque.pop()); Review Comment: Static import? ########## modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java: ########## @@ -54,16 +61,16 @@ class ComputeJobFailover<T> { /** * The failover listens this event source to know when the {@link #runningWorkerNode} has left the cluster. */ - private final NodeLeftEventsSource nodeLeftEventsSource; + private final LogicalTopologyService logicalTopologyService; /** - * Set of worker candidates in case {@link #runningWorkerNode} has left the cluster. + * The sector that returns the next worker node to execute job on. Review Comment: ```suggestion * The selector that returns the next worker node to execute job on. ``` ########## modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java: ########## @@ -269,6 +307,7 @@ public <R> Map<ClusterNode, JobExecution<R>> broadcastAsync( .collect(toUnmodifiableMap(identity(), // No failover nodes for broadcast. We use failover here in order to complete futures with exceptions // if worker node has left the cluster. - node -> new JobExecutionWrapper<>(executeOnOneNodeWithFailover(node, Set.of(), units, jobClassName, args)))); + node -> new JobExecutionWrapper<>(executeOnOneNodeWithFailover(node, + () -> CompletableFuture.completedFuture(null), units, jobClassName, args)))); Review Comment: `nullCompletedFuture` ########## modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java: ########## @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.compute.utils; + +import static org.awaitility.Awaitility.await; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.compute.JobExecutionContext; +import org.apache.ignite.network.ClusterNode; + +/** + * Tests DSL for interactive jobs. "Interactive" means that you can send messages and get responses to/from running jobs. + * + * <p>For example, you can start {@link GlobalInteractiveJob} on some node, get the name of worker node for this job, + * ask this job to complete successfully or throw exception. Also, this class gives useful assertions for job states. + * + * @see org.apache.ignite.internal.compute.ItWorkerShutdownTest + */ +public final class InteractiveJobs { + /** + * ACK for {@link Signal#CONTINUE}. Returned by a job that has received the signal. Used to check that the job is alive. + */ + private static final Object ack = new Object(); Review Comment: `ACK` since it's static? ########## modules/compute/src/main/java/org/apache/ignite/internal/compute/NextCollocatedWorkerSelector.java: ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.compute; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.placementdriver.PlacementDriver; +import org.apache.ignite.internal.placementdriver.ReplicaMeta; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.table.IgniteTablesInternal; +import org.apache.ignite.internal.table.TableViewInternal; +import org.apache.ignite.lang.TableNotFoundException; +import org.apache.ignite.lang.util.IgniteNameUtils; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.TopologyService; +import org.apache.ignite.table.Tuple; +import org.apache.ignite.table.mapper.Mapper; +import org.jetbrains.annotations.Nullable; + +/** + * Next worker selector that returns primary replica node for next worker. If there is no such node (we lost the majority, for example) the + * {@code CompletableFuture.completedFuture(null)} will be returned. + * + * @param <K> type of the key for the colocated table. + */ +public class NextCollocatedWorkerSelector<K> implements NextWorkerSelector { + private static final IgniteLogger LOG = Loggers.forClass(NextCollocatedWorkerSelector.class); + + private static final int PRIMARY_REPLICA_ASK_CLOCK_ADDITION_MILLIS = 10_000; + + private static final int AWAIT_FOR_PRIMARY_REPLICA_SECONDS = 15; + + private static final String DEFAULT_SCHEMA_NAME = "PUBLIC"; + + private final IgniteTablesInternal tables; + + private final PlacementDriver placementDriver; + + private final TopologyService topologyService; + + private final HybridClock clock; + + @Nullable + private final K key; + + @Nullable + private final Mapper<K> keyMapper; + + private final Tuple tuple; + + private final TableViewInternal table; + + NextCollocatedWorkerSelector( + IgniteTablesInternal tables, + PlacementDriver placementDriver, + TopologyService topologyService, + HybridClock clock, + String tableName, + @Nullable K key, + @Nullable Mapper<K> keyMapper) { + this(tables, placementDriver, topologyService, clock, tableName, key, keyMapper, null); + } + + NextCollocatedWorkerSelector( + IgniteTablesInternal tables, + PlacementDriver placementDriver, + TopologyService topologyService, + HybridClock clock, + String tableName, + Tuple tuple) { + this(tables, placementDriver, topologyService, clock, tableName, null, null, tuple); + } + + private NextCollocatedWorkerSelector( + IgniteTablesInternal tables, + PlacementDriver placementDriver, + TopologyService topologyService, + HybridClock clock, + String tableName, + @Nullable K key, + @Nullable Mapper<K> keyMapper, + @Nullable Tuple tuple) { + this.tables = tables; + this.placementDriver = placementDriver; + this.topologyService = topologyService; + this.table = getTableViewInternal(tableName); + this.clock = clock; + this.key = key; + this.keyMapper = keyMapper; + this.tuple = tuple; + } + + private TableViewInternal getTableViewInternal(String tableName) { + TableViewInternal table; + try { + table = requiredTable(tableName).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + return table; + } + + private CompletableFuture<ClusterNode> tryToFindPrimaryReplica(TablePartitionId tablePartitionId) + throws ExecutionException, InterruptedException { + return placementDriver.awaitPrimaryReplica( + tablePartitionId, + clock.now().addPhysicalTime(PRIMARY_REPLICA_ASK_CLOCK_ADDITION_MILLIS), + AWAIT_FOR_PRIMARY_REPLICA_SECONDS, + TimeUnit.SECONDS + ).thenApply(ReplicaMeta::getLeaseholderId) + .thenApply(topologyService::getById); + } + + @Override + public CompletableFuture<ClusterNode> next() { + TablePartitionId tablePartitionId = tablePartitionId(); + try { + return tryToFindPrimaryReplica(tablePartitionId); + } catch (InterruptedException | ExecutionException e) { + LOG.warn("Failed to resolve new primary replica for partition " + tablePartitionId); + } + + return CompletableFuture.completedFuture(null); + } + + private TablePartitionId tablePartitionId() { + TablePartitionId tablePartitionId; + if (key != null && keyMapper != null) { + tablePartitionId = new TablePartitionId(table.tableId(), table.partition(key, keyMapper)); + } else { + tablePartitionId = new TablePartitionId(table.tableId(), table.partition(tuple)); + } + return tablePartitionId; Review Comment: ```suggestion if (key != null && keyMapper != null) { return new TablePartitionId(table.tableId(), table.partition(key, keyMapper)); } else { return new TablePartitionId(table.tableId(), table.partition(tuple)); } ``` ########## modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java: ########## @@ -236,21 +269,26 @@ private CompletableFuture<TableViewInternal> requiredTable(String tableName) { }); } - private static ClusterNode leaderOfTablePartitionByTupleKey(TableViewInternal table, Tuple key) { - return requiredLeaderByPartition(table, table.partition(key)); + private @Nullable ClusterNode primaryReplicaForPartitionByTupleKey(TableViewInternal table, Tuple key) { + return primaryReplicaForPartition(table, table.partition(key)); } - private static <K> ClusterNode leaderOfTablePartitionByMappedKey(TableViewInternal table, K key, Mapper<K> keyMapper) { - return requiredLeaderByPartition(table, table.partition(key, keyMapper)); + private <K> @Nullable ClusterNode primaryReplicaForPartitionByMappedKey(TableViewInternal table, K key, Mapper<K> keyMapper) { + return primaryReplicaForPartition(table, table.partition(key, keyMapper)); } - private static ClusterNode requiredLeaderByPartition(TableViewInternal table, int partitionIndex) { - ClusterNode leaderNode = table.leaderAssignment(partitionIndex); - if (leaderNode == null) { - throw new IgniteInternalException(Common.INTERNAL_ERR, "Leader not found for partition " + partitionIndex); + private @Nullable ClusterNode primaryReplicaForPartition(TableViewInternal table, int partitionIndex) { + TablePartitionId tablePartitionId = new TablePartitionId(table.tableId(), partitionIndex); + try { + ReplicaMeta replicaMeta = placementDriver.awaitPrimaryReplica(tablePartitionId, clock.now(), 30, TimeUnit.SECONDS).get(); Review Comment: Can we use future chaining here? ########## modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java: ########## @@ -125,23 +149,27 @@ private <R> JobExecution<R> executeOnOneNodeWithFailover( return computeComponent.executeLocally(units, jobClassName, args); } else { return new ComputeJobFailover<R>( - computeComponent, nodeLeftEventsSource, - targetNode, failoverCandidates, units, + computeComponent, logicalTopologyService, + targetNode, nextWorkerSelector, units, jobClassName, args ).failSafeExecute(); } } - private <R> JobExecution<R> executeOnOneNode( - ClusterNode targetNode, - List<DeploymentUnit> units, - String jobClassName, - Object[] args - ) { - if (isLocal(targetNode)) { - return computeComponent.executeLocally(units, jobClassName, args); - } else { - return computeComponent.executeRemotely(targetNode, units, jobClassName, args); + private static class DeqNexWorkerSelector implements NextWorkerSelector { + private final ConcurrentLinkedDeque<ClusterNode> deque; + + private DeqNexWorkerSelector(ConcurrentLinkedDeque<ClusterNode> deque) { + this.deque = deque; + } + + @Override + public CompletableFuture<ClusterNode> next() { + try { + return CompletableFuture.completedFuture(deque.pop()); + } catch (NoSuchElementException ex) { + return CompletableFuture.completedFuture(null); Review Comment: `CompletableFutures.nullCompletedFuture` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
