[FLINK-5747] [distributed coordination] Eager scheduling allocates slots and deploys tasks in bulk
That way, strictly topological deployment can be guaranteed. Also, many quick deploy/not-enough-resources/fail/recover cycles can be avoided in the cases where resources need some time to appear. This closes #3295 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f113d794 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f113d794 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f113d794 Branch: refs/heads/master Commit: f113d79451ba88c487358861cc3e20aac3d19257 Parents: 5902ea0 Author: Stephan Ewen <se...@apache.org> Authored: Fri Feb 3 20:26:23 2017 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Mon Feb 20 01:01:24 2017 +0100 ---------------------------------------------------------------------- .../org/apache/flink/util/ExceptionUtils.java | 12 + .../apache/flink/util/ExceptionUtilsTest.java | 60 ++ .../flink/runtime/concurrent/Executors.java | 3 +- .../flink/runtime/concurrent/FutureUtils.java | 115 ++++ .../flink/runtime/executiongraph/Execution.java | 73 ++- .../executiongraph/ExecutionAndSlot.java | 46 ++ .../runtime/executiongraph/ExecutionGraph.java | 170 +++++- .../executiongraph/ExecutionGraphUtils.java | 106 ++++ .../executiongraph/ExecutionJobVertex.java | 46 +- .../runtime/executiongraph/ExecutionVertex.java | 3 +- .../IllegalExecutionStateException.java | 53 ++ .../apache/flink/runtime/instance/SlotPool.java | 9 +- .../runtime/concurrent/FutureUtilsTest.java | 194 ++++++ .../ExecutionGraphSchedulingTest.java | 610 +++++++++++++++++++ .../executiongraph/ExecutionGraphUtilsTest.java | 124 ++++ .../ExecutionVertexCancelTest.java | 2 +- .../ExecutionVertexSchedulingTest.java | 3 - .../executiongraph/PointwisePatternTest.java | 12 +- .../executiongraph/ProgrammedSlotProvider.java | 87 +++ .../TerminalJobStatusListener.java | 45 ++ .../LeaderChangeJobRecoveryTest.java | 23 +- .../runtime/minicluster/MiniClusterITCase.java | 28 +- 
.../Flip6LocalStreamEnvironment.java | 4 +- 23 files changed, 1735 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java index 6ba9ef6..69c2692 100644 --- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java @@ -101,6 +101,18 @@ public final class ExceptionUtils { } /** + * Rethrows the given {@code Throwable}, if it represents an error that is fatal to the JVM. + * See {@link ExceptionUtils#isJvmFatalError(Throwable)} for a definition of fatal errors. + * + * @param t The Throwable to check and rethrow. + */ + public static void rethrowIfFatalError(Throwable t) { + if (isJvmFatalError(t)) { + throw (Error) t; + } + } + + /** * Adds a new exception as a {@link Throwable#addSuppressed(Throwable) suppressed exception} * to a prior exception, or returns the new exception, if no prior exception exists. * http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java new file mode 100644 index 0000000..343b9d6 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.junit.Test; + +import static org.junit.Assert.*; + +/** + * Tests for the utility methods in {@link ExceptionUtils}. + */ +public class ExceptionUtilsTest { + + @Test + public void testStringifyNullException() { + assertNotNull(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION); + assertEquals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION, ExceptionUtils.stringifyException(null)); + } + + @Test + public void testJvmFatalError() { + // not all errors are fatal + assertFalse(ExceptionUtils.isJvmFatalError(new Error())); + + // linkage errors are not fatal + assertFalse(ExceptionUtils.isJvmFatalError(new LinkageError())); + + // some errors are fatal + assertTrue(ExceptionUtils.isJvmFatalError(new InternalError())); + assertTrue(ExceptionUtils.isJvmFatalError(new UnknownError())); + } + + @Test + public void testRethrowFatalError() { + // fatal error is rethrown + try { + ExceptionUtils.rethrowIfFatalError(new InternalError()); + fail(); + } catch (InternalError ignored) {} + + // non-fatal error is not rethrown + ExceptionUtils.rethrowIfFatalError(new NoClassDefFoundError()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java index 391f233..63b6a25 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.concurrent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -52,7 +53,7 @@ public class Executors { private DirectExecutor() {} @Override - public void execute(Runnable command) { + public void execute(@Nonnull Runnable command) { command.run(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java index a404c98..4948147 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java @@ -20,11 +20,22 @@ package org.apache.flink.runtime.concurrent; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import java.util.Collection; import java.util.concurrent.Callable; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A collection of utilities that expand the usage of {@link Future} and {@link CompletableFuture}. 
+ */ public class FutureUtils { + // ------------------------------------------------------------------------ + // retrying operations + // ------------------------------------------------------------------------ + /** * Retry the given operation the given number of times in case of a failure. * @@ -88,4 +99,108 @@ public class FutureUtils { super(cause); } } + + // ------------------------------------------------------------------------ + // composing futures + // ------------------------------------------------------------------------ + + /** + * Creates a future that is complete once multiple other futures completed. + * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the + * conjunction fails. + * + * <p>The ConjunctFuture gives access to how many Futures in the conjunction have already + * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}. + * + * @param futures The futures that make up the conjunction. No null entries are allowed. + * @return The ConjunctFuture that completes once all given futures are complete (or one fails). + */ + public static ConjunctFuture combineAll(Collection<? extends Future<?>> futures) { + checkNotNull(futures, "futures"); + + final ConjunctFutureImpl conjunct = new ConjunctFutureImpl(futures.size()); + + if (futures.isEmpty()) { + conjunct.complete(null); + } + else { + for (Future<?> future : futures) { + future.handle(conjunct.completionHandler); + } + } + + return conjunct; + } + + /** + * A future that is complete once multiple other futures completed. The futures are not + * necessarily of the same type, which is why the type of this Future is {@code Void}. + * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the + * conjunction fails. 
+ * + * <p>The advantage of using the ConjunctFuture over chaining all the futures (such as via + * {@link Future#thenCombine(Future, BiFunction)}) is that ConjunctFuture also tracks how + * many of the Futures are already complete. + */ + public interface ConjunctFuture extends CompletableFuture<Void> { + + /** + * Gets the total number of Futures in the conjunction. + * @return The total number of Futures in the conjunction. + */ + int getNumFuturesTotal(); + + /** + * Gets the number of Futures in the conjunction that are already complete. + * @return The number of Futures in the conjunction that are already complete + */ + int getNumFuturesCompleted(); + } + + /** + * The implementation of the {@link ConjunctFuture}. + * + * <p>Implementation notice: The member fields all have package-private access, because they are + * either accessed by an inner subclass or by the enclosing class. + */ + private static class ConjunctFutureImpl extends FlinkCompletableFuture<Void> implements ConjunctFuture { + + /** The total number of futures in the conjunction */ + final int numTotal; + + /** The number of futures in the conjunction that are already complete */ + final AtomicInteger numCompleted = new AtomicInteger(); + + /** The function that is attached to all futures in the conjunction. Once a future + * is complete, this function tracks the completion or fails the conjunct. 
+ */ + final BiFunction<Object, Throwable, Void> completionHandler = new BiFunction<Object, Throwable, Void>() { + + @Override + public Void apply(Object o, Throwable throwable) { + if (throwable != null) { + completeExceptionally(throwable); + } + else if (numTotal == numCompleted.incrementAndGet()) { + complete(null); + } + + return null; + } + }; + + ConjunctFutureImpl(int numTotal) { + this.numTotal = numTotal; + } + + @Override + public int getNumFuturesTotal() { + return numTotal; + } + + @Override + public int getNumFuturesCompleted() { + return numCompleted.get(); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 60e5575..b3fe443 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -249,27 +249,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution * @throws IllegalStateException Thrown, if the vertex is not in CREATED state, which is the only state that permits scheduling. 
*/ public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) { - if (slotProvider == null) { - throw new IllegalArgumentException("Cannot send null Scheduler when scheduling execution."); - } - - final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup(); - final CoLocationConstraint locationConstraint = vertex.getLocationConstraint(); - - // sanity check - if (locationConstraint != null && sharingGroup == null) { - throw new RuntimeException("Trying to schedule with co-location constraint but without slot sharing allowed."); - } - - if (transitionState(CREATED, SCHEDULED)) { - - ScheduledUnit toSchedule = locationConstraint == null ? - new ScheduledUnit(this, sharingGroup) : - new ScheduledUnit(this, sharingGroup, locationConstraint); - - // IMPORTANT: To prevent leaks of cluster resources, we need to make sure that slots are returned - // in all cases where the deployment failed. we use many try {} finally {} clauses to assure that - final Future<SimpleSlot> slotAllocationFuture = slotProvider.allocateSlot(toSchedule, queued); + try { + final Future<SimpleSlot> slotAllocationFuture = allocateSlotForExecution(slotProvider, queued); // IMPORTANT: We have to use the synchronous handle operation (direct executor) here so // that we directly deploy the tasks if the slot allocation future is completed. 
This is @@ -296,28 +277,54 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution }); // if tasks have to scheduled immediately check that the task has been deployed - // TODO: This might be problematic if the future is not completed right away - if (!queued) { - if (!deploymentFuture.isDone()) { - markFailed(new IllegalArgumentException("The slot allocation future has not been completed yet.")); - } + if (!queued && !deploymentFuture.isDone()) { + markFailed(new IllegalArgumentException("The slot allocation future has not been completed yet.")); } - + return true; } + catch (IllegalExecutionStateException e) { + return false; + } + } + + public Future<SimpleSlot> allocateSlotForExecution(SlotProvider slotProvider, boolean queued) + throws IllegalExecutionStateException { + + checkNotNull(slotProvider); + + final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup(); + final CoLocationConstraint locationConstraint = vertex.getLocationConstraint(); + + // sanity check + if (locationConstraint != null && sharingGroup == null) { + throw new IllegalStateException( + "Trying to schedule with co-location constraint but without slot sharing allowed."); + } + + // this method only works if the execution is in the state 'CREATED' + if (transitionState(CREATED, SCHEDULED)) { + + ScheduledUnit toSchedule = locationConstraint == null ? 
+ new ScheduledUnit(this, sharingGroup) : + new ScheduledUnit(this, sharingGroup, locationConstraint); + + return slotProvider.allocateSlot(toSchedule, queued); + } else { // call race, already deployed, or already done - return false; + throw new IllegalExecutionStateException(this, CREATED, state); } } public void deployToSlot(final SimpleSlot slot) throws JobException { - // sanity checks - if (slot == null) { - throw new NullPointerException(); - } + checkNotNull(slot); + + // Check if the TaskManager died in the meantime + // This only speeds up the response to TaskManagers failing concurrently to deployments. + // The more general check is the timeout of the deployment call if (!slot.isAlive()) { - throw new JobException("Target slot for deployment is not alive."); + throw new JobException("Target slot (TaskManager) for deployment is no longer alive."); } // make sure exactly one deployment call happens from the correct state http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java new file mode 100644 index 0000000..ea6186e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.instance.SimpleSlot; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A pair of an {@link Execution} together with a slot future. + */ +public class ExecutionAndSlot { + + public final Execution executionAttempt; + + public final Future<SimpleSlot> slotFuture; + + public ExecutionAndSlot(Execution executionAttempt, Future<SimpleSlot> slotFuture) { + this.executionAttempt = checkNotNull(executionAttempt); + this.slotFuture = checkNotNull(slotFuture); + } + + // ----------------------------------------------------------------------- + + @Override + public String toString() { + return super.toString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index f25120c..ad4347d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.executiongraph; import org.apache.commons.lang3.StringUtils; + import org.apache.flink.api.common.Archiveable; 
import org.apache.flink.api.common.ArchivedExecutionConfig; import org.apache.flink.api.common.ExecutionConfig; @@ -40,9 +41,14 @@ import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; +import org.apache.flink.runtime.concurrent.BiFunction; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.SuppressRestartsException; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; @@ -53,6 +59,7 @@ import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.query.KvStateLocationRegistry; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.util.SerializableObject; @@ -60,6 +67,7 @@ import org.apache.flink.runtime.util.SerializedThrowable; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,11 +85,14 @@ import java.util.Map; import java.util.NoSuchElementException; 
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; /** * The execution graph is the central data structure that coordinates the distributed @@ -158,7 +169,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive private final long[] stateTimestamps; /** The timeout for all messages that require a response/acknowledgement */ - private final Time timeout; + private final Time rpcCallTimeout; // ------ Configuration of the Execution ------- @@ -171,6 +182,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive * from results than need to be materialized. */ private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES; + private final Time scheduleAllocationTimeout; + // ------ Execution status and progress. 
These values are volatile, and accessed under the lock ------- /** Current status of the job execution */ @@ -292,7 +305,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive this.stateTimestamps = new long[JobStatus.values().length]; this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis(); - this.timeout = timeout; + this.rpcCallTimeout = checkNotNull(timeout); + this.scheduleAllocationTimeout = checkNotNull(timeout); this.restartStrategy = restartStrategy; @@ -695,7 +709,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive // create the execution job vertex and attach it to the graph ExecutionJobVertex ejv = - new ExecutionJobVertex(this, jobVertex, 1, timeout, createTimestamp); + new ExecutionJobVertex(this, jobVertex, 1, rpcCallTimeout, createTimestamp); ejv.connectToPredecessors(this.intermediateResults); ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv); @@ -717,9 +731,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive } public void scheduleForExecution(SlotProvider slotProvider) throws JobException { - if (slotProvider == null) { - throw new IllegalArgumentException("Scheduler must not be null."); - } + checkNotNull(slotProvider); if (this.slotProvider != null && this.slotProvider != slotProvider) { throw new IllegalArgumentException("Cannot use different slot providers for the same job"); @@ -731,18 +743,11 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive switch (scheduleMode) { case LAZY_FROM_SOURCES: - // simply take the vertices without inputs. 
- for (ExecutionJobVertex ejv : this.tasks.values()) { - if (ejv.getJobVertex().isInputVertex()) { - ejv.scheduleAll(slotProvider, allowQueuedScheduling); - } - } + scheduleLazy(slotProvider); break; case EAGER: - for (ExecutionJobVertex ejv : getVerticesTopologically()) { - ejv.scheduleAll(slotProvider, allowQueuedScheduling); - } + scheduleEager(slotProvider, scheduleAllocationTimeout); break; default: @@ -754,6 +759,139 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive } } + private void scheduleLazy(SlotProvider slotProvider) throws NoResourceAvailableException { + // simply take the vertices without inputs. + for (ExecutionJobVertex ejv : this.tasks.values()) { + if (ejv.getJobVertex().isInputVertex()) { + ejv.scheduleAll(slotProvider, allowQueuedScheduling); + } + } + } + + /** + * + * + * @param slotProvider The resource provider from which the slots are allocated + * @param timeout The maximum time that the deployment may take, before a + * TimeoutException is thrown. + */ + private void scheduleEager(SlotProvider slotProvider, final Time timeout) { + checkState(state == JobStatus.RUNNING, "job is not running currently"); + + // Important: reserve all the space we need up front. + // that way we do not have any operation that can fail between allocating the slots + // and adding them to the list. 
If we had a failure in between there, that would + // cause the slots to get lost + final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices()); + final boolean queued = allowQueuedScheduling; + + // we use this flag to handle failures in a 'finally' clause + // that allows us to not go through clumsy cast-and-rethrow logic + boolean successful = false; + + try { + // collecting all the slots may resize and fail in that operation without slots getting lost + final ArrayList<Future<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices()); + + // allocate the slots (obtain all their futures + for (ExecutionJobVertex ejv : getVerticesTopologically()) { + // these calls are not blocking, they only return futures + ExecutionAndSlot[] slots = ejv.allocateResourcesForAll(slotProvider, queued); + + // we need to first add the slots to this list, to be safe on release + resources.add(slots); + + for (ExecutionAndSlot ens : slots) { + slotFutures.add(ens.slotFuture); + } + } + + // this future is complete once all slot futures are complete. + // the future fails once one slot future fails. + final ConjunctFuture allAllocationsComplete = FutureUtils.combineAll(slotFutures); + + // make sure that we fail if the allocation timeout was exceeded + final ScheduledFuture<?> timeoutCancelHandle = futureExecutor.schedule(new Runnable() { + @Override + public void run() { + // When the timeout triggers, we try to complete the conjunct future with an exception. + // Note that this is a no-op if the future is already completed + int numTotal = allAllocationsComplete.getNumFuturesTotal(); + int numComplete = allAllocationsComplete.getNumFuturesCompleted(); + String message = "Could not allocate all requires slots within timeout of " + + timeout + ". 
Slots required: " + numTotal + ", slots allocated: " + numComplete; + + allAllocationsComplete.completeExceptionally(new NoResourceAvailableException(message)); + } + }, timeout.getSize(), timeout.getUnit()); + + + allAllocationsComplete.handleAsync(new BiFunction<Void, Throwable, Void>() { + + @Override + public Void apply(Void ignored, Throwable throwable) { + try { + // we do not need the cancellation timeout any more + timeoutCancelHandle.cancel(false); + + if (throwable == null) { + // successfully obtained all slots, now deploy + + for (ExecutionAndSlot[] jobVertexTasks : resources) { + for (ExecutionAndSlot execAndSlot : jobVertexTasks) { + + // the futures must all be ready - this is simply a sanity check + final SimpleSlot slot; + try { + slot = execAndSlot.slotFuture.getNow(null); + checkNotNull(slot); + } + catch (ExecutionException | NullPointerException e) { + throw new IllegalStateException("SlotFuture is incomplete " + + "or erroneous even though all futures completed"); + } + + // actual deployment + execAndSlot.executionAttempt.deployToSlot(slot); + } + } + } + else { + // let the exception handler deal with this + throw throwable; + } + } + catch (Throwable t) { + // we catch everything here to make sure cleanup happens and the + // ExecutionGraph notices + // we need to go into recovery and make sure to release all slots + try { + fail(t); + } + finally { + ExecutionGraphUtils.releaseAllSlotsSilently(resources); + } + } + + // Wouldn't it be nice if we could return an actual Void object? 
+ // return (Void) Unsafe.getUnsafe().allocateInstance(Void.class); + return null; + } + }, futureExecutor); + + // from now on, slots will be rescued by the futures and their completion, or by the timeout + successful = true; + } + finally { + if (!successful) { + // we come here only if the 'try' block finished with an exception + // we release the slots (possibly failing some executions on the way) and + // let the exception bubble up + ExecutionGraphUtils.releaseAllSlotsSilently(resources); + } + } + } + public void cancel() { while (true) { JobStatus current = state; @@ -971,7 +1109,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive } } catch (IOException | ClassNotFoundException e) { LOG.error("Couldn't create ArchivedExecutionConfig for job {} ", getJobID(), e); - }; + } return null; } http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java new file mode 100644 index 0000000..cd6d6aa --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.concurrent.BiFunction; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.util.ExceptionUtils; + +import java.util.List; + +/** + * Utilities for dealing with the execution graphs and scheduling. + */ +public class ExecutionGraphUtils { + + /** + * Releases the slot represented by the given future. If the future is complete, the + * slot is immediately released. Otherwise, the slot is released as soon as the future + * is completed. + * + * <p>Note that releasing the slot means cancelling any task execution currently + * associated with that slot. + * + * @param slotFuture The future for the slot to release. + */ + public static void releaseSlotFuture(Future<SimpleSlot> slotFuture) { + slotFuture.handle(ReleaseSlotFunction.INSTANCE); + } + + /** + * Releases all the slots in the list of arrays of {@code ExecutionAndSlot}. + * For each future in that collection, the following holds: If the future is complete, its slot is + * immediately released. Otherwise, the slot is released as soon as the future + * is completed. + * + * <p>This method never throws any exceptions (except for fatal exceptions) and continues + * to release the remaining slots if one slot release failed. + * + * <p>Note that releasing the slot means cancelling any task execution currently + * associated with that slot. + * + * @param resources The collection of ExecutionAndSlot whose slots should be released. 
+ */ + public static void releaseAllSlotsSilently(List<ExecutionAndSlot[]> resources) { + try { + for (ExecutionAndSlot[] jobVertexResources : resources) { + if (jobVertexResources != null) { + for (ExecutionAndSlot execAndSlot : jobVertexResources) { + if (execAndSlot != null) { + try { + releaseSlotFuture(execAndSlot.slotFuture); + } + catch (Throwable t) { + ExceptionUtils.rethrowIfFatalError(t); + } + } + } + } + } + } + catch (Throwable t) { + ExceptionUtils.rethrowIfFatalError(t); + } + } + + // ------------------------------------------------------------------------ + + /** + * A function to be applied into a future, releasing the slot immediately upon completion. + * Completion here refers to both the successful and exceptional completion. + */ + private static final class ReleaseSlotFunction implements BiFunction<SimpleSlot, Throwable, Void> { + + static final ReleaseSlotFunction INSTANCE = new ReleaseSlotFunction(); + + @Override + public Void apply(SimpleSlot simpleSlot, Throwable throwable) { + if (simpleSlot != null) { + simpleSlot.releaseSlot(); + } + return null; + } + } + + // ------------------------------------------------------------------------ + + /** Utility class is not meant to be instantiated */ + private ExecutionGraphUtils() {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index 3828fc9..754148e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -30,7 +30,9 @@ import org.apache.flink.core.io.InputSplitSource; import 
org.apache.flink.core.io.LocatableInputSplit; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobgraph.IntermediateDataSet; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; @@ -388,7 +390,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable public void scheduleAll(SlotProvider slotProvider, boolean queued) { - ExecutionVertex[] vertices = this.taskVertices; + final ExecutionVertex[] vertices = this.taskVertices; // kick off the tasks for (ExecutionVertex ev : vertices) { @@ -396,6 +398,48 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable } } + /** + * Acquires a slot for all the execution vertices of this ExecutionJobVertex. The method returns + * pairs of the slots and execution attempts, to ease correlation between vertices and execution + * attempts. + * + * <p>If this method throws an exception, it makes sure to release all so far requested slots. + * + * @param resourceProvider The resource provider from whom the slots are requested. + */ + public ExecutionAndSlot[] allocateResourcesForAll(SlotProvider resourceProvider, boolean queued) { + final ExecutionVertex[] vertices = this.taskVertices; + final ExecutionAndSlot[] slots = new ExecutionAndSlot[vertices.length]; + + // try to acquire a slot future for each execution. 
+ // we store the execution with the future just to be on the safe side + for (int i = 0; i < vertices.length; i++) { + + // we use this flag to handle failures in a 'finally' clause + // that allows us to not go through clumsy cast-and-rethrow logic + boolean successful = false; + + try { + // allocate the next slot (future) + final Execution exec = vertices[i].getCurrentExecutionAttempt(); + final Future<SimpleSlot> future = exec.allocateSlotForExecution(resourceProvider, queued); + slots[i] = new ExecutionAndSlot(exec, future); + successful = true; + } + finally { + if (!successful) { + // this is the case if an exception was thrown + for (int k = 0; k < i; k++) { + ExecutionGraphUtils.releaseSlotFuture(slots[k].slotFuture); + } + } + } + } + + // all good, we acquired all slots + return slots; + } + public void cancel() { for (ExecutionVertex ev : getTaskVertices()) { ev.cancel(); http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 92327fd..ca8e07c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -102,6 +102,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi int subTaskIndex, IntermediateResult[] producedDataSets, Time timeout) { + this( jobVertex, subTaskIndex, @@ -133,7 +134,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi this.taskNameWithSubtask = String.format("%s (%d/%d)", jobVertex.getJobVertex().getName(), subTaskIndex + 1, jobVertex.getParallelism()); - this.resultPartitions 
= new LinkedHashMap<IntermediateResultPartitionID, IntermediateResultPartition>(producedDataSets.length, 1); + this.resultPartitions = new LinkedHashMap<>(producedDataSets.length, 1); for (IntermediateResult result : producedDataSets) { IntermediateResultPartition irp = new IntermediateResultPartition(result, this, subTaskIndex); http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IllegalExecutionStateException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IllegalExecutionStateException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IllegalExecutionStateException.java new file mode 100644 index 0000000..44162ab --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IllegalExecutionStateException.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.execution.ExecutionState; + +/** + * A special {@link IllegalStateException} indicating a mismatch in the expected and actual + * {@link ExecutionState} of an {@link Execution}. + */ +public class IllegalExecutionStateException extends IllegalStateException { + + private static final long serialVersionUID = 1L; + + /** + * Creates a new IllegalExecutionStateException with the error message indicating + * the expected and actual state. + * + * @param expected The expected state + * @param actual The actual state + */ + public IllegalExecutionStateException(ExecutionState expected, ExecutionState actual) { + super("Invalid execution state: Expected " + expected + " , found " + actual); + } + + /** + * Creates a new IllegalExecutionStateException with the error message indicating + * the expected and actual state. + * + * @param expected The expected state + * @param actual The actual state + */ + public IllegalExecutionStateException(Execution execution, ExecutionState expected, ExecutionState actual) { + super(execution.getVertexWithAttempt() + " is no longer in expected state " + expected + + " but in state " + actual); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java index 4da6c7b..8ba5040 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java @@ -1048,9 +1048,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { private final long timestamp; - SlotAndTimestamp( - AllocatedSlot slot, - long timestamp) { + 
SlotAndTimestamp(AllocatedSlot slot, long timestamp) { this.slot = slot; this.timestamp = timestamp; } @@ -1062,5 +1060,10 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { public long timestamp() { return timestamp; } + + @Override + public String toString() { + return slot + " @ " + timestamp; + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java new file mode 100644 index 0000000..43710cb --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.concurrent; + +import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; + +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.*; + +/** + * Tests for the utility methods in {@link FutureUtils} + */ +public class FutureUtilsTest { + + @Test + public void testConjunctFutureFailsOnEmptyAndNull() throws Exception { + try { + FutureUtils.combineAll(null); + fail(); + } catch (NullPointerException ignored) {} + + try { + FutureUtils.combineAll(Arrays.asList( + new FlinkCompletableFuture<Object>(), + null, + new FlinkCompletableFuture<Object>())); + fail(); + } catch (NullPointerException ignored) {} + } + + @Test + public void testConjunctFutureCompletion() throws Exception { + // some futures that we combine + CompletableFuture<Object> future1 = new FlinkCompletableFuture<>(); + CompletableFuture<Object> future2 = new FlinkCompletableFuture<>(); + CompletableFuture<Object> future3 = new FlinkCompletableFuture<>(); + CompletableFuture<Object> future4 = new FlinkCompletableFuture<>(); + + // some future is initially completed + future2.complete(new Object()); + + // build the conjunct future + ConjunctFuture result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3, future4)); + + Future<Void> resultMapped = result.thenAccept(new AcceptFunction<Void>() { + @Override + public void accept(Void value) {} + }); + + assertEquals(4, result.getNumFuturesTotal()); + assertEquals(1, result.getNumFuturesCompleted()); + assertFalse(result.isDone()); + assertFalse(resultMapped.isDone()); + + // complete two more futures + future4.complete(new Object()); + assertEquals(2, result.getNumFuturesCompleted()); + assertFalse(result.isDone()); + assertFalse(resultMapped.isDone()); + + future1.complete(new 
Object()); + assertEquals(3, result.getNumFuturesCompleted()); + assertFalse(result.isDone()); + assertFalse(resultMapped.isDone()); + + // complete one future again + future1.complete(new Object()); + assertEquals(3, result.getNumFuturesCompleted()); + assertFalse(result.isDone()); + assertFalse(resultMapped.isDone()); + + // complete the final future + future3.complete(new Object()); + assertEquals(4, result.getNumFuturesCompleted()); + assertTrue(result.isDone()); + assertTrue(resultMapped.isDone()); + } + + @Test + public void testConjunctFutureFailureOnFirst() throws Exception { + + CompletableFuture<Object> future1 = new FlinkCompletableFuture<>(); + CompletableFuture<Object> future2 = new FlinkCompletableFuture<>(); + CompletableFuture<Object> future3 = new FlinkCompletableFuture<>(); + CompletableFuture<Object> future4 = new FlinkCompletableFuture<>(); + + // build the conjunct future + ConjunctFuture result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3, future4)); + + Future<Void> resultMapped = result.thenAccept(new AcceptFunction<Void>() { + @Override + public void accept(Void value) {} + }); + + assertEquals(4, result.getNumFuturesTotal()); + assertEquals(0, result.getNumFuturesCompleted()); + assertFalse(result.isDone()); + assertFalse(resultMapped.isDone()); + + future2.completeExceptionally(new IOException()); + + assertEquals(0, result.getNumFuturesCompleted()); + assertTrue(result.isDone()); + assertTrue(resultMapped.isDone()); + + try { + result.get(); + fail(); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof IOException); + } + + try { + resultMapped.get(); + fail(); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof IOException); + } + } + + @Test + public void testConjunctFutureFailureOnSuccessive() throws Exception { + + CompletableFuture<Object> future1 = new FlinkCompletableFuture<>(); + CompletableFuture<Object> future2 = new FlinkCompletableFuture<>(); + 
CompletableFuture<Object> future3 = new FlinkCompletableFuture<>(); + CompletableFuture<Object> future4 = new FlinkCompletableFuture<>(); + + // build the conjunct future + ConjunctFuture result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3, future4)); + assertEquals(4, result.getNumFuturesTotal()); + + Future<Void> resultMapped = result.thenAccept(new AcceptFunction<Void>() { + @Override + public void accept(Void value) {} + }); + + future1.complete(new Object()); + future3.complete(new Object()); + future4.complete(new Object()); + + future2.completeExceptionally(new IOException()); + + assertEquals(3, result.getNumFuturesCompleted()); + assertTrue(result.isDone()); + assertTrue(resultMapped.isDone()); + + try { + result.get(); + fail(); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof IOException); + } + + try { + resultMapped.get(); + fail(); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof IOException); + } + } + + @Test + public void testConjunctOfNone() throws Exception { + final ConjunctFuture result = FutureUtils.combineAll(Collections.<Future<Object>>emptyList()); + + assertEquals(0, result.getNumFuturesTotal()); + assertEquals(0, result.getNumFuturesCompleted()); + assertTrue(result.isDone()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java new file mode 100644 index 0000000..9834dc6 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java @@ -0,0 +1,610 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + 
* or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.instance.Slot; +import org.apache.flink.runtime.instance.SlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import 
org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; +import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; +import org.apache.flink.runtime.jobmanager.slots.SlotOwner; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Test; + +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.mockito.verification.Timeout; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +/** + * Tests for the scheduling of the execution graph. This tests that + * for example the order of deployments is correct and that bulk slot allocation + * works properly. + */ +public class ExecutionGraphSchedulingTest extends TestLogger { + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + + @After + public void shutdown() { + executor.shutdownNow(); + } + + // ------------------------------------------------------------------------ + // Tests + // ------------------------------------------------------------------------ + + + /** + * Tests that with scheduling futures and pipelined deployment, the target vertex will + * not deploy its task before the source vertex does. 
+ */ + @Test + public void testScheduleSourceBeforeTarget() throws Exception { + + // [pipelined] + // we construct a simple graph (source) ----------------> (target) + + final int parallelism = 1; + + final JobVertex sourceVertex = new JobVertex("source"); + sourceVertex.setParallelism(parallelism); + sourceVertex.setInvokableClass(NoOpInvokable.class); + + final JobVertex targetVertex = new JobVertex("target"); + targetVertex.setParallelism(parallelism); + targetVertex.setInvokableClass(NoOpInvokable.class); + + targetVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + + final JobID jobId = new JobID(); + final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex); + + final ExecutionGraph eg = createExecutionGraph(jobGraph); + + // + // set up two TaskManager gateways and slots + + final TaskManagerGateway gatewaySource = createTaskManager(); + final TaskManagerGateway gatewayTarget = createTaskManager(); + + final SimpleSlot sourceSlot = createSlot(gatewaySource, jobId); + final SimpleSlot targetSlot = createSlot(gatewayTarget, jobId); + + final FlinkCompletableFuture<SimpleSlot> sourceFuture = new FlinkCompletableFuture<>(); + final FlinkCompletableFuture<SimpleSlot> targetFuture = new FlinkCompletableFuture<>(); + + ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism); + slotProvider.addSlot(sourceVertex.getID(), 0, sourceFuture); + slotProvider.addSlot(targetVertex.getID(), 0, targetFuture); + + eg.setScheduleMode(ScheduleMode.EAGER); + eg.setQueuedSchedulingAllowed(true); + eg.scheduleForExecution(slotProvider); + + // job should be running + assertEquals(JobStatus.RUNNING, eg.getState()); + + // we fulfill the target slot before the source slot + // that should not cause a deployment or deployment related failure + targetFuture.complete(targetSlot); + + verify(gatewayTarget, new Timeout(50, times(0))).submitTask(any(TaskDeploymentDescriptor.class), 
any(Time.class)); + assertEquals(JobStatus.RUNNING, eg.getState()); + + // now supply the source slot + sourceFuture.complete(sourceSlot); + + // by now, all deployments should have happened + verify(gatewaySource, timeout(1000)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)); + verify(gatewayTarget, timeout(1000)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)); + + assertEquals(JobStatus.RUNNING, eg.getState()); + } + + /** + * This test verifies that before deploying a pipelined connected component, the + * full set of slots is available, and that not some tasks are deployed, and later the + * system realizes that not enough resources are available. + */ + @Test + public void testDeployPipelinedConnectedComponentsTogether() throws Exception { + + // [pipelined] + // we construct a simple graph (source) ----------------> (target) + + final int parallelism = 8; + + final JobVertex sourceVertex = new JobVertex("source"); + sourceVertex.setParallelism(parallelism); + sourceVertex.setInvokableClass(NoOpInvokable.class); + + final JobVertex targetVertex = new JobVertex("target"); + targetVertex.setParallelism(parallelism); + targetVertex.setInvokableClass(NoOpInvokable.class); + + targetVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + + final JobID jobId = new JobID(); + final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex); + + final ExecutionGraph eg = createExecutionGraph(jobGraph); + + // + // Create the slots, futures, and the slot provider + + final TaskManagerGateway[] sourceTaskManagers = new TaskManagerGateway[parallelism]; + final TaskManagerGateway[] targetTaskManagers = new TaskManagerGateway[parallelism]; + + final SimpleSlot[] sourceSlots = new SimpleSlot[parallelism]; + final SimpleSlot[] targetSlots = new SimpleSlot[parallelism]; + + @SuppressWarnings({"unchecked", "rawtypes"}) + final FlinkCompletableFuture<SimpleSlot>[] 
sourceFutures = new FlinkCompletableFuture[parallelism]; + @SuppressWarnings({"unchecked", "rawtypes"}) + final FlinkCompletableFuture<SimpleSlot>[] targetFutures = new FlinkCompletableFuture[parallelism]; + + for (int i = 0; i < parallelism; i++) { + sourceTaskManagers[i] = createTaskManager(); + targetTaskManagers[i] = createTaskManager(); + + sourceSlots[i] = createSlot(sourceTaskManagers[i], jobId); + targetSlots[i] = createSlot(targetTaskManagers[i], jobId); + + sourceFutures[i] = new FlinkCompletableFuture<>(); + targetFutures[i] = new FlinkCompletableFuture<>(); + } + + ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism); + slotProvider.addSlots(sourceVertex.getID(), sourceFutures); + slotProvider.addSlots(targetVertex.getID(), targetFutures); + + // + // we complete some of the futures + + for (int i = 0; i < parallelism; i += 2) { + sourceFutures[i].complete(sourceSlots[i]); + } + + // + // kick off the scheduling + + eg.setScheduleMode(ScheduleMode.EAGER); + eg.setQueuedSchedulingAllowed(true); + eg.scheduleForExecution(slotProvider); + + verifyNothingDeployed(eg, sourceTaskManagers); + + // complete the remaining sources + for (int i = 1; i < parallelism; i += 2) { + sourceFutures[i].complete(sourceSlots[i]); + } + verifyNothingDeployed(eg, sourceTaskManagers); + + // complete the targets except for one + for (int i = 1; i < parallelism; i++) { + targetFutures[i].complete(targetSlots[i]); + } + verifyNothingDeployed(eg, targetTaskManagers); + + // complete the last target slot future + targetFutures[0].complete(targetSlots[0]); + + // + // verify that all deployments have happened + + for (TaskManagerGateway gateway : sourceTaskManagers) { + verify(gateway, timeout(50)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)); + } + for (TaskManagerGateway gateway : targetTaskManagers) { + verify(gateway, timeout(50)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)); + } + } + + /** + * This test verifies 
that if one slot future fails, the deployment will be aborted. + */ + @Test + public void testOneSlotFailureAbortsDeploy() throws Exception { + + // [pipelined] + // we construct a simple graph (source) ----------------> (target) + + final int parallelism = 6; + + final JobVertex sourceVertex = new JobVertex("source"); + sourceVertex.setParallelism(parallelism); + sourceVertex.setInvokableClass(NoOpInvokable.class); + + final JobVertex targetVertex = new JobVertex("target"); + targetVertex.setParallelism(parallelism); + targetVertex.setInvokableClass(NoOpInvokable.class); + + targetVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + + final JobID jobId = new JobID(); + final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex); + + final ExecutionGraph eg = createExecutionGraph(jobGraph); + TerminalJobStatusListener testListener = new TerminalJobStatusListener(); + eg.registerJobStatusListener(testListener); + + // + // Create the slots, futures, and the slot provider + + final TaskManagerGateway taskManager = mock(TaskManagerGateway.class); + final SlotOwner slotOwner = mock(SlotOwner.class); + + final SimpleSlot[] sourceSlots = new SimpleSlot[parallelism]; + final SimpleSlot[] targetSlots = new SimpleSlot[parallelism]; + + @SuppressWarnings({"unchecked", "rawtypes"}) + final FlinkCompletableFuture<SimpleSlot>[] sourceFutures = new FlinkCompletableFuture[parallelism]; + @SuppressWarnings({"unchecked", "rawtypes"}) + final FlinkCompletableFuture<SimpleSlot>[] targetFutures = new FlinkCompletableFuture[parallelism]; + + for (int i = 0; i < parallelism; i++) { + sourceSlots[i] = createSlot(taskManager, jobId, slotOwner); + targetSlots[i] = createSlot(taskManager, jobId, slotOwner); + + sourceFutures[i] = new FlinkCompletableFuture<>(); + targetFutures[i] = new FlinkCompletableFuture<>(); + } + + ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism); + 
slotProvider.addSlots(sourceVertex.getID(), sourceFutures); + slotProvider.addSlots(targetVertex.getID(), targetFutures); + + // + // we complete some of the futures + + for (int i = 0; i < parallelism; i += 2) { + sourceFutures[i].complete(sourceSlots[i]); + targetFutures[i + 1].complete(targetSlots[i + 1]); + } + + // + // kick off the scheduling + + eg.setScheduleMode(ScheduleMode.EAGER); + eg.setQueuedSchedulingAllowed(true); + eg.scheduleForExecution(slotProvider); + + // fail one slot + sourceFutures[1].completeExceptionally(new TestRuntimeException()); + + // wait until the job failed as a whole + testListener.waitForTerminalState(2000); + + // wait until all slots are back + verify(slotOwner, new Timeout(2000, times(6))).returnAllocatedSlot(any(Slot.class)); + + // no deployment calls must have happened + verify(taskManager, times(0)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)); + + // all completed futures must have been returned + for (int i = 0; i < parallelism; i += 2) { + assertTrue(sourceSlots[i].isCanceled()); + assertTrue(targetSlots[i + 1].isCanceled()); + } + } + + /** + * This test verifies that the slot allocation times out after a certain time, and that + * all slots are released in that case. 
+ */ + @Test + public void testTimeoutForSlotAllocation() throws Exception { + + // we construct a simple graph: (task) + + final int parallelism = 3; + + final JobVertex vertex = new JobVertex("task"); + vertex.setParallelism(parallelism); + vertex.setInvokableClass(NoOpInvokable.class); + + final JobID jobId = new JobID(); + final JobGraph jobGraph = new JobGraph(jobId, "test", vertex); + + final ExecutionGraph eg = createExecutionGraph(jobGraph, Time.milliseconds(20)); + final TerminalJobStatusListener statusListener = new TerminalJobStatusListener(); + eg.registerJobStatusListener(statusListener); + + final SlotOwner slotOwner = mock(SlotOwner.class); + + final TaskManagerGateway taskManager = mock(TaskManagerGateway.class); + final SimpleSlot[] slots = new SimpleSlot[parallelism]; + @SuppressWarnings({"unchecked", "rawtypes"}) + final FlinkCompletableFuture<SimpleSlot>[] slotFutures = new FlinkCompletableFuture[parallelism]; + + for (int i = 0; i < parallelism; i++) { + slots[i] = createSlot(taskManager, jobId, slotOwner); + slotFutures[i] = new FlinkCompletableFuture<>(); + } + + ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism); + slotProvider.addSlots(vertex.getID(), slotFutures); + + // we complete one future + slotFutures[1].complete(slots[1]); + + // kick off the scheduling + + eg.setScheduleMode(ScheduleMode.EAGER); + eg.setQueuedSchedulingAllowed(true); + eg.scheduleForExecution(slotProvider); + + // we complete another future + slotFutures[2].complete(slots[2]); + + // since future[0] is still missing the whole operation must time out + // we have no restarts allowed, so the job will go terminal + statusListener.waitForTerminalState(2000); + + // wait until all slots are back + verify(slotOwner, new Timeout(2000, times(2))).returnAllocatedSlot(any(Slot.class)); + + // verify that no deployments have happened + verify(taskManager, times(0)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)); + + for 
(Future<SimpleSlot> future : slotFutures) { + if (future.isDone()) { + assertTrue(future.get().isCanceled()); + } + } + } + + /** + * Tests that the {@link ExecutionJobVertex#allocateResourcesForAll(SlotProvider, boolean)} method + * releases partially acquired resources upon exception. + */ + @Test + public void testExecutionJobVertexAllocateResourcesReleasesOnException() throws Exception { + final int parallelism = 8; + + final JobVertex vertex = new JobVertex("vertex"); + vertex.setParallelism(parallelism); + vertex.setInvokableClass(NoOpInvokable.class); + + final JobID jobId = new JobID(); + final JobGraph jobGraph = new JobGraph(jobId, "test", vertex); + + final ExecutionGraph eg = createExecutionGraph(jobGraph); + final ExecutionJobVertex ejv = eg.getJobVertex(vertex.getID()); + + // set up some available slots and some slot owner that accepts released slots back + final List<SimpleSlot> returnedSlots = new ArrayList<>(); + final SlotOwner recycler = new SlotOwner() { + @Override + public boolean returnAllocatedSlot(Slot slot) { + returnedSlots.add((SimpleSlot) slot); + return true; + } + }; + + final TaskManagerGateway taskManager = mock(TaskManagerGateway.class); + final List<SimpleSlot> availableSlots = new ArrayList<>(Arrays.asList( + createSlot(taskManager, jobId, recycler), + createSlot(taskManager, jobId, recycler), + createSlot(taskManager, jobId, recycler))); + + + // slot provider that hand out parallelism / 3 slots, then throws an exception + final SlotProvider slots = mock(SlotProvider.class); + + when(slots.allocateSlot(any(ScheduledUnit.class), anyBoolean())).then( + new Answer<Future<SimpleSlot>>() { + + @Override + public Future<SimpleSlot> answer(InvocationOnMock invocation) { + if (availableSlots.isEmpty()) { + throw new TestRuntimeException(); + } else { + return FlinkCompletableFuture.completed(availableSlots.remove(0)); + } + } + }); + + // acquire resources and check that all are back after the failure + + final int numSlotsToExpectBack 
= availableSlots.size(); + + try { + ejv.allocateResourcesForAll(slots, false); + fail("should have failed with an exception"); + } + catch (TestRuntimeException e) { + // expected + } + + assertEquals(numSlotsToExpectBack, returnedSlots.size()); + } + + /** + * Tests that the {@link ExecutionGraph#scheduleForExecution(SlotProvider)} method + * releases partially acquired resources upon exception. + */ + @Test + public void testExecutionGraphScheduleReleasesResourcesOnException() throws Exception { + + // [pipelined] + // we construct a simple graph (source) ----------------> (target) + + final int parallelism = 3; + + final JobVertex sourceVertex = new JobVertex("source"); + sourceVertex.setParallelism(parallelism); + sourceVertex.setInvokableClass(NoOpInvokable.class); + + final JobVertex targetVertex = new JobVertex("target"); + targetVertex.setParallelism(parallelism); + targetVertex.setInvokableClass(NoOpInvokable.class); + + targetVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + + final JobID jobId = new JobID(); + final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex); + + final ExecutionGraph eg = createExecutionGraph(jobGraph); + + // set up some available slots and some slot owner that accepts released slots back + final List<SimpleSlot> returnedSlots = new ArrayList<>(); + final SlotOwner recycler = new SlotOwner() { + @Override + public boolean returnAllocatedSlot(Slot slot) { + returnedSlots.add((SimpleSlot) slot); + return true; + } + }; + + final TaskManagerGateway taskManager = mock(TaskManagerGateway.class); + final List<SimpleSlot> availableSlots = new ArrayList<>(Arrays.asList( + createSlot(taskManager, jobId, recycler), + createSlot(taskManager, jobId, recycler), + createSlot(taskManager, jobId, recycler), + createSlot(taskManager, jobId, recycler), + createSlot(taskManager, jobId, recycler))); + + + // slot provider that hand out parallelism / 3 slots, 
then throws an exception + final SlotProvider slots = mock(SlotProvider.class); + + when(slots.allocateSlot(any(ScheduledUnit.class), anyBoolean())).then( + new Answer<Future<SimpleSlot>>() { + + @Override + public Future<SimpleSlot> answer(InvocationOnMock invocation) { + if (availableSlots.isEmpty()) { + throw new TestRuntimeException(); + } else { + return FlinkCompletableFuture.completed(availableSlots.remove(0)); + } + } + }); + + // acquire resources and check that all are back after the failure + + final int numSlotsToExpectBack = availableSlots.size(); + + try { + eg.setScheduleMode(ScheduleMode.EAGER); + eg.scheduleForExecution(slots); + fail("should have failed with an exception"); + } + catch (TestRuntimeException e) { + // expected + } + + assertEquals(numSlotsToExpectBack, returnedSlots.size()); + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + private ExecutionGraph createExecutionGraph(JobGraph jobGraph) throws Exception { + return createExecutionGraph(jobGraph, Time.minutes(10)); + } + + private ExecutionGraph createExecutionGraph(JobGraph jobGraph, Time timeout) throws Exception { + return ExecutionGraphBuilder.buildGraph( + null, + jobGraph, + new Configuration(), + executor, + executor, + getClass().getClassLoader(), + new StandaloneCheckpointRecoveryFactory(), + timeout, + new NoRestartStrategy(), + new UnregisteredMetricsGroup(), + 1, + log); + } + + private SimpleSlot createSlot(TaskManagerGateway taskManager, JobID jobId) { + return createSlot(taskManager, jobId, mock(SlotOwner.class)); + } + + private SimpleSlot createSlot(TaskManagerGateway taskManager, JobID jobId, SlotOwner slotOwner) { + TaskManagerLocation location = new TaskManagerLocation( + ResourceID.generate(), InetAddress.getLoopbackAddress(), 12345); + + AllocatedSlot slot = new AllocatedSlot( + new AllocationID(), jobId, location, 0, 
ResourceProfile.UNKNOWN, taskManager); + + return new SimpleSlot(slot, slotOwner, 0); + } + + private static TaskManagerGateway createTaskManager() { + TaskManagerGateway tm = mock(TaskManagerGateway.class); + when(tm.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class))) + .thenReturn(FlinkCompletableFuture.completed(Acknowledge.get())); + + return tm; + } + + private static void verifyNothingDeployed(ExecutionGraph eg, TaskManagerGateway[] taskManagers) { + // job should still be running + assertEquals(JobStatus.RUNNING, eg.getState()); + + // none of the TaskManager should have gotten a deployment call, yet + for (TaskManagerGateway gateway : taskManagers) { + verify(gateway, new Timeout(50, times(0))).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)); + } + } + + private static class TestRuntimeException extends RuntimeException { + private static final long serialVersionUID = 1L; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java new file mode 100644 index 0000000..2e6da98 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; +import org.apache.flink.runtime.jobmanager.slots.SlotOwner; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import org.junit.Test; + +import java.net.InetAddress; +import java.util.Arrays; +import java.util.List; + +import static org.mockito.Mockito.*; + +/** + * Tests for the utility methods in the class {@link ExecutionGraphUtils}. 
+ */ +public class ExecutionGraphUtilsTest { + + @Test + public void testReleaseSlots() { + final JobID jid = new JobID(); + final SlotOwner owner = mock(SlotOwner.class); + + final SimpleSlot slot1 = new SimpleSlot(createAllocatedSlot(jid, 0), owner, 0); + final SimpleSlot slot2 = new SimpleSlot(createAllocatedSlot(jid, 1), owner, 1); + final SimpleSlot slot3 = new SimpleSlot(createAllocatedSlot(jid, 2), owner, 2); + + final FlinkCompletableFuture<SimpleSlot> incompleteFuture = new FlinkCompletableFuture<>(); + + final FlinkCompletableFuture<SimpleSlot> completeFuture = new FlinkCompletableFuture<>(); + completeFuture.complete(slot2); + + final FlinkCompletableFuture<SimpleSlot> disposedSlotFuture = new FlinkCompletableFuture<>(); + slot3.releaseSlot(); + disposedSlotFuture.complete(slot3); + + // release all futures + ExecutionGraphUtils.releaseSlotFuture(incompleteFuture); + ExecutionGraphUtils.releaseSlotFuture(completeFuture); + ExecutionGraphUtils.releaseSlotFuture(disposedSlotFuture); + + // only now complete the incomplete future + incompleteFuture.complete(slot1); + + // verify that each slot was returned once to the owner + verify(owner, times(1)).returnAllocatedSlot(eq(slot1)); + verify(owner, times(1)).returnAllocatedSlot(eq(slot2)); + verify(owner, times(1)).returnAllocatedSlot(eq(slot3)); + } + + @Test + public void testReleaseSlotsWithNulls() { + final JobID jid = new JobID(); + final SlotOwner owner = mock(SlotOwner.class); + + final Execution mockExecution = mock(Execution.class); + + final SimpleSlot slot1 = new SimpleSlot(createAllocatedSlot(jid, 0), owner, 0); + final SimpleSlot slot2 = new SimpleSlot(createAllocatedSlot(jid, 1), owner, 1); + final SimpleSlot slot3 = new SimpleSlot(createAllocatedSlot(jid, 2), owner, 2); + final SimpleSlot slot4 = new SimpleSlot(createAllocatedSlot(jid, 3), owner, 3); + final SimpleSlot slot5 = new SimpleSlot(createAllocatedSlot(jid, 4), owner, 4); + + ExecutionAndSlot[] slots1 = new ExecutionAndSlot[] { + null, 
+ new ExecutionAndSlot(mockExecution, FlinkCompletableFuture.completed(slot1)), + null, + new ExecutionAndSlot(mockExecution, FlinkCompletableFuture.completed(slot2)), + null + }; + + ExecutionAndSlot[] slots2 = new ExecutionAndSlot[] { + new ExecutionAndSlot(mockExecution, FlinkCompletableFuture.completed(slot3)), + new ExecutionAndSlot(mockExecution, FlinkCompletableFuture.completed(slot4)), + new ExecutionAndSlot(mockExecution, FlinkCompletableFuture.completed(slot5)) + }; + + List<ExecutionAndSlot[]> resources = Arrays.asList(null, slots1, new ExecutionAndSlot[0], null, slots2); + + ExecutionGraphUtils.releaseAllSlotsSilently(resources); + + verify(owner, times(1)).returnAllocatedSlot(eq(slot1)); + verify(owner, times(1)).returnAllocatedSlot(eq(slot2)); + verify(owner, times(1)).returnAllocatedSlot(eq(slot3)); + verify(owner, times(1)).returnAllocatedSlot(eq(slot4)); + verify(owner, times(1)).returnAllocatedSlot(eq(slot5)); + } + + // ------------------------------------------------------------------------ + + private static AllocatedSlot createAllocatedSlot(JobID jid, int num) { + TaskManagerLocation loc = new TaskManagerLocation( + ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + num); + + return new AllocatedSlot(new AllocationID(), jid, loc, num, + ResourceProfile.UNKNOWN, mock(TaskManagerGateway.class)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java index 7b6c6ea..82561b2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java @@ -457,7 +457,7 @@ public class ExecutionVertexCancelTest { assertEquals(ExecutionState.CANCELED, vertex.getExecutionState()); // 1) - // scheduling after being created should be tolerated (no exception) because + // scheduling after being canceled should be tolerated (no exception) because // it can occur as the result of races { Scheduler scheduler = mock(Scheduler.class); http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java index 9132aee..1b029e8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java @@ -104,9 +104,6 @@ public class ExecutionVertexSchedulingTest { future.complete(slot); - // wait a second for future's future action be executed - Thread.sleep(1000); - // will have failed assertEquals(ExecutionState.FAILED, vertex.getExecutionState()); } http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java index 3a7e759..006f894 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java @@ -348,9 +348,9 @@ public class PointwisePatternTest { timesUsed[inEdges[0].getSource().getPartitionNumber()]++; } - - for (int i = 0; i < timesUsed.length; i++) { - assertTrue(timesUsed[i] >= factor && timesUsed[i] <= factor + delta); + + for (int used : timesUsed) { + assertTrue(used >= factor && used <= factor + delta); } } @@ -406,9 +406,9 @@ public class PointwisePatternTest { timesUsed[ee.getSource().getPartitionNumber()]++; } } - - for (int i = 0; i < timesUsed.length; i++) { - assertEquals(1, timesUsed[i]); + + for (int used : timesUsed) { + assertEquals(1, used); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java new file mode 100644 index 0000000..3acb2eb --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.instance.SlotProvider; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A slot provider where one can pre-set the slot futures for tasks based on + * vertex ID and subtask index. + */ +class ProgrammedSlotProvider implements SlotProvider { + + private final Map<JobVertexID, Future<SimpleSlot>[]> slotFutures = new HashMap<>(); + + private final int parallelism; + + public ProgrammedSlotProvider(int parallelism) { + checkArgument(parallelism > 0); + this.parallelism = parallelism; + } + + public void addSlot(JobVertexID vertex, int subtaskIndex, Future<SimpleSlot> future) { + checkNotNull(vertex); + checkNotNull(future); + checkArgument(subtaskIndex >= 0 && subtaskIndex < parallelism); + + Future<SimpleSlot>[] futures = slotFutures.get(vertex); + if (futures == null) { + @SuppressWarnings("unchecked") + Future<SimpleSlot>[] newArray = (Future<SimpleSlot>[]) new Future<?>[parallelism]; + futures = newArray; + slotFutures.put(vertex, futures); + } + + futures[subtaskIndex] = future; + } + + public void addSlots(JobVertexID vertex, Future<SimpleSlot>[] futures) { + checkNotNull(vertex); + checkNotNull(futures); + checkArgument(futures.length == parallelism); + + slotFutures.put(vertex, futures); + } + + @Override + public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) { + JobVertexID vertexId = task.getTaskToExecute().getVertex().getJobvertexId(); + int subtask = 
task.getTaskToExecute().getParallelSubtaskIndex(); + + Future<SimpleSlot>[] forTask = slotFutures.get(vertexId); + if (forTask != null) { + Future<SimpleSlot> future = forTask[subtask]; + if (future != null) { + return future; + } + } + + throw new IllegalArgumentException("No registered slot future for task " + vertexId + " (" + subtask + ')'); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalJobStatusListener.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalJobStatusListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalJobStatusListener.java new file mode 100644 index 0000000..c107d54 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalJobStatusListener.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.jobgraph.JobStatus; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * A job status listener that lets one block until the job is in a terminal state. + */ +public class TerminalJobStatusListener implements JobStatusListener { + + private final OneShotLatch terminalStateLatch = new OneShotLatch(); + + public void waitForTerminalState(long timeoutMillis) throws InterruptedException, TimeoutException { + terminalStateLatch.await(timeoutMillis, TimeUnit.MILLISECONDS); + } + + @Override + public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) { + if (newJobStatus.isGloballyTerminalState() || newJobStatus == JobStatus.SUSPENDED) { + terminalStateLatch.trigger(); + } + } +}