zhuzhurk closed pull request #7250: [FLINK-10945] Add an InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7250
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 3b3132b3d11..6def1dfa488 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -159,6 +159,9 @@
        /** Determines if a task fails or not if there is an error in writing its checkpoint data. Default: true */
        private boolean failTaskOnCheckpointError = true;

+       /** The input dependency constraint to schedule tasks. */
+       private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY;
+
        // ------------------------------- User code values --------------------------------------------

        private GlobalJobParameters globalJobParameters;
@@ -518,6 +521,30 @@ public ExecutionMode getExecutionMode() {
                return executionMode;
        }

+       /**
+        * Sets the input dependency constraint for vertex scheduling. It indicates when a task
+        * should be scheduled, considering the status of its inputs.
+        *
+        * The default constraint is {@link InputDependencyConstraint#ANY}.
+        *
+        * @param inputDependencyConstraint The input dependency constraint.
+        */
+       public void setInputDependencyConstraint(InputDependencyConstraint inputDependencyConstraint) {
+               this.inputDependencyConstraint = inputDependencyConstraint;
+       }
+
+       /**
+        * Gets the input dependency constraint for vertex scheduling. It indicates when a task
+        * should be scheduled, considering the status of its inputs.
+        *
+        * The default constraint is {@link InputDependencyConstraint#ANY}.
+        *
+        * @return The input dependency constraint of this job.
+        */
+       public InputDependencyConstraint getInputDependencyConstraint() {
+               return inputDependencyConstraint;
+       }
+
        /**
         * Force TypeExtractor to use Kryo serializer for POJOS even though we could analyze as POJO.
         * In some cases this might be preferable. For example, when using interfaces
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java b/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
new file mode 100644
index 00000000000..c7a1e6a3519
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common;
+
+/**
+ * This constraint indicates when a task should be scheduled, considering the status of its inputs.
+ */
+public enum InputDependencyConstraint {
+
+       /**
+        * Schedule the task if any input is consumable.
+        */
+       ANY,
+
+       /**
+        * Schedule the task if all the inputs are consumable.
+        */
+       ALL
+}
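
Read concretely: for a consumer with one PIPELINED and one BLOCKING input, ANY
allows scheduling as soon as either the pipelined input has produced its first
data or the blocking input has completely finished, while ALL waits for both.
A hedged, self-contained restatement of the decision rule (the actual check
lives in ExecutionVertex#checkInputDependencyConstraints, shown further below):

    // inputsConsumable[i] stands in for ExecutionVertex#isInputConsumable(i)
    static boolean constraintSatisfied(InputDependencyConstraint constraint, boolean[] inputsConsumable) {
        boolean any = false;
        boolean all = true;
        for (boolean consumable : inputsConsumable) {
            any |= consumable;
            all &= consumable;
        }
        return constraint == InputDependencyConstraint.ANY ? any : all;
    }
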
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index 7c2b30db32a..15bdb36f1d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -101,7 +101,8 @@ public String toString() {
                        final ResultPartitionLocation partitionLocation;

                        // The producing task needs to be RUNNING or already FINISHED
-                       if (consumedPartition.isConsumable() && producerSlot != null &&
+                       if ((consumedPartition.getResultType().isPipelined() || consumedPartition.isConsumable()) &&
+                               producerSlot != null &&
                                        (producerState == ExecutionState.RUNNING ||
                                                producerState == ExecutionState.FINISHED ||
                                                producerState == ExecutionState.SCHEDULED ||
@@ -136,7 +137,8 @@ else if (producerState == ExecutionState.CANCELING
                        }
                        else {
                                String msg = String.format("Trying to eagerly schedule a task whose inputs " +
-                                       "are not ready (partition consumable? %s, producer state: %s, producer slot: %s).",
+                                       "are not ready (result type: %s, partition consumable: %s, producer state: %s, producer slot: %s).",
+                                               consumedPartition.getResultType(),
                                                consumedPartition.isConsumable(),
                                                producerState,
                                                producerSlot);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index e3b501e52e8..59f76502ed6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -747,31 +747,34 @@ else if (numConsumers == 0) {
                                consumerVertex.cachePartitionInfo(PartialInputChannelDeploymentDescriptor.fromEdge(
                                                partition, partitionExecution));

-                               // When deploying a consuming task, its task deployment descriptor will contain all
-                               // deployment information available at the respective time. It is possible that some
-                               // of the partitions to be consumed have not been created yet. These are updated at
-                               // runtime via the update messages.
-                               //
-                               // TODO The current approach may send many update messages even though the consuming
-                               // task has already been deployed with all necessary information. We have to check
-                               // whether this is a problem and fix it, if it is.
-                               CompletableFuture.supplyAsync(
-                                       () -> {
-                                               try {
-                                                       final ExecutionGraph executionGraph = consumerVertex.getExecutionGraph();
-                                                       consumerVertex.scheduleForExecution(
-                                                               executionGraph.getSlotProvider(),
-                                                               executionGraph.isQueuedSchedulingAllowed(),
-                                                               LocationPreferenceConstraint.ANY, // there must be at least one known location
-                                                               Collections.emptySet());
-                                               } catch (Throwable t) {
-                                                       consumerVertex.fail(new IllegalStateException("Could not schedule consumer " +
+                               // Schedule the consumer vertex if its input constraint is satisfied, otherwise postpone the scheduling
+                               if (consumerVertex.checkInputDependencyConstraints()) {
+                                       // When deploying a consuming task, its task deployment descriptor will contain all
+                                       // deployment information available at the respective time. It is possible that some
+                                       // of the partitions to be consumed have not been created yet. These are updated at
+                                       // runtime via the update messages.
+                                       //
+                                       // TODO The current approach may send many update messages even though the consuming
+                                       // task has already been deployed with all necessary information. We have to check
+                                       // whether this is a problem and fix it, if it is.
+                                       CompletableFuture.supplyAsync(
+                                               () -> {
+                                                       try {
+                                                               final ExecutionGraph executionGraph = consumerVertex.getExecutionGraph();
+                                                               consumerVertex.scheduleForExecution(
+                                                                       executionGraph.getSlotProvider(),
+                                                                       executionGraph.isQueuedSchedulingAllowed(),
+                                                                       LocationPreferenceConstraint.ANY, // there must be at least one known location
+                                                                       Collections.emptySet());
+                                                       } catch (Throwable t) {
+                                                               consumerVertex.fail(new IllegalStateException("Could not schedule consumer " +
                                                                        "vertex " + consumerVertex, t));
-                                               }
+                                                       }

-                                               return null;
-                                       },
-                                       executor);
+                                                       return null;
+                                               },
+                                               executor);
+                               }

                                // double check to resolve race conditions
                                if (consumerVertex.getExecutionState() == RUNNING) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 56315e07146..e89409f032c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -21,6 +21,7 @@
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -255,6 +256,9 @@
         * from results than need to be materialized. */
        private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;

+       /** The input dependency constraint to schedule tasks. */
+       private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY;
+
        // ------ Execution status and progress. These values are volatile, and accessed under the lock -------

        private final AtomicInteger verticesFinished;
@@ -456,6 +460,14 @@ public ScheduleMode getScheduleMode() {
                return scheduleMode;
        }

+       public void setInputDependencyConstraint(InputDependencyConstraint inputDependencyConstraint) {
+               this.inputDependencyConstraint = inputDependencyConstraint;
+       }
+
+       public InputDependencyConstraint getInputDependencyConstraint() {
+               return inputDependencyConstraint;
+       }
+
        public Time getAllocationTimeout() {
                return allocationTimeout;
        }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index f1a861d2ca1..c0d877dcb28 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -179,6 +179,13 @@ public static ExecutionGraph buildGraph(
                executionGraph.setScheduleMode(jobGraph.getScheduleMode());
                executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling());

+               try {
+                       executionGraph.setInputDependencyConstraint(
+                               jobGraph.getSerializedExecutionConfig().deserializeValue(classLoader).getInputDependencyConstraint());
+               } catch (IOException | ClassNotFoundException e) {
+                       throw new JobException("Failed to deserialize execution config.", e);
+               }
+
                try {
                        executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
                }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index a0747296c53..22e02e2b13d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -20,6 +20,7 @@

 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.Archiveable;
+import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -686,6 +687,7 @@ void scheduleOrUpdateConsumers(ResultPartitionID partitionId) {

                if (partition.getIntermediateResult().getResultType().isPipelined()) {
                        // Schedule or update receivers of this partition
+                       partition.markDataProduced();
                        execution.scheduleOrUpdateConsumers(partition.getConsumers());
                }
                else {
@@ -726,6 +728,56 @@ void sendPartitionInfos() {
                }
        }

+       /**
+        * Check whether the InputDependencyConstraint is satisfied for this vertex.
+        *
+        * @return whether the input constraint is satisfied
+        */
+       public boolean checkInputDependencyConstraints() {
+               if (getExecutionGraph().getInputDependencyConstraint() == InputDependencyConstraint.ANY) {
+                       // InputDependencyConstraint == ANY
+                       for (int i = 0; i < inputEdges.length; i++) {
+                               if (isInputConsumable(i)) {
+                                       return true;
+                               }
+                       }
+                       return false;
+               } else {
+                       // InputDependencyConstraint == ALL
+                       for (int i = 0; i < inputEdges.length; i++) {
+                               if (!isInputConsumable(i)) {
+                                       return false;
+                               }
+                       }
+                       return true;
+               }
+       }
+
+       /**
+        * An input is consumable when
+        * 1. the source result is PIPELINED and one of the result partitions has produced data, or
+        * 2. the source result is BLOCKING and is FINISHED (all partitions are FINISHED).
+        *
+        * @return whether the input is consumable
+        */
+       public boolean isInputConsumable(int inputNumber) {
+               IntermediateResult result = jobVertex.getInputs().get(inputNumber);
+
+               if (result.getResultType().isPipelined()) {
+                       // For a PIPELINED result, the input is consumable if any result partition has produced records or is finished
+                       for (ExecutionEdge edge : inputEdges[inputNumber]) {
+                               if (edge.getSource().hasDataProduced()) {
+                                       return true;
+                               }
+                       }
+               } else {
+                       // For a BLOCKING result, the input is consumable if all the partitions in the result are finished
+                       return result.areAllPartitionsFinished();
+               }
+
+               return false;
+       }
+
        // --------------------------------------------------------------------------------------------
        //   Notifications from the Execution Attempt
        // --------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
index 313272cf86b..6e1d9ba69fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
@@ -156,19 +156,17 @@ public int getConnectionIndex() {
 
        void resetForNewExecution() {
                this.numberOfRunningProducers.set(numParallelProducers);
+               for (IntermediateResultPartition partition : partitions) {
+                       partition.resetForNewExecution();
+               }
        }
 
        int decrementNumberOfRunningProducersAndGetRemaining() {
                return numberOfRunningProducers.decrementAndGet();
        }
 
-       boolean isConsumable() {
-               if (resultType.isPipelined()) {
-                       return true;
-               }
-               else {
-                       return numberOfRunningProducers.get() == 0;
-               }
+       boolean areAllPartitionsFinished() {
+               return numberOfRunningProducers.get() == 0;
        }
 
        @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
index 124ceb2b6bc..56fb137eb59 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
@@ -36,6 +36,11 @@
 
        private List<List<ExecutionEdge>> consumers;
 
+       /**
+        * Whether this partition has produced data. For pipelined results only.
+        */
+       private boolean dataProduced = false;
+
        public IntermediateResultPartition(IntermediateResult totalResult, ExecutionVertex producer, int partitionNumber) {
                this.totalResult = totalResult;
                this.producer = producer;
@@ -60,7 +65,7 @@ public IntermediateResultPartitionID getPartitionId() {
                return partitionId;
        }
 
-       ResultPartitionType getResultType() {
+       public ResultPartitionType getResultType() {
                return totalResult.getResultType();
        }
 
@@ -68,8 +73,24 @@ ResultPartitionType getResultType() {
                return consumers;
        }
 
+       public void markDataProduced() {
+               dataProduced = true;
+       }
+
+       public boolean hasDataProduced() {
+               return dataProduced;
+       }
+
        public boolean isConsumable() {
-               return totalResult.isConsumable();
+               if (getResultType().isPipelined()) {
+                       return dataProduced;
+               } else {
+                       return totalResult.areAllPartitionsFinished();
+               }
+       }
+
+       void resetForNewExecution() {
+               dataProduced = false;
        }
 
        int addConsumerGroup() {
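
A compact model of the per-partition state this file introduces (names are
illustrative): pipelined partitions become consumable on their first produced
data and are reset on failover, while blocking partitions defer to the whole
result:

    final class PartitionStateSketch {
        private boolean dataProduced = false;

        void markDataProduced() { dataProduced = true; }

        void resetForNewExecution() { dataProduced = false; }

        boolean isConsumable(boolean pipelined, boolean allPartitionsFinished) {
            return pipelined ? dataProduced : allPartitionsFinished;
        }
    }
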
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
index 6aa36b70ab8..4f7417f27fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
@@ -29,6 +29,7 @@
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -192,6 +193,7 @@ private static ExecutionVertex mockExecutionVertex(ExecutionState state, Resourc

        private static IntermediateResultPartition mockPartition(ExecutionVertex producer) {
                IntermediateResultPartition partition = mock(IntermediateResultPartition.class);
+               when(partition.getResultType()).thenReturn(ResultPartitionType.PIPELINED);
                when(partition.isConsumable()).thenReturn(true);
 
                IntermediateResult result = mock(IntermediateResult.class);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java
new file mode 100644
index 00000000000..8d131a0cb30
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java
@@ -0,0 +1,235 @@
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.isInExecutionState;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitForAllExecutionsPredicate;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionVertexState;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the input dependency constraint for {@link ExecutionVertex}.
+ */
+public class ExecutionVertexInputConstraintTest extends TestLogger {
+
+       @Test
+       public void testInputConsumable() throws Exception {
+               JobVertex v1 = new JobVertex("vertex1");
+               JobVertex v2 = new JobVertex("vertex2");
+               JobVertex v3 = new JobVertex("vertex3");
+               v1.setParallelism(2);
+               v2.setParallelism(2);
+               v3.setParallelism(2);
+               v1.setInvokableClass(AbstractInvokable.class);
+               v2.setInvokableClass(AbstractInvokable.class);
+               v3.setInvokableClass(AbstractInvokable.class);
+               v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+               v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+               List<JobVertex> ordered = Arrays.asList(v1, v2, v3);
+               ExecutionGraph eg = createExecutionGraph(ordered);
+
+               ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0];
+               ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1];
+               ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0];
+               ExecutionVertex ev22 = eg.getJobVertex(v2.getID()).getTaskVertices()[1];
+               ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0];
+               ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1];
+
+               eg.scheduleForExecution();
+
+               // Inputs not consumable on init
+               assertFalse(ev31.isInputConsumable(0));
+               assertFalse(ev31.isInputConsumable(1));
+
+               // One pipelined input consumable on data produced
+               IntermediateResultPartition partition11 = ev11.getProducedPartitions().values().iterator().next();
+               ev11.scheduleOrUpdateConsumers(new ResultPartitionID(partition11.getPartitionId(),
+                       ev11.getCurrentExecutionAttempt().getAttemptId()));
+               assertTrue(ev31.isInputConsumable(0));
+
+               // The blocking input is not consumable if only one partition is FINISHED
+               ev21.getCurrentExecutionAttempt().markFinished();
+               assertFalse(ev31.isInputConsumable(1));
+
+               // The blocking input consumable if all partitions are FINISHED
+               ev22.getCurrentExecutionAttempt().markFinished();
+               assertTrue(ev31.isInputConsumable(1));
+
+               // Inputs not consumable after failover
+               ev11.fail(new Exception());
+               waitUntilJobRestarted(eg);
+               assertFalse(ev31.isInputConsumable(0));
+               assertFalse(ev31.isInputConsumable(1));
+       }
+
+       @Test
+       public void testInputConstraintANY() throws Exception {
+               JobVertex v1 = new JobVertex("vertex1");
+               JobVertex v2 = new JobVertex("vertex2");
+               JobVertex v3 = new JobVertex("vertex3");
+               v1.setParallelism(2);
+               v2.setParallelism(2);
+               v3.setParallelism(2);
+               v1.setInvokableClass(AbstractInvokable.class);
+               v2.setInvokableClass(AbstractInvokable.class);
+               v3.setInvokableClass(AbstractInvokable.class);
+               v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+               v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+               List<JobVertex> ordered = Arrays.asList(v1, v2, v3);
+               ExecutionGraph eg = createExecutionGraph(ordered);
+               eg.setInputDependencyConstraint(InputDependencyConstraint.ANY);
+
+               ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0];
+               ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1];
+               ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0];
+               ExecutionVertex ev22 = eg.getJobVertex(v2.getID()).getTaskVertices()[1];
+               ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0];
+               ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1];
+
+               eg.scheduleForExecution();
+
+               // Inputs constraint not satisfied on init
+               assertFalse(ev31.checkInputDependencyConstraints());
+
+               // Input1 consumable satisfies the constraint
+               IntermediateResultPartition partition11 = ev11.getProducedPartitions().values().iterator().next();
+               ev11.scheduleOrUpdateConsumers(new ResultPartitionID(partition11.getPartitionId(),
+                       ev11.getCurrentExecutionAttempt().getAttemptId()));
+               assertTrue(ev31.checkInputDependencyConstraints());
+
+               // Inputs constraint not satisfied after failover
+               ev11.fail(new Exception());
+               waitUntilJobRestarted(eg);
+               assertFalse(ev31.checkInputDependencyConstraints());
+
+               // Input2 consumable satisfies the constraint
+               waitUntilExecutionVertexState(ev21, ExecutionState.DEPLOYING, 2000L);
+               waitUntilExecutionVertexState(ev22, ExecutionState.DEPLOYING, 2000L);
+               ev21.getCurrentExecutionAttempt().markFinished();
+               ev22.getCurrentExecutionAttempt().markFinished();
+               assertTrue(ev31.isInputConsumable(1));
+       }
+
+       @Test
+       public void testInputConstraintALL() throws Exception {
+               JobVertex v1 = new JobVertex("vertex1");
+               JobVertex v2 = new JobVertex("vertex2");
+               JobVertex v3 = new JobVertex("vertex3");
+               v1.setParallelism(2);
+               v2.setParallelism(2);
+               v3.setParallelism(2);
+               v1.setInvokableClass(AbstractInvokable.class);
+               v2.setInvokableClass(AbstractInvokable.class);
+               v3.setInvokableClass(AbstractInvokable.class);
+               v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+               v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+               List<JobVertex> ordered = Arrays.asList(v1, v2, v3);
+               ExecutionGraph eg = createExecutionGraph(ordered);
+               eg.setInputDependencyConstraint(InputDependencyConstraint.ALL);
+
+               ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0];
+               ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1];
+               ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0];
+               ExecutionVertex ev22 = eg.getJobVertex(v2.getID()).getTaskVertices()[1];
+               ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0];
+               ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1];
+
+               eg.scheduleForExecution();
+
+               // Inputs constraint not satisfied on init
+               assertFalse(ev31.checkInputDependencyConstraints());
+
+               // Input1 consumable does not satisfy the constraint
+               IntermediateResultPartition partition11 = ev11.getProducedPartitions().values().iterator().next();
+               ev11.scheduleOrUpdateConsumers(new ResultPartitionID(partition11.getPartitionId(),
+                       ev11.getCurrentExecutionAttempt().getAttemptId()));
+               assertFalse(ev31.checkInputDependencyConstraints());
+
+               // Input2 consumable satisfies the constraint
+               ev21.getCurrentExecutionAttempt().markFinished();
+               ev22.getCurrentExecutionAttempt().markFinished();
+               assertTrue(ev31.isInputConsumable(1));
+
+               // Inputs constraint not satisfied after failover
+               ev11.fail(new Exception());
+               waitUntilJobRestarted(eg);
+               assertFalse(ev31.checkInputDependencyConstraints());
+       }
+
+       private static ExecutionGraph createExecutionGraph(List<JobVertex> ordered) throws Exception {
+               final JobID jobId = new JobID();
+               final String jobName = "Test Job Sample Name";
+
+               final SlotProvider slotProvider = new SimpleSlotProvider(jobId, 20);
+
+               ExecutionGraph eg = new ExecutionGraph(
+                       new DummyJobInformation(
+                               jobId,
+                               jobName),
+                       TestingUtils.defaultExecutor(),
+                       TestingUtils.defaultExecutor(),
+                       AkkaUtils.getDefaultTimeout(),
+                       new FixedDelayRestartStrategy(1, 0),
+                       new RestartAllStrategy.Factory(),
+                       slotProvider);
+
+               eg.attachJobGraph(ordered);
+
+               return eg;
+       }
+
+       private void waitUntilJobRestarted(ExecutionGraph eg) throws Exception {
+               waitForAllExecutionsPredicate(eg,
+                       isInExecutionState(ExecutionState.CANCELING)
+                               .or(isInExecutionState(ExecutionState.CANCELED))
+                               .or(isInExecutionState(ExecutionState.FAILED))
+                               .or(isInExecutionState(ExecutionState.FINISHED)),
+                       2000L);
+
+               for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
+                       if (ev.getCurrentExecutionAttempt().getState() == ExecutionState.CANCELING) {
+                               ev.getCurrentExecutionAttempt().cancelingComplete();
+                       }
+               }
+               waitUntilJobStatus(eg, JobStatus.RUNNING, 2000L);
+       }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
new file mode 100644
index 00000000000..2cd00617d90
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
@@ -0,0 +1,98 @@
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link IntermediateResultPartition}.
+ */
+public class IntermediateResultPartitionTest extends TestLogger {
+
+       @Test
+       public void testPipelinedPartitionConsumable() throws Exception {
+               ExecutionJobVertex jobVertex = getExecutionVertex(new JobVertexID(), new DirectScheduledExecutorService());
+               IntermediateResult result =
+                       new IntermediateResult(new IntermediateDataSetID(), jobVertex, 2, ResultPartitionType.PIPELINED);
+               ExecutionVertex vertex1 =
+                       new ExecutionVertex(jobVertex, 0, new IntermediateResult[]{result}, Time.minutes(1));
+               ExecutionVertex vertex2 =
+                       new ExecutionVertex(jobVertex, 1, new IntermediateResult[]{result}, Time.minutes(1));
+
+               IntermediateResultPartition partition1 = result.getPartitions()[0];
+               IntermediateResultPartition partition2 = result.getPartitions()[1];
+
+               // Not consumable on init
+               assertFalse(partition1.isConsumable());
+               assertFalse(partition2.isConsumable());
+
+               // Partition 1 consumable after data are produced
+               partition1.markDataProduced();
+               assertTrue(partition1.isConsumable());
+               assertFalse(partition2.isConsumable());
+
+               // Not consumable if failover happens
+               result.resetForNewExecution();
+               assertFalse(partition1.isConsumable());
+               assertFalse(partition2.isConsumable());
+       }
+
+       @Test
+       public void testBlockingPartitionConsumable() throws Exception {
+               ExecutionJobVertex jobVertex = getExecutionVertex(new JobVertexID(), new DirectScheduledExecutorService());
+               IntermediateResult result =
+                       new IntermediateResult(new IntermediateDataSetID(), jobVertex, 2, ResultPartitionType.BLOCKING);
+               ExecutionVertex vertex1 =
+                       new ExecutionVertex(jobVertex, 0, new IntermediateResult[]{result}, Time.minutes(1));
+               ExecutionVertex vertex2 =
+                       new ExecutionVertex(jobVertex, 1, new IntermediateResult[]{result}, Time.minutes(1));
+
+               IntermediateResultPartition partition1 = result.getPartitions()[0];
+               IntermediateResultPartition partition2 = result.getPartitions()[1];
+
+               // Not consumable on init
+               assertFalse(partition1.isConsumable());
+               assertFalse(partition2.isConsumable());
+
+               // Not consumable if only one partition is FINISHED
+               partition1.markFinished();
+               assertFalse(partition1.isConsumable());
+               assertFalse(partition2.isConsumable());
+
+               // Consumable after all partitions are FINISHED
+               partition2.markFinished();
+               assertTrue(partition1.isConsumable());
+               assertTrue(partition2.isConsumable());
+
+               // Not consumable if failover happens
+               result.resetForNewExecution();
+               assertFalse(partition1.isConsumable());
+               assertFalse(partition2.isConsumable());
+       }
+}


 
