[FLINK-5747] [distributed coordination] Eager scheduling allocates slots and deploys tasks in bulk

That way, a strictly topological deployment order can be guaranteed.

Also, many quick deploy / not-enough-resources / fail / recover cycles can be
avoided in cases where resources take some time to become available.

This closes #3295


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f113d794
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f113d794
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f113d794

Branch: refs/heads/master
Commit: f113d79451ba88c487358861cc3e20aac3d19257
Parents: 5902ea0
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Feb 3 20:26:23 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 20 01:01:24 2017 +0100

----------------------------------------------------------------------
 .../org/apache/flink/util/ExceptionUtils.java   |  12 +
 .../apache/flink/util/ExceptionUtilsTest.java   |  60 ++
 .../flink/runtime/concurrent/Executors.java     |   3 +-
 .../flink/runtime/concurrent/FutureUtils.java   | 115 ++++
 .../flink/runtime/executiongraph/Execution.java |  73 ++-
 .../executiongraph/ExecutionAndSlot.java        |  46 ++
 .../runtime/executiongraph/ExecutionGraph.java  | 170 +++++-
 .../executiongraph/ExecutionGraphUtils.java     | 106 ++++
 .../executiongraph/ExecutionJobVertex.java      |  46 +-
 .../runtime/executiongraph/ExecutionVertex.java |   3 +-
 .../IllegalExecutionStateException.java         |  53 ++
 .../apache/flink/runtime/instance/SlotPool.java |   9 +-
 .../runtime/concurrent/FutureUtilsTest.java     | 194 ++++++
 .../ExecutionGraphSchedulingTest.java           | 610 +++++++++++++++++++
 .../executiongraph/ExecutionGraphUtilsTest.java | 124 ++++
 .../ExecutionVertexCancelTest.java              |   2 +-
 .../ExecutionVertexSchedulingTest.java          |   3 -
 .../executiongraph/PointwisePatternTest.java    |  12 +-
 .../executiongraph/ProgrammedSlotProvider.java  |  87 +++
 .../TerminalJobStatusListener.java              |  45 ++
 .../LeaderChangeJobRecoveryTest.java            |  23 +-
 .../runtime/minicluster/MiniClusterITCase.java  |  28 +-
 .../Flip6LocalStreamEnvironment.java            |   4 +-
 23 files changed, 1735 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 6ba9ef6..69c2692 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -101,6 +101,18 @@ public final class ExceptionUtils {
        }
 
        /**
+        * Rethrows the given {@code Throwable}, if it represents an error that 
is fatal to the JVM.
+        * See {@link ExceptionUtils#isJvmFatalError(Throwable)} for a 
definition of fatal errors.
+        * 
+        * @param t The Throwable to check and rethrow.
+        */
+       public static void rethrowIfFatalError(Throwable t) {
+               if (isJvmFatalError(t)) {
+                       throw (Error) t;
+               }
+       }
+
+       /**
         * Adds a new exception as a {@link Throwable#addSuppressed(Throwable) 
suppressed exception}
         * to a prior exception, or returns the new exception, if no prior 
exception exists.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java 
b/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java
new file mode 100644
index 0000000..343b9d6
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the utility methods in {@link ExceptionUtils}.
+ */
+public class ExceptionUtilsTest {
+
+       @Test
+       public void testStringifyNullException() {
+               assertNotNull(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION);
+               assertEquals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION, 
ExceptionUtils.stringifyException(null));
+       }
+
+       @Test
+       public void testJvmFatalError() {
+               // not all errors are fatal
+               assertFalse(ExceptionUtils.isJvmFatalError(new Error()));
+
+               // linkage errors are not fatal
+               assertFalse(ExceptionUtils.isJvmFatalError(new LinkageError()));
+
+               // some errors are fatal
+               assertTrue(ExceptionUtils.isJvmFatalError(new InternalError()));
+               assertTrue(ExceptionUtils.isJvmFatalError(new UnknownError()));
+       }
+
+       @Test
+       public void testRethrowFatalError() {
+               // fatal error is rethrown
+               try {
+                       ExceptionUtils.rethrowIfFatalError(new InternalError());
+                       fail();
+               } catch (InternalError ignored) {}
+
+               // non-fatal error is not rethrown
+               ExceptionUtils.rethrowIfFatalError(new NoClassDefFoundError());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
index 391f233..63b6a25 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.concurrent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -52,7 +53,7 @@ public class Executors {
                private DirectExecutor() {}
 
                @Override
-               public void execute(Runnable command) {
+               public void execute(@Nonnull Runnable command) {
                        command.run();
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index a404c98..4948147 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -20,11 +20,22 @@ package org.apache.flink.runtime.concurrent;
 
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 
+import java.util.Collection;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A collection of utilities that expand the usage of {@link Future} and 
{@link CompletableFuture}.
+ */
 public class FutureUtils {
 
+       // 
------------------------------------------------------------------------
+       //  retrying operations
+       // 
------------------------------------------------------------------------
+
        /**
         * Retry the given operation the given number of times in case of a 
failure.
         *
@@ -88,4 +99,108 @@ public class FutureUtils {
                        super(cause);
                }
        }
+
+       // 
------------------------------------------------------------------------
+       //  composing futures
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a future that is complete once multiple other futures 
completed. 
+        * The ConjunctFuture fails (completes exceptionally) once one of the 
Futures in the
+        * conjunction fails.
+        *
+        * <p>The ConjunctFuture gives access to how many Futures in the 
conjunction have already
+        * completed successfully, via {@link 
ConjunctFuture#getNumFuturesCompleted()}. 
+        * 
+        * @param futures The futures that make up the conjunction. No null 
entries are allowed.
+        * @return The ConjunctFuture that completes once all given futures are 
complete (or one fails).
+        */
+       public static ConjunctFuture combineAll(Collection<? extends Future<?>> 
futures) {
+               checkNotNull(futures, "futures");
+
+               final ConjunctFutureImpl conjunct = new 
ConjunctFutureImpl(futures.size());
+
+               if (futures.isEmpty()) {
+                       conjunct.complete(null);
+               }
+               else {
+                       for (Future<?> future : futures) {
+                               future.handle(conjunct.completionHandler);
+                       }
+               }
+
+               return conjunct;
+       }
+
+       /**
+        * A future that is complete once multiple other futures completed. The 
futures are not
+        * necessarily of the same type, which is why the type of this Future 
is {@code Void}.
+        * The ConjunctFuture fails (completes exceptionally) once one of the 
Futures in the
+        * conjunction fails.
+        * 
+        * <p>The advantage of using the ConjunctFuture over chaining all the 
futures (such as via
+        * {@link Future#thenCombine(Future, BiFunction)}) is that 
ConjunctFuture also tracks how
+        * many of the Futures are already complete.
+        */
+       public interface ConjunctFuture extends CompletableFuture<Void> {
+
+               /**
+                * Gets the total number of Futures in the conjunction.
+                * @return The total number of Futures in the conjunction.
+                */
+               int getNumFuturesTotal();
+
+               /**
+                * Gets the number of Futures in the conjunction that are 
already complete.
+                * @return The number of Futures in the conjunction that are 
already complete
+                */
+               int getNumFuturesCompleted();
+       }
+
+       /**
+        * The implementation of the {@link ConjunctFuture}.
+        * 
+        * <p>Implementation notice: The member fields all have package-private 
access, because they are
+        * either accessed by an inner subclass or by the enclosing class.
+        */
+       private static class ConjunctFutureImpl extends 
FlinkCompletableFuture<Void> implements ConjunctFuture {
+
+               /** The total number of futures in the conjunction */
+               final int numTotal;
+
+               /** The number of futures in the conjunction that are already 
complete */
+               final AtomicInteger numCompleted = new AtomicInteger();
+
+               /** The function that is attached to all futures in the 
conjunction. Once a future
+                * is complete, this function tracks the completion or fails 
the conjunct.  
+                */
+               final BiFunction<Object, Throwable, Void> completionHandler = 
new BiFunction<Object, Throwable, Void>() {
+
+                       @Override
+                       public Void apply(Object o, Throwable throwable) {
+                               if (throwable != null) {
+                                       completeExceptionally(throwable);
+                               }
+                               else if (numTotal == 
numCompleted.incrementAndGet()) {
+                                       complete(null);
+                               }
+
+                               return null;
+                       }
+               };
+
+               ConjunctFutureImpl(int numTotal) {
+                       this.numTotal = numTotal;
+               }
+
+               @Override
+               public int getNumFuturesTotal() {
+                       return numTotal;
+               }
+
+               @Override
+               public int getNumFuturesCompleted() {
+                       return numCompleted.get();
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 60e5575..b3fe443 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -249,27 +249,8 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
         * @throws IllegalStateException Thrown, if the vertex is not in 
CREATED state, which is the only state that permits scheduling.
         */
        public boolean scheduleForExecution(SlotProvider slotProvider, boolean 
queued) {
-               if (slotProvider == null) {
-                       throw new IllegalArgumentException("Cannot send null 
Scheduler when scheduling execution.");
-               }
-
-               final SlotSharingGroup sharingGroup = 
vertex.getJobVertex().getSlotSharingGroup();
-               final CoLocationConstraint locationConstraint = 
vertex.getLocationConstraint();
-
-               // sanity check
-               if (locationConstraint != null && sharingGroup == null) {
-                       throw new RuntimeException("Trying to schedule with 
co-location constraint but without slot sharing allowed.");
-               }
-
-               if (transitionState(CREATED, SCHEDULED)) {
-
-                       ScheduledUnit toSchedule = locationConstraint == null ?
-                               new ScheduledUnit(this, sharingGroup) :
-                               new ScheduledUnit(this, sharingGroup, 
locationConstraint);
-
-                       // IMPORTANT: To prevent leaks of cluster resources, we 
need to make sure that slots are returned
-                       //     in all cases where the deployment failed. we use 
many try {} finally {} clauses to assure that
-                       final Future<SimpleSlot> slotAllocationFuture = 
slotProvider.allocateSlot(toSchedule, queued);
+               try {
+                       final Future<SimpleSlot> slotAllocationFuture = 
allocateSlotForExecution(slotProvider, queued);
 
                        // IMPORTANT: We have to use the synchronous handle 
operation (direct executor) here so
                        // that we directly deploy the tasks if the slot 
allocation future is completed. This is
@@ -296,28 +277,54 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                        });
 
                        // if tasks have to scheduled immediately check that 
the task has been deployed
-                       // TODO: This might be problematic if the future is not 
completed right away
-                       if (!queued) {
-                               if (!deploymentFuture.isDone()) {
-                                       markFailed(new 
IllegalArgumentException("The slot allocation future has not been completed 
yet."));
-                               }
+                       if (!queued && !deploymentFuture.isDone()) {
+                               markFailed(new IllegalArgumentException("The 
slot allocation future has not been completed yet."));
                        }
-                       
+
                        return true;
                }
+               catch (IllegalExecutionStateException e) {
+                       return false;
+               }
+       }
+
+       public Future<SimpleSlot> allocateSlotForExecution(SlotProvider 
slotProvider, boolean queued) 
+                       throws IllegalExecutionStateException {
+
+               checkNotNull(slotProvider);
+
+               final SlotSharingGroup sharingGroup = 
vertex.getJobVertex().getSlotSharingGroup();
+               final CoLocationConstraint locationConstraint = 
vertex.getLocationConstraint();
+
+               // sanity check
+               if (locationConstraint != null && sharingGroup == null) {
+                       throw new IllegalStateException(
+                                       "Trying to schedule with co-location 
constraint but without slot sharing allowed.");
+               }
+
+               // this method only works if the execution is in the state 
'CREATED'
+               if (transitionState(CREATED, SCHEDULED)) {
+
+                       ScheduledUnit toSchedule = locationConstraint == null ?
+                                       new ScheduledUnit(this, sharingGroup) :
+                                       new ScheduledUnit(this, sharingGroup, 
locationConstraint);
+
+                       return slotProvider.allocateSlot(toSchedule, queued);
+               }
                else {
                        // call race, already deployed, or already done
-                       return false;
+                       throw new IllegalExecutionStateException(this, CREATED, 
state);
                }
        }
 
        public void deployToSlot(final SimpleSlot slot) throws JobException {
-               // sanity checks
-               if (slot == null) {
-                       throw new NullPointerException();
-               }
+               checkNotNull(slot);
+
+               // Check if the TaskManager died in the meantime
+               // This only speeds up the response to TaskManagers failing 
concurrently to deployments.
+               // The more general check is the timeout of the deployment call
                if (!slot.isAlive()) {
-                       throw new JobException("Target slot for deployment is 
not alive.");
+                       throw new JobException("Target slot (TaskManager) for 
deployment is no longer alive.");
                }
 
                // make sure exactly one deployment call happens from the 
correct state

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java
new file mode 100644
index 0000000..ea6186e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.instance.SimpleSlot;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A pair of an {@link Execution} together with a slot future.
+ */
+public class ExecutionAndSlot {
+
+       public final Execution executionAttempt;
+
+       public final Future<SimpleSlot> slotFuture;
+
+       public ExecutionAndSlot(Execution executionAttempt, Future<SimpleSlot> 
slotFuture) {
+               this.executionAttempt = checkNotNull(executionAttempt);
+               this.slotFuture = checkNotNull(slotFuture);
+       }
+
+       // 
-----------------------------------------------------------------------
+
+       @Override
+       public String toString() {
+               return super.toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index f25120c..ad4347d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.commons.lang3.StringUtils;
+
 import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -40,9 +41,14 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -53,6 +59,7 @@ import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializableObject;
@@ -60,6 +67,7 @@ import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,11 +85,14 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * The execution graph is the central data structure that coordinates the 
distributed
@@ -158,7 +169,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
        private final long[] stateTimestamps;
 
        /** The timeout for all messages that require a 
response/acknowledgement */
-       private final Time timeout;
+       private final Time rpcCallTimeout;
 
        // ------ Configuration of the Execution -------
 
@@ -171,6 +182,8 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
         * from results than need to be materialized. */
        private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
 
+       private final Time scheduleAllocationTimeout;
+
        // ------ Execution status and progress. These values are volatile, and 
accessed under the lock -------
 
        /** Current status of the job execution */
@@ -292,7 +305,8 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                this.stateTimestamps = new long[JobStatus.values().length];
                this.stateTimestamps[JobStatus.CREATED.ordinal()] = 
System.currentTimeMillis();
 
-               this.timeout = timeout;
+               this.rpcCallTimeout = checkNotNull(timeout);
+               this.scheduleAllocationTimeout = checkNotNull(timeout);
 
                this.restartStrategy = restartStrategy;
 
@@ -695,7 +709,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
 
                        // create the execution job vertex and attach it to the 
graph
                        ExecutionJobVertex ejv =
-                                       new ExecutionJobVertex(this, jobVertex, 
1, timeout, createTimestamp);
+                                       new ExecutionJobVertex(this, jobVertex, 
1, rpcCallTimeout, createTimestamp);
                        ejv.connectToPredecessors(this.intermediateResults);
 
                        ExecutionJobVertex previousTask = 
this.tasks.putIfAbsent(jobVertex.getID(), ejv);
@@ -717,9 +731,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
        }
 
        public void scheduleForExecution(SlotProvider slotProvider) throws 
JobException {
-               if (slotProvider == null) {
-                       throw new IllegalArgumentException("Scheduler must not 
be null.");
-               }
+               checkNotNull(slotProvider);
 
                if (this.slotProvider != null && this.slotProvider != 
slotProvider) {
                        throw new IllegalArgumentException("Cannot use 
different slot providers for the same job");
@@ -731,18 +743,11 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                        switch (scheduleMode) {
 
                                case LAZY_FROM_SOURCES:
-                                       // simply take the vertices without 
inputs.
-                                       for (ExecutionJobVertex ejv : 
this.tasks.values()) {
-                                               if 
(ejv.getJobVertex().isInputVertex()) {
-                                                       
ejv.scheduleAll(slotProvider, allowQueuedScheduling);
-                                               }
-                                       }
+                                       scheduleLazy(slotProvider);
                                        break;
 
                                case EAGER:
-                                       for (ExecutionJobVertex ejv : 
getVerticesTopologically()) {
-                                               ejv.scheduleAll(slotProvider, 
allowQueuedScheduling);
-                                       }
+                                       scheduleEager(slotProvider, 
scheduleAllocationTimeout);
                                        break;
 
                                default:
@@ -754,6 +759,139 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                }
        }
 
+       private void scheduleLazy(SlotProvider slotProvider) throws 
NoResourceAvailableException {
+               // simply take the vertices without inputs.
+               for (ExecutionJobVertex ejv : this.tasks.values()) {
+                       if (ejv.getJobVertex().isInputVertex()) {
+                               ejv.scheduleAll(slotProvider, 
allowQueuedScheduling);
+                       }
+               }
+       }
+
+       /**
+        * 
+        * 
+        * @param slotProvider  The resource provider from which the slots are 
allocated
+        * @param timeout       The maximum time that the deployment may take, 
before a
+        *                      TimeoutException is thrown.
+        */
+       private void scheduleEager(SlotProvider slotProvider, final Time 
timeout) {
+               checkState(state == JobStatus.RUNNING, "job is not running 
currently");
+
+               // Important: reserve all the space we need up front.
+               // that way we do not have any operation that can fail between 
allocating the slots
+               // and adding them to the list. If we had a failure in between 
there, that would
+               // cause the slots to get lost
+               final ArrayList<ExecutionAndSlot[]> resources = new 
ArrayList<>(getNumberOfExecutionJobVertices());
+               final boolean queued = allowQueuedScheduling;
+
+               // we use this flag to handle failures in a 'finally' clause
+               // that allows us to not go through clumsy cast-and-rethrow 
logic
+               boolean successful = false;
+
+               try {
+                       // collecting all the slots may resize and fail in that 
operation without slots getting lost
+                       final ArrayList<Future<SimpleSlot>> slotFutures = new 
ArrayList<>(getNumberOfExecutionJobVertices());
+
+                       // allocate the slots (obtain all their futures
+                       for (ExecutionJobVertex ejv : 
getVerticesTopologically()) {
+                               // these calls are not blocking, they only 
return futures
+                               ExecutionAndSlot[] slots = 
ejv.allocateResourcesForAll(slotProvider, queued);
+
+                               // we need to first add the slots to this list, 
to be safe on release
+                               resources.add(slots);
+
+                               for (ExecutionAndSlot ens : slots) {
+                                       slotFutures.add(ens.slotFuture);
+                               }
+                       }
+
+                       // this future is complete once all slot futures are 
complete.
+                       // the future fails once one slot future fails.
+                       final ConjunctFuture allAllocationsComplete = 
FutureUtils.combineAll(slotFutures);
+
+                       // make sure that we fail if the allocation timeout was 
exceeded
+                       final ScheduledFuture<?> timeoutCancelHandle = 
futureExecutor.schedule(new Runnable() {
+                               @Override
+                               public void run() {
+                                       // When the timeout triggers, we try to 
complete the conjunct future with an exception.
+                                       // Note that this is a no-op if the 
future is already completed
+                                       int numTotal = 
allAllocationsComplete.getNumFuturesTotal();
+                                       int numComplete = 
allAllocationsComplete.getNumFuturesCompleted();
+                                       String message = "Could not allocate 
all requires slots within timeout of " +
+                                                       timeout + ". Slots 
required: " + numTotal + ", slots allocated: " + numComplete;
+
+                                       
allAllocationsComplete.completeExceptionally(new 
NoResourceAvailableException(message));
+                               }
+                       }, timeout.getSize(), timeout.getUnit());
+
+
+                       allAllocationsComplete.handleAsync(new BiFunction<Void, 
Throwable, Void>() {
+
+                               @Override
+                               public Void apply(Void ignored, Throwable 
throwable) {
+                                       try {
+                                               // we do not need the 
cancellation timeout any more
+                                               
timeoutCancelHandle.cancel(false);
+
+                                               if (throwable == null) {
+                                                       // successfully 
obtained all slots, now deploy
+
+                                                       for (ExecutionAndSlot[] 
jobVertexTasks : resources) {
+                                                               for 
(ExecutionAndSlot execAndSlot : jobVertexTasks) {
+
+                                                                       // the 
futures must all be ready - this is simply a sanity check
+                                                                       final 
SimpleSlot slot;
+                                                                       try {
+                                                                               
slot = execAndSlot.slotFuture.getNow(null);
+                                                                               
checkNotNull(slot);
+                                                                       }
+                                                                       catch 
(ExecutionException | NullPointerException e) {
+                                                                               
throw new IllegalStateException("SlotFuture is incomplete " +
+                                                                               
                "or erroneous even though all futures completed");
+                                                                       }
+
+                                                                       // 
actual deployment
+                                                                       
execAndSlot.executionAttempt.deployToSlot(slot);
+                                                               }
+                                                       }
+                                               }
+                                               else {
+                                                       // let the exception 
handler deal with this
+                                                       throw throwable;
+                                               }
+                                       }
+                                       catch (Throwable t) {
+                                               // we catch everything here to 
make sure cleanup happens and the
+                                               // ExecutionGraph notices
+                                               // we need to go into recovery 
and make sure to release all slots
+                                               try {
+                                                       fail(t);
+                                               }
+                                               finally {
+                                                       
ExecutionGraphUtils.releaseAllSlotsSilently(resources);
+                                               }
+                                       }
+
+                                       // Wouldn't it be nice if we could 
return an actual Void object?
+                                       // return (Void) 
Unsafe.getUnsafe().allocateInstance(Void.class);
+                                       return null; 
+                               }
+                       }, futureExecutor);
+
+                       // from now on, slots will be rescued by the the 
futures and their completion, or by the timeout
+                       successful = true;
+               }
+               finally {
+                       if (!successful) {
+                               // we come here only if the 'try' block 
finished with an exception
+                               // we release the slots (possibly failing some 
executions on the way) and
+                               // let the exception bubble up
+                               
ExecutionGraphUtils.releaseAllSlotsSilently(resources);
+                       }
+               }
+       }
+
        public void cancel() {
                while (true) {
                        JobStatus current = state;
@@ -971,7 +1109,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                        }
                } catch (IOException | ClassNotFoundException e) {
                        LOG.error("Couldn't create ArchivedExecutionConfig for 
job {} ", getJobID(), e);
-               };
+               }
                return null;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
new file mode 100644
index 0000000..cd6d6aa
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.util.List;
+
+/**
+ * Utilities for dealing with the execution graphs and scheduling.
+ */
+public class ExecutionGraphUtils {
+
+       /**
+        * Releases the slot represented by the given future. If the future is 
complete, the
+        * slot is immediately released. Otherwise, the slot is released as 
soon as the future
+        * is completed.
+        * 
+        * <p>Note that releasing the slot means cancelling any task execution 
currently
+        * associated with that slot.
+        * 
+        * @param slotFuture The future for the slot to release.
+        */
+       public static void releaseSlotFuture(Future<SimpleSlot> slotFuture) {
+               slotFuture.handle(ReleaseSlotFunction.INSTANCE);
+       }
+
+       /**
+        * Releases the all the slots in the list of arrays of {@code 
ExecutionAndSlot}.
+        * For each future in that collection holds: If the future is complete, 
its slot is
+        * immediately released. Otherwise, the slot is released as soon as the 
future
+        * is completed.
+        * 
+        * <p>This methods never throws any exceptions (except for fatal 
exceptions) and continues
+        * to release the remaining slots if one slot release failed.
+        *
+        * <p>Note that releasing the slot means cancelling any task execution 
currently
+        * associated with that slot.
+        * 
+        * @param resources The collection of ExecutionAndSlot whose slots 
should be released.
+        */
+       public static void releaseAllSlotsSilently(List<ExecutionAndSlot[]> 
resources) {
+               try {
+                       for (ExecutionAndSlot[] jobVertexResources : resources) 
{
+                               if (jobVertexResources != null) {
+                                       for (ExecutionAndSlot execAndSlot : 
jobVertexResources) {
+                                               if (execAndSlot != null) {
+                                                       try {
+                                                               
releaseSlotFuture(execAndSlot.slotFuture);
+                                                       }
+                                                       catch (Throwable t) {
+                                                               
ExceptionUtils.rethrowIfFatalError(t);
+                                                       }
+                                               }
+                                       }
+                               }
+                       }
+               }
+               catch (Throwable t) {
+                       ExceptionUtils.rethrowIfFatalError(t);
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * A function to be applied into a future, releasing the slot 
immediately upon completion.
+        * Completion here refers to both the successful and exceptional 
completion.
+        */
+       private static final class ReleaseSlotFunction implements 
BiFunction<SimpleSlot, Throwable, Void> {
+
+               static final ReleaseSlotFunction INSTANCE = new 
ReleaseSlotFunction();
+
+               @Override
+               public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
+                       if (simpleSlot != null) {
+                               simpleSlot.releaseSlot();
+                       }
+                       return null;
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /** Utility class is not meant to be instantiated */
+       private ExecutionGraphUtils() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 3828fc9..754148e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -30,7 +30,9 @@ import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -388,7 +390,7 @@ public class ExecutionJobVertex implements 
AccessExecutionJobVertex, Archiveable
        
        public void scheduleAll(SlotProvider slotProvider, boolean queued) {
                
-               ExecutionVertex[] vertices = this.taskVertices;
+               final ExecutionVertex[] vertices = this.taskVertices;
 
                // kick off the tasks
                for (ExecutionVertex ev : vertices) {
@@ -396,6 +398,48 @@ public class ExecutionJobVertex implements 
AccessExecutionJobVertex, Archiveable
                }
        }
 
+       /**
+        * Acquires a slot for all the execution vertices of this 
ExecutionJobVertex. The method returns
+        * pairs of the slots and execution attempts, to ease correlation 
between vertices and execution
+        * attempts.
+        * 
+        * <p>If this method throws an exception, it makes sure to release all 
so far requested slots.
+        * 
+        * @param resourceProvider The resource provider from whom the slots 
are requested.
+        */
+       public ExecutionAndSlot[] allocateResourcesForAll(SlotProvider 
resourceProvider, boolean queued) {
+               final ExecutionVertex[] vertices = this.taskVertices;
+               final ExecutionAndSlot[] slots = new 
ExecutionAndSlot[vertices.length];
+
+               // try to acquire a slot future for each execution.
+               // we store the execution with the future just to be on the 
safe side
+               for (int i = 0; i < vertices.length; i++) {
+
+                       // we use this flag to handle failures in a 'finally' 
clause
+                       // that allows us to not go through clumsy 
cast-and-rethrow logic
+                       boolean successful = false;
+
+                       try {
+                               // allocate the next slot (future)
+                               final Execution exec = 
vertices[i].getCurrentExecutionAttempt();
+                               final Future<SimpleSlot> future = 
exec.allocateSlotForExecution(resourceProvider, queued);
+                               slots[i] = new ExecutionAndSlot(exec, future);
+                               successful = true;
+                       }
+                       finally {
+                               if (!successful) {
+                                       // this is the case if an exception was 
thrown
+                                       for (int k = 0; k < i; k++) {
+                                               
ExecutionGraphUtils.releaseSlotFuture(slots[k].slotFuture);
+                                       }
+                               }
+                       }
+               }
+
+               // all good, we acquired all slots
+               return slots;
+       }
+
        public void cancel() {
                for (ExecutionVertex ev : getTaskVertices()) {
                        ev.cancel();

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 92327fd..ca8e07c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -102,6 +102,7 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
                        int subTaskIndex,
                        IntermediateResult[] producedDataSets,
                        Time timeout) {
+
                this(
                                jobVertex,
                                subTaskIndex,
@@ -133,7 +134,7 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
                this.taskNameWithSubtask = String.format("%s (%d/%d)",
                                jobVertex.getJobVertex().getName(), 
subTaskIndex + 1, jobVertex.getParallelism());
 
-               this.resultPartitions = new 
LinkedHashMap<IntermediateResultPartitionID, 
IntermediateResultPartition>(producedDataSets.length, 1);
+               this.resultPartitions = new 
LinkedHashMap<>(producedDataSets.length, 1);
 
                for (IntermediateResult result : producedDataSets) {
                        IntermediateResultPartition irp = new 
IntermediateResultPartition(result, this, subTaskIndex);

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IllegalExecutionStateException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IllegalExecutionStateException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IllegalExecutionStateException.java
new file mode 100644
index 0000000..44162ab
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IllegalExecutionStateException.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+
+/**
+ * A special {@link IllegalStateException} indicating a mismatch in the 
expected and actual
+ * {@link ExecutionState} of an {@link Execution}.
+ */
+public class IllegalExecutionStateException extends IllegalStateException {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Creates a new IllegalExecutionStateException with the error message 
indicating
+        * the expected and actual state.
+        * 
+        * @param expected The expected state 
+        * @param actual   The actual state
+        */
+       public IllegalExecutionStateException(ExecutionState expected, 
ExecutionState actual) {
+               super("Invalid execution state: Expected " + expected + " , 
found " + actual);
+       }
+
+       /**
+        * Creates a new IllegalExecutionStateException with the error message 
indicating
+        * the expected and actual state.
+        *
+        * @param expected The expected state 
+        * @param actual   The actual state
+        */
+       public IllegalExecutionStateException(Execution execution, 
ExecutionState expected, ExecutionState actual) {
+               super(execution.getVertexWithAttempt() + " is no longer in 
expected state " + expected + 
+                               " but in state " + actual);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 4da6c7b..8ba5040 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -1048,9 +1048,7 @@ public class SlotPool extends 
RpcEndpoint<SlotPoolGateway> {
 
                private final long timestamp;
 
-               SlotAndTimestamp(
-                               AllocatedSlot slot,
-                               long timestamp) {
+               SlotAndTimestamp(AllocatedSlot slot, long timestamp) {
                        this.slot = slot;
                        this.timestamp = timestamp;
                }
@@ -1062,5 +1060,10 @@ public class SlotPool extends 
RpcEndpoint<SlotPoolGateway> {
                public long timestamp() {
                        return timestamp;
                }
+
+               @Override
+               public String toString() {
+                       return slot + " @ " + timestamp;
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
new file mode 100644
index 0000000..43710cb
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the utility methods in {@link FutureUtils}
+ */
+public class FutureUtilsTest {
+
+       @Test
+       public void testConjunctFutureFailsOnEmptyAndNull() throws Exception {
+               try {
+                       FutureUtils.combineAll(null);
+                       fail();
+               } catch (NullPointerException ignored) {}
+
+               try {
+                       FutureUtils.combineAll(Arrays.asList(
+                                       new FlinkCompletableFuture<Object>(),
+                                       null,
+                                       new FlinkCompletableFuture<Object>()));
+                       fail();
+               } catch (NullPointerException ignored) {}
+       }
+
+       @Test
+       public void testConjunctFutureCompletion() throws Exception {
+               // some futures that we combine
+               CompletableFuture<Object> future1 = new 
FlinkCompletableFuture<>();
+               CompletableFuture<Object> future2 = new 
FlinkCompletableFuture<>();
+               CompletableFuture<Object> future3 = new 
FlinkCompletableFuture<>();
+               CompletableFuture<Object> future4 = new 
FlinkCompletableFuture<>();
+
+               // some future is initially completed
+               future2.complete(new Object());
+
+               // build the conjunct future
+               ConjunctFuture result = 
FutureUtils.combineAll(Arrays.asList(future1, future2, future3, future4));
+
+               Future<Void> resultMapped = result.thenAccept(new 
AcceptFunction<Void>() {
+                       @Override
+                       public void accept(Void value) {}
+               });
+
+               assertEquals(4, result.getNumFuturesTotal());
+               assertEquals(1, result.getNumFuturesCompleted());
+               assertFalse(result.isDone());
+               assertFalse(resultMapped.isDone());
+
+               // complete two more futures
+               future4.complete(new Object());
+               assertEquals(2, result.getNumFuturesCompleted());
+               assertFalse(result.isDone());
+               assertFalse(resultMapped.isDone());
+
+               future1.complete(new Object());
+               assertEquals(3, result.getNumFuturesCompleted());
+               assertFalse(result.isDone());
+               assertFalse(resultMapped.isDone());
+
+               // complete one future again
+               future1.complete(new Object());
+               assertEquals(3, result.getNumFuturesCompleted());
+               assertFalse(result.isDone());
+               assertFalse(resultMapped.isDone());
+
+               // complete the final future
+               future3.complete(new Object());
+               assertEquals(4, result.getNumFuturesCompleted());
+               assertTrue(result.isDone());
+               assertTrue(resultMapped.isDone());
+       }
+
+       @Test
+       public void testConjunctFutureFailureOnFirst() throws Exception {
+
+               CompletableFuture<Object> future1 = new 
FlinkCompletableFuture<>();
+               CompletableFuture<Object> future2 = new 
FlinkCompletableFuture<>();
+               CompletableFuture<Object> future3 = new 
FlinkCompletableFuture<>();
+               CompletableFuture<Object> future4 = new 
FlinkCompletableFuture<>();
+
+               // build the conjunct future
+               ConjunctFuture result = 
FutureUtils.combineAll(Arrays.asList(future1, future2, future3, future4));
+
+               Future<Void> resultMapped = result.thenAccept(new 
AcceptFunction<Void>() {
+                       @Override
+                       public void accept(Void value) {}
+               });
+
+               assertEquals(4, result.getNumFuturesTotal());
+               assertEquals(0, result.getNumFuturesCompleted());
+               assertFalse(result.isDone());
+               assertFalse(resultMapped.isDone());
+
+               future2.completeExceptionally(new IOException());
+
+               assertEquals(0, result.getNumFuturesCompleted());
+               assertTrue(result.isDone());
+               assertTrue(resultMapped.isDone());
+
+               try {
+                       result.get();
+                       fail();
+               } catch (ExecutionException e) {
+                       assertTrue(e.getCause() instanceof IOException);
+               }
+
+               try {
+                       resultMapped.get();
+                       fail();
+               } catch (ExecutionException e) {
+                       assertTrue(e.getCause() instanceof IOException);
+               }
+       }
+
+       @Test
+       public void testConjunctFutureFailureOnSuccessive() throws Exception {
+
+               CompletableFuture<Object> future1 = new 
FlinkCompletableFuture<>();
+               CompletableFuture<Object> future2 = new 
FlinkCompletableFuture<>();
+               CompletableFuture<Object> future3 = new 
FlinkCompletableFuture<>();
+               CompletableFuture<Object> future4 = new 
FlinkCompletableFuture<>();
+
+               // build the conjunct future
+               ConjunctFuture result = 
FutureUtils.combineAll(Arrays.asList(future1, future2, future3, future4));
+               assertEquals(4, result.getNumFuturesTotal());
+
+               Future<Void> resultMapped = result.thenAccept(new 
AcceptFunction<Void>() {
+                       @Override
+                       public void accept(Void value) {}
+               });
+
+               future1.complete(new Object());
+               future3.complete(new Object());
+               future4.complete(new Object());
+
+               future2.completeExceptionally(new IOException());
+
+               assertEquals(3, result.getNumFuturesCompleted());
+               assertTrue(result.isDone());
+               assertTrue(resultMapped.isDone());
+
+               try {
+                       result.get();
+                       fail();
+               } catch (ExecutionException e) {
+                       assertTrue(e.getCause() instanceof IOException);
+               }
+
+               try {
+                       resultMapped.get();
+                       fail();
+               } catch (ExecutionException e) {
+                       assertTrue(e.getCause() instanceof IOException);
+               }
+       }
+
+       @Test
+       public void testConjunctOfNone() throws Exception {
+               final ConjunctFuture result = 
FutureUtils.combineAll(Collections.<Future<Object>>emptyList());
+
+               assertEquals(0, result.getNumFuturesTotal());
+               assertEquals(0, result.getNumFuturesCompleted());
+               assertTrue(result.isDone());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
new file mode 100644
index 0000000..9834dc6
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Test;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mockito.verification.Timeout;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for the scheduling of the execution graph. These tests verify, for example,
+ * that deployments happen in the correct order and that bulk slot allocation
+ * works properly.
+ */
+public class ExecutionGraphSchedulingTest extends TestLogger {
+
+       private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+
+       @After
+       public void shutdown() {
+               executor.shutdownNow();
+       }
+
+       // ------------------------------------------------------------------------
+       //  Tests
+       // ------------------------------------------------------------------------
+
+       /**
+        * Tests that with scheduling futures and pipelined deployment, the target vertex will
+        * not deploy its task before the source vertex does.
+        */
+       @Test
+       public void testScheduleSourceBeforeTarget() throws Exception {
+
+               //                                            [pipelined]
+               //  we construct a simple graph    (source) ----------------> (target)
+
+               final int parallelism = 1;
+
+               final JobVertex sourceVertex = new JobVertex("source");
+               sourceVertex.setParallelism(parallelism);
+               sourceVertex.setInvokableClass(NoOpInvokable.class);
+
+               final JobVertex targetVertex = new JobVertex("target");
+               targetVertex.setParallelism(parallelism);
+               targetVertex.setInvokableClass(NoOpInvokable.class);
+
+               targetVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+               final JobID jobId = new JobID();
+               final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex);
+
+               final ExecutionGraph eg = createExecutionGraph(jobGraph);
+
+               //
+               //  set up two TaskManager gateways and slots
+
+               final TaskManagerGateway gatewaySource = createTaskManager();
+               final TaskManagerGateway gatewayTarget = createTaskManager();
+
+               final SimpleSlot sourceSlot = createSlot(gatewaySource, jobId);
+               final SimpleSlot targetSlot = createSlot(gatewayTarget, jobId);
+
+               final FlinkCompletableFuture<SimpleSlot> sourceFuture = new FlinkCompletableFuture<>();
+               final FlinkCompletableFuture<SimpleSlot> targetFuture = new FlinkCompletableFuture<>();
+
+               ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
+               slotProvider.addSlot(sourceVertex.getID(), 0, sourceFuture);
+               slotProvider.addSlot(targetVertex.getID(), 0, targetFuture);
+
+               eg.setScheduleMode(ScheduleMode.EAGER);
+               eg.setQueuedSchedulingAllowed(true);
+               eg.scheduleForExecution(slotProvider);
+
+               // job should be running
+               assertEquals(JobStatus.RUNNING, eg.getState());
+
+               // we fulfill the target slot before the source slot
+               // that should not cause a deployment or deployment related failure
+               targetFuture.complete(targetSlot);
+
+               // waits for a grace period, then checks the job still runs and nothing was deployed
+               verifyNothingDeployed(eg, gatewayTarget);
+
+               // now supply the source slot
+               sourceFuture.complete(sourceSlot);
+
+               // by now, all deployments should have happened
+               verify(gatewaySource, timeout(1000)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+               verify(gatewayTarget, timeout(1000)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+
+               assertEquals(JobStatus.RUNNING, eg.getState());
+       }
+
+       /**
+        * This test verifies that before deploying a pipelined connected component, the
+        * full set of slots is available. It guards against deploying some tasks first and
+        * only later realizing that not enough resources are available.
+        */
+       @Test
+       public void testDeployPipelinedConnectedComponentsTogether() throws Exception {
+
+               //                                            [pipelined]
+               //  we construct a simple graph    (source) ----------------> (target)
+
+               final int parallelism = 8;
+
+               final JobVertex sourceVertex = new JobVertex("source");
+               sourceVertex.setParallelism(parallelism);
+               sourceVertex.setInvokableClass(NoOpInvokable.class);
+
+               final JobVertex targetVertex = new JobVertex("target");
+               targetVertex.setParallelism(parallelism);
+               targetVertex.setInvokableClass(NoOpInvokable.class);
+
+               targetVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+               final JobID jobId = new JobID();
+               final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex);
+
+               final ExecutionGraph eg = createExecutionGraph(jobGraph);
+
+               //
+               //  Create the slots, futures, and the slot provider
+
+               final TaskManagerGateway[] sourceTaskManagers = new TaskManagerGateway[parallelism];
+               final TaskManagerGateway[] targetTaskManagers = new TaskManagerGateway[parallelism];
+
+               final SimpleSlot[] sourceSlots = new SimpleSlot[parallelism];
+               final SimpleSlot[] targetSlots = new SimpleSlot[parallelism];
+
+               @SuppressWarnings({"unchecked", "rawtypes"})
+               final FlinkCompletableFuture<SimpleSlot>[] sourceFutures = new FlinkCompletableFuture[parallelism];
+               @SuppressWarnings({"unchecked", "rawtypes"})
+               final FlinkCompletableFuture<SimpleSlot>[] targetFutures = new FlinkCompletableFuture[parallelism];
+
+               for (int i = 0; i < parallelism; i++) {
+                       sourceTaskManagers[i] = createTaskManager();
+                       targetTaskManagers[i] = createTaskManager();
+
+                       sourceSlots[i] = createSlot(sourceTaskManagers[i], jobId);
+                       targetSlots[i] = createSlot(targetTaskManagers[i], jobId);
+
+                       sourceFutures[i] = new FlinkCompletableFuture<>();
+                       targetFutures[i] = new FlinkCompletableFuture<>();
+               }
+
+               ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
+               slotProvider.addSlots(sourceVertex.getID(), sourceFutures);
+               slotProvider.addSlots(targetVertex.getID(), targetFutures);
+
+               //
+               //  we complete some of the futures
+
+               for (int i = 0; i < parallelism; i += 2) {
+                       sourceFutures[i].complete(sourceSlots[i]);
+               }
+
+               //
+               //  kick off the scheduling
+
+               eg.setScheduleMode(ScheduleMode.EAGER);
+               eg.setQueuedSchedulingAllowed(true);
+               eg.scheduleForExecution(slotProvider);
+
+               verifyNothingDeployed(eg, sourceTaskManagers);
+
+               //  complete the remaining sources
+               for (int i = 1; i < parallelism; i += 2) {
+                       sourceFutures[i].complete(sourceSlots[i]);
+               }
+               verifyNothingDeployed(eg, sourceTaskManagers);
+
+               //  complete the targets except for one
+               for (int i = 1; i < parallelism; i++) {
+                       targetFutures[i].complete(targetSlots[i]);
+               }
+               // neither sources nor targets may be deployed while one target slot is still missing
+               verifyNothingDeployed(eg, sourceTaskManagers);
+               verifyNothingDeployed(eg, targetTaskManagers);
+
+               //  complete the last target slot future
+               targetFutures[0].complete(targetSlots[0]);
+
+               //
+               //  verify that all deployments have happened
+               //  (timeout() returns as soon as the call is observed; the bound only matters on slow machines)
+
+               for (TaskManagerGateway gateway : sourceTaskManagers) {
+                       verify(gateway, timeout(1000)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+               }
+               for (TaskManagerGateway gateway : targetTaskManagers) {
+                       verify(gateway, timeout(1000)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+               }
+       }
+
+       /**
+        * This test verifies that if one slot future fails, the deployment will be aborted.
+        */
+       @Test
+       public void testOneSlotFailureAbortsDeploy() throws Exception {
+
+               //                                            [pipelined]
+               //  we construct a simple graph    (source) ----------------> (target)
+
+               // must be even: the completion loop below addresses index i + 1 for even i
+               final int parallelism = 6;
+
+               final JobVertex sourceVertex = new JobVertex("source");
+               sourceVertex.setParallelism(parallelism);
+               sourceVertex.setInvokableClass(NoOpInvokable.class);
+
+               final JobVertex targetVertex = new JobVertex("target");
+               targetVertex.setParallelism(parallelism);
+               targetVertex.setInvokableClass(NoOpInvokable.class);
+
+               targetVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+               final JobID jobId = new JobID();
+               final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex);
+
+               final ExecutionGraph eg = createExecutionGraph(jobGraph);
+               TerminalJobStatusListener testListener = new TerminalJobStatusListener();
+               eg.registerJobStatusListener(testListener);
+
+               //
+               //  Create the slots, futures, and the slot provider
+
+               final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
+               final SlotOwner slotOwner = mock(SlotOwner.class);
+
+               final SimpleSlot[] sourceSlots = new SimpleSlot[parallelism];
+               final SimpleSlot[] targetSlots = new SimpleSlot[parallelism];
+
+               @SuppressWarnings({"unchecked", "rawtypes"})
+               final FlinkCompletableFuture<SimpleSlot>[] sourceFutures = new FlinkCompletableFuture[parallelism];
+               @SuppressWarnings({"unchecked", "rawtypes"})
+               final FlinkCompletableFuture<SimpleSlot>[] targetFutures = new FlinkCompletableFuture[parallelism];
+
+               for (int i = 0; i < parallelism; i++) {
+                       sourceSlots[i] = createSlot(taskManager, jobId, slotOwner);
+                       targetSlots[i] = createSlot(taskManager, jobId, slotOwner);
+
+                       sourceFutures[i] = new FlinkCompletableFuture<>();
+                       targetFutures[i] = new FlinkCompletableFuture<>();
+               }
+
+               ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
+               slotProvider.addSlots(sourceVertex.getID(), sourceFutures);
+               slotProvider.addSlots(targetVertex.getID(), targetFutures);
+
+               //
+               //  we complete some of the futures
+
+               for (int i = 0; i < parallelism; i += 2) {
+                       sourceFutures[i].complete(sourceSlots[i]);
+                       targetFutures[i + 1].complete(targetSlots[i + 1]);
+               }
+
+               //
+               //  kick off the scheduling
+
+               eg.setScheduleMode(ScheduleMode.EAGER);
+               eg.setQueuedSchedulingAllowed(true);
+               eg.scheduleForExecution(slotProvider);
+
+               // fail one slot
+               sourceFutures[1].completeExceptionally(new TestRuntimeException());
+
+               // wait until the job failed as a whole
+               testListener.waitForTerminalState(2000);
+
+               // wait until all slots are back (3 completed source futures + 3 completed target futures)
+               verify(slotOwner, timeout(2000).times(6)).returnAllocatedSlot(any(Slot.class));
+
+               // no deployment calls must have happened
+               verify(taskManager, never()).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+
+               // all slots from completed futures must have been released
+               for (int i = 0; i < parallelism; i += 2) {
+                       assertTrue(sourceSlots[i].isCanceled());
+                       assertTrue(targetSlots[i + 1].isCanceled());
+               }
+       }
+
+       /**
+        * This test verifies that the slot allocation times out after a certain time, and that
+        * all slots are released in that case.
+        */
+       @Test
+       public void testTimeoutForSlotAllocation() throws Exception {
+
+               //  we construct a simple graph:    (task)
+
+               final int parallelism = 3;
+
+               final JobVertex vertex = new JobVertex("task");
+               vertex.setParallelism(parallelism);
+               vertex.setInvokableClass(NoOpInvokable.class);
+
+               final JobID jobId = new JobID();
+               final JobGraph jobGraph = new JobGraph(jobId, "test", vertex);
+
+               final ExecutionGraph eg = createExecutionGraph(jobGraph, Time.milliseconds(20));
+               final TerminalJobStatusListener statusListener = new TerminalJobStatusListener();
+               eg.registerJobStatusListener(statusListener);
+
+               final SlotOwner slotOwner = mock(SlotOwner.class);
+
+               final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
+               final SimpleSlot[] slots = new SimpleSlot[parallelism];
+               @SuppressWarnings({"unchecked", "rawtypes"})
+               final FlinkCompletableFuture<SimpleSlot>[] slotFutures = new FlinkCompletableFuture[parallelism];
+
+               for (int i = 0; i < parallelism; i++) {
+                       slots[i] = createSlot(taskManager, jobId, slotOwner);
+                       slotFutures[i] = new FlinkCompletableFuture<>();
+               }
+
+               ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
+               slotProvider.addSlots(vertex.getID(), slotFutures);
+
+               //  we complete one future
+               slotFutures[1].complete(slots[1]);
+
+               //  kick off the scheduling
+
+               eg.setScheduleMode(ScheduleMode.EAGER);
+               eg.setQueuedSchedulingAllowed(true);
+               eg.scheduleForExecution(slotProvider);
+
+               //  we complete another future
+               slotFutures[2].complete(slots[2]);
+
+               // since future[0] is still missing, the whole operation must time out
+               // we have no restarts allowed, so the job will go terminal
+               statusListener.waitForTerminalState(2000);
+
+               // wait until all slots are back
+               verify(slotOwner, timeout(2000).times(2)).returnAllocatedSlot(any(Slot.class));
+
+               //  verify that no deployments have happened
+               verify(taskManager, never()).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+
+               for (Future<SimpleSlot> future : slotFutures) {
+                       if (future.isDone()) {
+                               assertTrue(future.get().isCanceled());
+                       }
+               }
+       }
+
+       /**
+        * Tests that the {@link ExecutionJobVertex#allocateResourcesForAll(SlotProvider, boolean)} method
+        * releases partially acquired resources upon exception.
+        */
+       @Test
+       public void testExecutionJobVertexAllocateResourcesReleasesOnException() throws Exception {
+               final int parallelism = 8;
+
+               final JobVertex vertex = new JobVertex("vertex");
+               vertex.setParallelism(parallelism);
+               vertex.setInvokableClass(NoOpInvokable.class);
+
+               final JobID jobId = new JobID();
+               final JobGraph jobGraph = new JobGraph(jobId, "test", vertex);
+
+               final ExecutionGraph eg = createExecutionGraph(jobGraph);
+               final ExecutionJobVertex ejv = eg.getJobVertex(vertex.getID());
+
+               // set up some available slots and some slot owner that accepts released slots back
+               final List<SimpleSlot> returnedSlots = new ArrayList<>();
+               final SlotOwner recycler = new SlotOwner() {
+                       @Override
+                       public boolean returnAllocatedSlot(Slot slot) {
+                               returnedSlots.add((SimpleSlot) slot);
+                               return true;
+                       }
+               };
+
+               final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
+               final List<SimpleSlot> availableSlots = new ArrayList<>(Arrays.asList(
+                               createSlot(taskManager, jobId, recycler),
+                               createSlot(taskManager, jobId, recycler),
+                               createSlot(taskManager, jobId, recycler)));
+
+               // slot provider that hands out the available slots (fewer than the parallelism),
+               // and then throws an exception
+               final SlotProvider slots = mock(SlotProvider.class);
+
+               when(slots.allocateSlot(any(ScheduledUnit.class), anyBoolean())).then(
+                               new Answer<Future<SimpleSlot>>() {
+
+                                       @Override
+                                       public Future<SimpleSlot> answer(InvocationOnMock invocation) {
+                                               if (availableSlots.isEmpty()) {
+                                                       throw new TestRuntimeException();
+                                               } else {
+                                                       return FlinkCompletableFuture.completed(availableSlots.remove(0));
+                                               }
+                                       }
+                               });
+
+               // acquire resources and check that all are back after the failure
+
+               final int numSlotsToExpectBack = availableSlots.size();
+
+               try {
+                       ejv.allocateResourcesForAll(slots, false);
+                       fail("should have failed with an exception");
+               }
+               catch (TestRuntimeException e) {
+                       // expected
+               }
+
+               assertEquals(numSlotsToExpectBack, returnedSlots.size());
+       }
+
+       /**
+        * Tests that the {@link ExecutionGraph#scheduleForExecution(SlotProvider)} method
+        * releases partially acquired resources upon exception.
+        */
+       @Test
+       public void testExecutionGraphScheduleReleasesResourcesOnException() throws Exception {
+
+               //                                            [pipelined]
+               //  we construct a simple graph    (source) ----------------> (target)
+
+               final int parallelism = 3;
+
+               final JobVertex sourceVertex = new JobVertex("source");
+               sourceVertex.setParallelism(parallelism);
+               sourceVertex.setInvokableClass(NoOpInvokable.class);
+
+               final JobVertex targetVertex = new JobVertex("target");
+               targetVertex.setParallelism(parallelism);
+               targetVertex.setInvokableClass(NoOpInvokable.class);
+
+               targetVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+               final JobID jobId = new JobID();
+               final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex);
+
+               final ExecutionGraph eg = createExecutionGraph(jobGraph);
+
+               // set up some available slots and some slot owner that accepts released slots back
+               final List<SimpleSlot> returnedSlots = new ArrayList<>();
+               final SlotOwner recycler = new SlotOwner() {
+                       @Override
+                       public boolean returnAllocatedSlot(Slot slot) {
+                               returnedSlots.add((SimpleSlot) slot);
+                               return true;
+                       }
+               };
+
+               final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
+               final List<SimpleSlot> availableSlots = new ArrayList<>(Arrays.asList(
+                               createSlot(taskManager, jobId, recycler),
+                               createSlot(taskManager, jobId, recycler),
+                               createSlot(taskManager, jobId, recycler),
+                               createSlot(taskManager, jobId, recycler),
+                               createSlot(taskManager, jobId, recycler)));
+
+               // slot provider that hands out the available slots (one fewer than the
+               // 2 * parallelism tasks need), and then throws an exception
+               final SlotProvider slots = mock(SlotProvider.class);
+
+               when(slots.allocateSlot(any(ScheduledUnit.class), anyBoolean())).then(
+                               new Answer<Future<SimpleSlot>>() {
+
+                                       @Override
+                                       public Future<SimpleSlot> answer(InvocationOnMock invocation) {
+                                               if (availableSlots.isEmpty()) {
+                                                       throw new TestRuntimeException();
+                                               } else {
+                                                       return FlinkCompletableFuture.completed(availableSlots.remove(0));
+                                               }
+                                       }
+                               });
+
+               // acquire resources and check that all are back after the failure
+
+               final int numSlotsToExpectBack = availableSlots.size();
+
+               eg.setScheduleMode(ScheduleMode.EAGER);
+
+               try {
+                       eg.scheduleForExecution(slots);
+                       fail("should have failed with an exception");
+               }
+               catch (TestRuntimeException e) {
+                       // expected
+               }
+
+               assertEquals(numSlotsToExpectBack, returnedSlots.size());
+       }
+
+       // ------------------------------------------------------------------------
+       //  Utilities
+       // ------------------------------------------------------------------------
+
+       private ExecutionGraph createExecutionGraph(JobGraph jobGraph) throws Exception {
+               return createExecutionGraph(jobGraph, Time.minutes(10));
+       }
+
+       private ExecutionGraph createExecutionGraph(JobGraph jobGraph, Time timeout) throws Exception {
+               return ExecutionGraphBuilder.buildGraph(
+                               null,
+                               jobGraph,
+                               new Configuration(),
+                               executor,
+                               executor,
+                               getClass().getClassLoader(),
+                               new StandaloneCheckpointRecoveryFactory(),
+                               timeout,
+                               new NoRestartStrategy(),
+                               new UnregisteredMetricsGroup(),
+                               1,
+                               log);
+       }
+
+       private SimpleSlot createSlot(TaskManagerGateway taskManager, JobID jobId) {
+               return createSlot(taskManager, jobId, mock(SlotOwner.class));
+       }
+
+       private SimpleSlot createSlot(TaskManagerGateway taskManager, JobID jobId, SlotOwner slotOwner) {
+               TaskManagerLocation location = new TaskManagerLocation(
+                               ResourceID.generate(), InetAddress.getLoopbackAddress(), 12345);
+
+               AllocatedSlot slot = new AllocatedSlot(
+                               new AllocationID(), jobId, location, 0, ResourceProfile.UNKNOWN, taskManager);
+
+               return new SimpleSlot(slot, slotOwner, 0);
+       }
+
+       private static TaskManagerGateway createTaskManager() {
+               TaskManagerGateway tm = mock(TaskManagerGateway.class);
+               when(tm.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)))
+                               .thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+
+               return tm;
+       }
+
+       /**
+        * Verifies that the execution graph is still running and that none of the given
+        * TaskManagers has received a task deployment call.
+        *
+        * <p>A Mockito {@code timeout()} verification returns as soon as it passes, so a
+        * "zero invocations" check wrapped in a timeout succeeds immediately and never waits.
+        * To give erroneous asynchronous deployments a chance to show up, this method sleeps
+        * once for a short grace period and then checks all gateways with {@code never()}.
+        *
+        * @param eg the execution graph that must still be in state RUNNING
+        * @param taskManagers the gateways that must not have received a submitTask call
+        */
+       private static void verifyNothingDeployed(ExecutionGraph eg, TaskManagerGateway... taskManagers) throws InterruptedException {
+               // grace period for (erroneous) asynchronous deployments to happen
+               Thread.sleep(50);
+
+               // job should still be running
+               assertEquals(JobStatus.RUNNING, eg.getState());
+
+               // none of the TaskManagers should have gotten a deployment call, yet
+               for (TaskManagerGateway gateway : taskManagers) {
+                       verify(gateway, never()).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+               }
+       }
+
+       private static class TestRuntimeException extends RuntimeException {
+               private static final long serialVersionUID = 1L;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java
new file mode 100644
index 0000000..2e6da98
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for the slot-releasing utility methods in {@link ExecutionGraphUtils}:
+ * every slot reachable from the given futures must be returned to its
+ * {@link SlotOwner} exactly once, whether the future completes before or after
+ * the release is requested, and regardless of null entries in the inputs.
+ */
+public class ExecutionGraphUtilsTest {
+
+       @Test
+       public void testReleaseSlots() {
+               final JobID jobId = new JobID();
+               final SlotOwner slotOwner = mock(SlotOwner.class);
+
+               final SimpleSlot lateSlot = createSlot(jobId, slotOwner, 0);
+               final SimpleSlot readySlot = createSlot(jobId, slotOwner, 1);
+               final SimpleSlot alreadyReleasedSlot = createSlot(jobId, slotOwner, 2);
+
+               // completed only after the release has been requested
+               final FlinkCompletableFuture<SimpleSlot> lateFuture = new FlinkCompletableFuture<>();
+
+               // already completed with a live slot
+               final FlinkCompletableFuture<SimpleSlot> readyFuture = new FlinkCompletableFuture<>();
+               readyFuture.complete(readySlot);
+
+               // completed with a slot that has been released beforehand
+               final FlinkCompletableFuture<SimpleSlot> alreadyReleasedFuture = new FlinkCompletableFuture<>();
+               alreadyReleasedSlot.releaseSlot();
+               alreadyReleasedFuture.complete(alreadyReleasedSlot);
+
+               ExecutionGraphUtils.releaseSlotFuture(lateFuture);
+               ExecutionGraphUtils.releaseSlotFuture(readyFuture);
+               ExecutionGraphUtils.releaseSlotFuture(alreadyReleasedFuture);
+
+               // the pending future delivers its slot only now
+               lateFuture.complete(lateSlot);
+
+               verifyReturnedOnce(slotOwner, lateSlot, readySlot, alreadyReleasedSlot);
+       }
+
+       @Test
+       public void testReleaseSlotsWithNulls() {
+               final JobID jobId = new JobID();
+               final SlotOwner slotOwner = mock(SlotOwner.class);
+
+               final Execution execution = mock(Execution.class);
+
+               final SimpleSlot[] slots = new SimpleSlot[5];
+               for (int i = 0; i < slots.length; i++) {
+                       slots[i] = createSlot(jobId, slotOwner, i);
+               }
+
+               // real entries interleaved with null entries
+               final ExecutionAndSlot[] sparse = {
+                               null,
+                               new ExecutionAndSlot(execution, FlinkCompletableFuture.completed(slots[0])),
+                               null,
+                               new ExecutionAndSlot(execution, FlinkCompletableFuture.completed(slots[1])),
+                               null
+               };
+
+               // fully populated array
+               final ExecutionAndSlot[] dense = {
+                               new ExecutionAndSlot(execution, FlinkCompletableFuture.completed(slots[2])),
+                               new ExecutionAndSlot(execution, FlinkCompletableFuture.completed(slots[3])),
+                               new ExecutionAndSlot(execution, FlinkCompletableFuture.completed(slots[4]))
+               };
+
+               // null arrays and an empty array are mixed in as well
+               final List<ExecutionAndSlot[]> resources =
+                               Arrays.asList(null, sparse, new ExecutionAndSlot[0], null, dense);
+
+               ExecutionGraphUtils.releaseAllSlotsSilently(resources);
+
+               verifyReturnedOnce(slotOwner, slots);
+       }
+
+       // ------------------------------------------------------------------------
+
+       /** Creates a simple slot with the given number, owned by the given owner. */
+       private static SimpleSlot createSlot(JobID jobId, SlotOwner owner, int num) {
+               return new SimpleSlot(createAllocatedSlot(jobId, num), owner, num);
+       }
+
+       /** Verifies that each given slot was handed back to the owner exactly once. */
+       private static void verifyReturnedOnce(SlotOwner owner, SimpleSlot... slots) {
+               for (SimpleSlot slot : slots) {
+                       verify(owner, times(1)).returnAllocatedSlot(eq(slot));
+               }
+       }
+
+       /** Creates an allocated slot on a loopback TaskManager whose port is derived from {@code num}. */
+       private static AllocatedSlot createAllocatedSlot(JobID jid, int num) {
+               final TaskManagerLocation location = new TaskManagerLocation(
+                               ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + num);
+
+               return new AllocatedSlot(
+                               new AllocationID(), jid, location, num,
+                               ResourceProfile.UNKNOWN, mock(TaskManagerGateway.class));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 7b6c6ea..82561b2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -457,7 +457,7 @@ public class ExecutionVertexCancelTest {
                        assertEquals(ExecutionState.CANCELED, 
vertex.getExecutionState());
 
                        // 1)
-                       // scheduling after being created should be tolerated 
(no exception) because
+                       // scheduling after being canceled should be tolerated 
(no exception) because
                        // it can occur as the result of races
                        {
                                Scheduler scheduler = mock(Scheduler.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 9132aee..1b029e8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -104,9 +104,6 @@ public class ExecutionVertexSchedulingTest {
 
                        future.complete(slot);
 
-                       // wait a second for future's future action be executed
-                       Thread.sleep(1000);
-
                        // will have failed
                        assertEquals(ExecutionState.FAILED, 
vertex.getExecutionState());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index 3a7e759..006f894 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -348,9 +348,9 @@ public class PointwisePatternTest {
                        
                        
timesUsed[inEdges[0].getSource().getPartitionNumber()]++;
                }
-               
-               for (int i = 0; i < timesUsed.length; i++) {
-                       assertTrue(timesUsed[i] >= factor && timesUsed[i] <= 
factor + delta);
+
+               for (int used : timesUsed) {
+                       assertTrue(used >= factor && used <= factor + delta);
                }
        }
        
@@ -406,9 +406,9 @@ public class PointwisePatternTest {
                                
timesUsed[ee.getSource().getPartitionNumber()]++;
                        }
                }
-               
-               for (int i = 0; i < timesUsed.length; i++) {
-                       assertEquals(1, timesUsed[i]);
+
+               for (int used : timesUsed) {
+                       assertEquals(1, used);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
new file mode 100644
index 0000000..3acb2eb
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link SlotProvider} for tests whose slot futures are pre-set per job vertex ID
+ * and subtask index. Requesting a slot for a task that has no programmed future
+ * fails with an {@link IllegalArgumentException}.
+ *
+ * <p>The programmed futures are kept in an unsynchronized {@link HashMap}.
+ */
+class ProgrammedSlotProvider implements SlotProvider {
+
+       /** Per job vertex, the slot future for each subtask index (entries may be null). */
+       private final Map<JobVertexID, Future<SimpleSlot>[]> slotFutures = new HashMap<>();
+
+       /** The number of subtasks that each programmed vertex has. */
+       private final int parallelism;
+
+       /**
+        * @param parallelism the number of subtasks per vertex; must be positive
+        */
+       public ProgrammedSlotProvider(int parallelism) {
+               checkArgument(parallelism > 0);
+               this.parallelism = parallelism;
+       }
+
+       /**
+        * Sets the slot future that is returned for one subtask of the given vertex.
+        *
+        * @param vertex the job vertex the subtask belongs to
+        * @param subtaskIndex the subtask index, in {@code [0, parallelism)}
+        * @param future the future to hand out when that subtask asks for a slot
+        */
+       public void addSlot(JobVertexID vertex, int subtaskIndex, Future<SimpleSlot> future) {
+               checkNotNull(vertex);
+               checkNotNull(future);
+               checkArgument(subtaskIndex >= 0 && subtaskIndex < parallelism);
+
+               Future<SimpleSlot>[] futures = slotFutures.get(vertex);
+               if (futures == null) {
+                       // generic arrays cannot be created directly, hence the unchecked cast
+                       @SuppressWarnings("unchecked")
+                       Future<SimpleSlot>[] newArray = (Future<SimpleSlot>[]) new Future<?>[parallelism];
+                       futures = newArray;
+                       slotFutures.put(vertex, futures);
+               }
+
+               futures[subtaskIndex] = future;
+       }
+
+       /**
+        * Sets the slot futures for all subtasks of the given vertex at once, replacing
+        * anything set before. The array is stored as given (not copied).
+        *
+        * @param vertex the job vertex
+        * @param futures one future per subtask; its length must equal the parallelism
+        */
+       public void addSlots(JobVertexID vertex, Future<SimpleSlot>[] futures) {
+               checkNotNull(vertex);
+               checkNotNull(futures);
+               checkArgument(futures.length == parallelism);
+
+               slotFutures.put(vertex, futures);
+       }
+
+       /**
+        * Returns the programmed slot future for the task's vertex and subtask index.
+        * The {@code allowQueued} flag is ignored.
+        *
+        * @throws IllegalArgumentException if no future was programmed for that task
+        */
+       @Override
+       public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
+               final JobVertexID vertexId = task.getTaskToExecute().getVertex().getJobvertexId();
+               final int subtask = task.getTaskToExecute().getParallelSubtaskIndex();
+
+               final Future<SimpleSlot>[] forTask = slotFutures.get(vertexId);
+
+               // guard the index so that subtasks outside the programmed range produce the
+               // descriptive exception below instead of an ArrayIndexOutOfBoundsException
+               if (forTask != null && subtask >= 0 && subtask < forTask.length) {
+                       final Future<SimpleSlot> future = forTask[subtask];
+                       if (future != null) {
+                               return future;
+                       }
+               }
+
+               throw new IllegalArgumentException("No registered slot future for task " + vertexId + " (" + subtask + ')');
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalJobStatusListener.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalJobStatusListener.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalJobStatusListener.java
new file mode 100644
index 0000000..c107d54
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalJobStatusListener.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A job status listener that waits lets one block until the job is in a 
terminal state.
+ */
+public class TerminalJobStatusListener  implements JobStatusListener {
+
+       private final OneShotLatch terminalStateLatch = new OneShotLatch();
+
+       public void waitForTerminalState(long timeoutMillis) throws 
InterruptedException, TimeoutException {
+               terminalStateLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
+       }
+
+       @Override
+       public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long 
timestamp, Throwable error) {
+               if (newJobStatus.isGloballyTerminalState() || newJobStatus == 
JobStatus.SUSPENDED) {
+                       terminalStateLatch.trigger();
+               }
+       }
+}

Reply via email to