This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
     new aacc735806a [FLINK-35159] Transition ExecutionGraph to RUNNING after slot assignment
aacc735806a is described below

commit aacc735806acf1d63fa732706e079bc2ca1bb4fc
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Thu Apr 18 19:10:42 2024 +0200

    [FLINK-35159] Transition ExecutionGraph to RUNNING after slot assignment
---
 .../flink/runtime/scheduler/adaptive/AdaptiveScheduler.java       | 1 -
 .../flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java  | 5 ++++-
 .../runtime/scheduler/adaptive/CreatingExecutionGraphTest.java    | 8 ++++++--
 3 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 4ee22c95848..34539d23e04 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -1096,7 +1096,6 @@ public class AdaptiveScheduler
                 executionGraphWithVertexParallelism.getExecutionGraph();
 
         executionGraph.start(componentMainThreadExecutor);
-        executionGraph.transitionToRunning();
 
         executionGraph.setInternalTaskFailuresListener(
                 new UpdateSchedulerNgOnInternalFailuresListener(this));
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
index c876fe6ad1d..21055945372 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
@@ -124,7 +124,6 @@ public class CreatingExecutionGraph implements State {
                         operatorCoordinatorHandlerFactory.create(executionGraph, context);
                 operatorCoordinatorHandler.initializeOperatorCoordinators(
                         context.getMainThreadExecutor());
-                operatorCoordinatorHandler.startAllOperatorCoordinators();
                 final String updatedPlan =
                         JsonPlanGenerator.generatePlan(
                                 executionGraph.getJobID(),
@@ -138,6 +137,10 @@ public class CreatingExecutionGraph implements State {
                                                 .iterator(),
                                 executionGraphWithVertexParallelism.getVertexParallelism());
                 executionGraph.setJsonPlan(updatedPlan);
+
+                executionGraph.transitionToRunning();
+                operatorCoordinatorHandler.startAllOperatorCoordinators();
+
                 context.goToExecuting(
                         result.getExecutionGraph(),
                         executionGraphHandler,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
index 2375a194206..69e5f589b19 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
@@ -157,8 +157,12 @@ public class CreatingExecutionGraphTest extends TestLogger {
                     ignored -> CreatingExecutionGraph.AssignmentResult.notPossible());
             context.setExpectWaitingForResources();
 
-            executionGraphWithVertexParallelismFuture.complete(
-                    getGraph(new StateTrackingMockExecutionGraph()));
+            final StateTrackingMockExecutionGraph executionGraph =
+                    new StateTrackingMockExecutionGraph();
+
+            executionGraphWithVertexParallelismFuture.complete(getGraph(executionGraph));
+
+            assertThat(executionGraph.getState()).isEqualTo(JobStatus.INITIALIZING);
         }
     }
 

Reply via email to