This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.18 by this push:
     new aacc735806a [FLINK-35159] Transition ExecutionGraph to RUNNING after slot assignment
aacc735806a is described below

commit aacc735806acf1d63fa732706e079bc2ca1bb4fc
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Thu Apr 18 19:10:42 2024 +0200

    [FLINK-35159] Transition ExecutionGraph to RUNNING after slot assignment
---
 .../flink/runtime/scheduler/adaptive/AdaptiveScheduler.java      | 1 -
 .../flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java | 5 ++++-
 .../runtime/scheduler/adaptive/CreatingExecutionGraphTest.java   | 8 ++++++--
 3 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 4ee22c95848..34539d23e04 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -1096,7 +1096,6 @@ public class AdaptiveScheduler
                 executionGraphWithVertexParallelism.getExecutionGraph();
 
         executionGraph.start(componentMainThreadExecutor);
-        executionGraph.transitionToRunning();
 
         executionGraph.setInternalTaskFailuresListener(
                 new UpdateSchedulerNgOnInternalFailuresListener(this));
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
index c876fe6ad1d..21055945372 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
@@ -124,7 +124,6 @@ public class CreatingExecutionGraph implements State {
                         operatorCoordinatorHandlerFactory.create(executionGraph, context);
                 operatorCoordinatorHandler.initializeOperatorCoordinators(
                         context.getMainThreadExecutor());
-                operatorCoordinatorHandler.startAllOperatorCoordinators();
                 final String updatedPlan =
                         JsonPlanGenerator.generatePlan(
                                 executionGraph.getJobID(),
@@ -138,6 +137,10 @@ public class CreatingExecutionGraph implements State {
                                         .iterator(),
                                 executionGraphWithVertexParallelism.getVertexParallelism());
                 executionGraph.setJsonPlan(updatedPlan);
+
+                executionGraph.transitionToRunning();
+                operatorCoordinatorHandler.startAllOperatorCoordinators();
+
                 context.goToExecuting(
                         result.getExecutionGraph(),
                         executionGraphHandler,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
index 2375a194206..69e5f589b19 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
@@ -157,8 +157,12 @@ public class CreatingExecutionGraphTest extends TestLogger {
                     ignored -> CreatingExecutionGraph.AssignmentResult.notPossible());
             context.setExpectWaitingForResources();
 
-            executionGraphWithVertexParallelismFuture.complete(
-                    getGraph(new StateTrackingMockExecutionGraph()));
+            final StateTrackingMockExecutionGraph executionGraph =
+                    new StateTrackingMockExecutionGraph();
+
+            executionGraphWithVertexParallelismFuture.complete(getGraph(executionGraph));
+
+            assertThat(executionGraph.getState()).isEqualTo(JobStatus.INITIALIZING);
         }
     }