TisonKun closed pull request #6353: [FLINK-9875][runtime] Add concurrent creation of execution job vertex
URL: https://github.com/apache/flink/pull/6353
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index acb1e16fe71..42440182293 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -88,6 +88,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -808,6 +809,55 @@ public Executor getFutureExecutor() { // Actions // -------------------------------------------------------------------------------------------- + private void createExecutionJobVertex(List<JobVertex> topologiallySorted) throws JobException { + final List<CompletableFuture<JobException>> futures = new LinkedList<>(); + final long createTimestamp = System.currentTimeMillis(); + + for (JobVertex jobVertex: topologiallySorted) { + futures.add(CompletableFuture.supplyAsync(() -> { + try { + ExecutionJobVertex ejv = new ExecutionJobVertex( + this, + jobVertex, + 1, + rpcTimeout, + globalModVersion, + createTimestamp); + ExecutionJobVertex previousTask = tasks.putIfAbsent(jobVertex.getID(), ejv); + if (previousTask != null) { + throw new JobException( + String.format( + "Encountered two job vertices with ID %s : previous=[%s] / new=[%s]", + jobVertex.getID(), ejv, previousTask)); + } + return null; + } catch (JobException e) { + return e; + } + }, futureExecutor)); + } + + try { + // wait for all futures done + Collection<JobException> exceptions = 
FutureUtils.combineAll(futures).get(); + + // suppress all optional exceptions + Exception suppressedException = null; + for (Exception exception : exceptions) { + if (exception != null) { + suppressedException = ExceptionUtils.firstOrSuppressed(exception, suppressedException); + } + } + + if (suppressedException != null) { + throw suppressedException; + } + } catch (Exception e) { + throw new JobException("Could not create execution job vertex.", e); + } + } + + public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException { LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} " + @@ -815,7 +865,8 @@ public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobExcepti topologiallySorted.size(), tasks.size(), intermediateResults.size()); final ArrayList<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologiallySorted.size()); - final long createTimestamp = System.currentTimeMillis(); + + createExecutionJobVertex(topologiallySorted); for (JobVertex jobVertex : topologiallySorted) { @@ -823,23 +874,10 @@ public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobExcepti this.isStoppable = false; } - // create the execution job vertex and attach it to the graph - ExecutionJobVertex ejv = new ExecutionJobVertex( - this, - jobVertex, - 1, - rpcTimeout, - globalModVersion, - createTimestamp); + ExecutionJobVertex ejv = tasks.get(jobVertex.getID()); ejv.connectToPredecessors(this.intermediateResults); - ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv); - if (previousTask != null) { - throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]", - jobVertex.getID(), ejv, previousTask)); - } - for (IntermediateResult res : ejv.getProducedDataSets()) { IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res); if (previousDataSet != null) { diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index e385318b810..5d9a8e2f711 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -174,9 +174,11 @@ public ExecutionVertex( timeout); // create a co-location scheduling hint, if necessary - CoLocationGroup clg = jobVertex.getCoLocationGroup(); + final CoLocationGroup clg = jobVertex.getCoLocationGroup(); if (clg != null) { - this.locationConstraint = clg.getLocationConstraint(subTaskIndex); + synchronized (clg) { + this.locationConstraint = clg.getLocationConstraint(subTaskIndex); + } } else { this.locationConstraint = null; ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services