TisonKun closed pull request #6353: [FLINK-9875][runtime] Add concurrent 
creation of execution job vertex
URL: https://github.com/apache/flink/pull/6353
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index acb1e16fe71..42440182293 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -88,6 +88,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -808,6 +809,55 @@ public Executor getFutureExecutor() {
        //  Actions
        // 
--------------------------------------------------------------------------------------------
 
+       private void createExecutionJobVertex(List<JobVertex> 
topologiallySorted) throws JobException {
+               final List<CompletableFuture<JobException>> futures = new 
LinkedList<>();
+               final long createTimestamp = System.currentTimeMillis();
+
+               for (JobVertex jobVertex: topologiallySorted) {
+                       futures.add(CompletableFuture.supplyAsync(() -> {
+                               try {
+                                       ExecutionJobVertex ejv = new 
ExecutionJobVertex(
+                                               this,
+                                               jobVertex,
+                                               1,
+                                               rpcTimeout,
+                                               globalModVersion,
+                                               createTimestamp);
+                                       ExecutionJobVertex previousTask = 
tasks.putIfAbsent(jobVertex.getID(), ejv);
+                                       if (previousTask != null) {
+                                               throw new JobException(
+                                                       String.format(
+                                                               "Encountered 
two job vertices with ID %s : previous=[%s] / new=[%s]",
+                                                               
jobVertex.getID(), ejv, previousTask));
+                                       }
+                                       return null;
+                               } catch (JobException e) {
+                                       return e;
+                               }
+                       }, futureExecutor));
+               }
+
+               try {
+                       // wait for all futures done
+                       Collection<JobException> exceptions = 
FutureUtils.combineAll(futures).get();
+
+                       // suppress all optional exceptions
+                       Exception suppressedException = null;
+                       for (Exception exception : exceptions) {
+                               if (exception != null) {
+                                       suppressedException = 
ExceptionUtils.firstOrSuppressed(exception, suppressedException);
+                               }
+                       }
+
+                       if (suppressedException != null) {
+                               throw suppressedException;
+                       }
+               } catch (Exception e) {
+                       throw new JobException("Could not create execution job 
vertex.", e);
+               }
+       }
+
+
        public void attachJobGraph(List<JobVertex> topologiallySorted) throws 
JobException {
 
                LOG.debug("Attaching {} topologically sorted vertices to 
existing job graph with {} " +
@@ -815,7 +865,8 @@ public void attachJobGraph(List<JobVertex> 
topologiallySorted) throws JobExcepti
                                topologiallySorted.size(), tasks.size(), 
intermediateResults.size());
 
                final ArrayList<ExecutionJobVertex> newExecJobVertices = new 
ArrayList<>(topologiallySorted.size());
-               final long createTimestamp = System.currentTimeMillis();
+
+               createExecutionJobVertex(topologiallySorted);
 
                for (JobVertex jobVertex : topologiallySorted) {
 
@@ -823,23 +874,10 @@ public void attachJobGraph(List<JobVertex> 
topologiallySorted) throws JobExcepti
                                this.isStoppable = false;
                        }
 
-                       // create the execution job vertex and attach it to the 
graph
-                       ExecutionJobVertex ejv = new ExecutionJobVertex(
-                               this,
-                               jobVertex,
-                               1,
-                               rpcTimeout,
-                               globalModVersion,
-                               createTimestamp);
+                       ExecutionJobVertex ejv = tasks.get(jobVertex.getID());
 
                        ejv.connectToPredecessors(this.intermediateResults);
 
-                       ExecutionJobVertex previousTask = 
this.tasks.putIfAbsent(jobVertex.getID(), ejv);
-                       if (previousTask != null) {
-                               throw new 
JobException(String.format("Encountered two job vertices with ID %s : 
previous=[%s] / new=[%s]",
-                                               jobVertex.getID(), ejv, 
previousTask));
-                       }
-
                        for (IntermediateResult res : 
ejv.getProducedDataSets()) {
                                IntermediateResult previousDataSet = 
this.intermediateResults.putIfAbsent(res.getId(), res);
                                if (previousDataSet != null) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index e385318b810..5d9a8e2f711 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -174,9 +174,11 @@ public ExecutionVertex(
                        timeout);
 
                // create a co-location scheduling hint, if necessary
-               CoLocationGroup clg = jobVertex.getCoLocationGroup();
+               final CoLocationGroup clg = jobVertex.getCoLocationGroup();
                if (clg != null) {
-                       this.locationConstraint = 
clg.getLocationConstraint(subTaskIndex);
+                       synchronized (clg) {
+                               this.locationConstraint = 
clg.getLocationConstraint(subTaskIndex);
+                       }
                }
                else {
                        this.locationConstraint = null;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to