uros-b commented on code in PR #56700:
URL: https://github.com/apache/spark/pull/56700#discussion_r3461701004
##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DataflowGraphTransformer.scala:
##########
@@ -134,159 +134,165 @@ class DataflowGraphTransformer(graph: DataflowGraph)
extends AutoCloseable {
val failedFlowsQueue = new ConcurrentLinkedQueue[ResolutionFailedFlow]()
val failedDependentFlows = new ConcurrentHashMap[TableIdentifier,
Seq[ResolutionFailedFlow]]()
- var futures = ArrayBuffer[Future[Unit]]()
+ val completionService = new ExecutorCompletionService[Unit](executor)
+ var outstanding = 0
val toBeResolvedFlows = new ConcurrentLinkedDeque[Flow]()
toBeResolvedFlows.addAll(flows.asJava)
- while (futures.nonEmpty || toBeResolvedFlows.peekFirst() != null) {
- val (done, notDone) = futures.partition(_.isDone)
- // Explicitly call future.get() to propagate exceptions one by one if any
+ // Waits on a finished resolution task and propagates its exception, if
any.
+ def reap(finished: Future[Unit]): Unit = {
try {
- done.foreach(_.get())
+ finished.get()
} catch {
case exn: ExecutionException =>
// Computation threw the exception that is the cause of exn
throw exn.getCause
}
- futures = notDone
- val flowOpt = {
- // We only schedule [[batchSize]] number of flows in parallel.
- if (futures.size < batchSize) {
- Option(toBeResolvedFlows.pollFirst())
- } else {
- None
- }
+ outstanding -= 1
+ }
+
+ while (outstanding > 0 || toBeResolvedFlows.peekFirst() != null) {
+ // Reap every resolution task that has already finished, without
blocking.
+ var finished = completionService.poll()
+ while (finished != null) {
+ reap(finished)
+ finished = completionService.poll()
}
- flowOpt.foreach { flow =>
- futures.append(
- executor.submit(
- () =>
+ // We only schedule [[batchSize]] number of flows in parallel.
+ if (outstanding < batchSize && toBeResolvedFlows.peekFirst() != null) {
+ val flow = toBeResolvedFlows.pollFirst()
+ outstanding += 1
+ completionService.submit(
+ () =>
+ try {
try {
- try {
- // Note: Flow don't need their inputs passed, so for now we
send empty Seq.
- val result = transformer(flow, Seq.empty)
- require(
- result.forall(_.isInstanceOf[ResolvedFlow]),
- "transformer must return a Seq[Flow]"
- )
+ // Note: Flow don't need their inputs passed, so for now we
send empty Seq.
+ val result = transformer(flow, Seq.empty)
+ require(
+ result.forall(_.isInstanceOf[ResolvedFlow]),
+ "transformer must return a Seq[Flow]"
+ )
- val transformedFlows =
result.map(_.asInstanceOf[ResolvedFlow])
- resolvedFlowsMap.put(flow.identifier, transformedFlows)
- resolvedFlows.addAll(transformedFlows.asJava)
- } catch {
- case e: TransformNodeRetryableException =>
- val datasetIdentifier = e.datasetIdentifier
- failedDependentFlows.compute(
- datasetIdentifier,
- (_, flows) => {
- // Don't add the input flow back but the failed flow
object
- // back which has relevant failure information.
- val failedFlow = e.failedNode
- if (flows == null) {
- Seq(failedFlow)
- } else {
- flows :+ failedFlow
- }
+ val transformedFlows = result.map(_.asInstanceOf[ResolvedFlow])
+ resolvedFlowsMap.put(flow.identifier, transformedFlows)
+ resolvedFlows.addAll(transformedFlows.asJava)
+ } catch {
+ case e: TransformNodeRetryableException =>
+ val datasetIdentifier = e.datasetIdentifier
+ failedDependentFlows.compute(
+ datasetIdentifier,
+ (_, flows) => {
+ // Don't add the input flow back but the failed flow
object
+ // back which has relevant failure information.
+ val failedFlow = e.failedNode
+ if (flows == null) {
+ Seq(failedFlow)
+ } else {
+ flows :+ failedFlow
}
- )
- // Between the time the flow started and finished
resolving, perhaps the
- // dependent dataset was resolved
- resolvedFlowDestinationsMap.computeIfPresent(
- datasetIdentifier,
- (_, resolved) => {
- if (resolved) {
- // Check if the dataset that the flow is dependent
on has been resolved
- // and if so, remove all dependent flows from the
failedDependentFlows and
- // add them to the toBeResolvedFlows queue for retry.
- failedDependentFlows.computeIfPresent(
- datasetIdentifier,
- (_, toRetryFlows) => {
-
toRetryFlows.foreach(toBeResolvedFlows.addFirst(_))
- null
- }
- )
- }
- resolved
+ }
+ )
+ // Between the time the flow started and finished resolving,
perhaps the
+ // dependent dataset was resolved
+ resolvedFlowDestinationsMap.computeIfPresent(
+ datasetIdentifier,
+ (_, resolved) => {
+ if (resolved) {
+ // Check if the dataset that the flow is dependent on
has been resolved
+ // and if so, remove all dependent flows from the
failedDependentFlows and
+ // add them to the toBeResolvedFlows queue for retry.
+ failedDependentFlows.computeIfPresent(
+ datasetIdentifier,
+ (_, toRetryFlows) => {
+ toRetryFlows.foreach(toBeResolvedFlows.addFirst(_))
+ null
+ }
+ )
}
+ resolved
+ }
+ )
+ case other: Throwable => throw other
+ }
+ // If all flows to this particular destination are resolved,
move to the destination
+ // node transformer
+ if (flowsTo(flow.destinationIdentifier).forall({
flowToDestination =>
+ resolvedFlowsMap.containsKey(flowToDestination.identifier)
+ })) {
+ // If multiple flows completed in parallel, ensure we resolve
the destination only
+ // once by electing a leader via computeIfAbsent
+ var isCurrentThreadLeader = false
+
resolvedFlowDestinationsMap.computeIfAbsent(flow.destinationIdentifier, _ => {
+ isCurrentThreadLeader = true
+ // Set initial value as false as flow destination is not
resolved yet.
+ false
+ })
+ if (isCurrentThreadLeader) {
+ if (tableMap.contains(flow.destinationIdentifier)) {
+ val transformed =
+ transformer(
+ tableMap(flow.destinationIdentifier),
+ flowsTo(flow.destinationIdentifier)
+ )
+ resolvedTables.addAll(
+ transformed.collect { case t: Table => t }.asJava
)
- case other: Throwable => throw other
- }
- // If all flows to this particular destination are resolved,
move to the destination
- // node transformer
- if (flowsTo(flow.destinationIdentifier).forall({
flowToDestination =>
- resolvedFlowsMap.containsKey(flowToDestination.identifier)
- })) {
- // If multiple flows completed in parallel, ensure we
resolve the destination only
- // once by electing a leader via computeIfAbsent
- var isCurrentThreadLeader = false
-
resolvedFlowDestinationsMap.computeIfAbsent(flow.destinationIdentifier, _ => {
- isCurrentThreadLeader = true
- // Set initial value as false as flow destination is not
resolved yet.
- false
- })
- if (isCurrentThreadLeader) {
- if (tableMap.contains(flow.destinationIdentifier)) {
+ resolvedFlows.addAll(
+ transformed.collect { case f: ResolvedFlow => f }.asJava
+ )
+ } else if (viewMap.contains(flow.destinationIdentifier)) {
+ resolvedViews.addAll {
val transformed =
transformer(
- tableMap(flow.destinationIdentifier),
+ viewMap(flow.destinationIdentifier),
flowsTo(flow.destinationIdentifier)
)
- resolvedTables.addAll(
- transformed.collect { case t: Table => t }.asJava
- )
- resolvedFlows.addAll(
- transformed.collect { case f: ResolvedFlow => f
}.asJava
- )
- } else if (viewMap.contains(flow.destinationIdentifier)) {
- resolvedViews.addAll {
- val transformed =
- transformer(
- viewMap(flow.destinationIdentifier),
- flowsTo(flow.destinationIdentifier)
- )
- transformed.map(_.asInstanceOf[View]).asJava
- }
- } else if (sinkMap.contains(flow.destinationIdentifier)) {
- resolvedSinks.addAll {
- val transformed =
- transformer(
- sinkMap(flow.destinationIdentifier),
flowsTo(flow.destinationIdentifier)
- )
- require(
- transformed.forall(_.isInstanceOf[Sink]),
- "transformer must return a Seq[Sink]"
+ transformed.map(_.asInstanceOf[View]).asJava
+ }
+ } else if (sinkMap.contains(flow.destinationIdentifier)) {
+ resolvedSinks.addAll {
+ val transformed =
+ transformer(
+ sinkMap(flow.destinationIdentifier),
flowsTo(flow.destinationIdentifier)
)
- transformed.map(_.asInstanceOf[Sink]).asJava
- }
- } else {
- throw new IllegalArgumentException(
- s"Unsupported destination
${flow.destinationIdentifier.unquotedString}" +
- s" in flow: ${flow.displayName} at transformDownNodes"
+ require(
+ transformed.forall(_.isInstanceOf[Sink]),
+ "transformer must return a Seq[Sink]"
)
+ transformed.map(_.asInstanceOf[Sink]).asJava
}
- // Set flow destination as resolved now.
- resolvedFlowDestinationsMap.computeIfPresent(
- flow.destinationIdentifier,
- (_, _) => {
- // If there are any other node failures dependent on
this destination, retry
- // them
- failedDependentFlows.computeIfPresent(
- flow.destinationIdentifier,
- (_, toRetryFlows) => {
- toRetryFlows.foreach(toBeResolvedFlows.addFirst(_))
- null
- }
- )
- true
- }
+ } else {
+ throw new IllegalArgumentException(
+ s"Unsupported destination
${flow.destinationIdentifier.unquotedString}" +
+ s" in flow: ${flow.displayName} at transformDownNodes"
)
}
+ // Set flow destination as resolved now.
+ resolvedFlowDestinationsMap.computeIfPresent(
+ flow.destinationIdentifier,
+ (_, _) => {
+ // If there are any other node failures dependent on
this destination, retry
+ // them
+ failedDependentFlows.computeIfPresent(
+ flow.destinationIdentifier,
+ (_, toRetryFlows) => {
+ toRetryFlows.foreach(toBeResolvedFlows.addFirst(_))
+ null
+ }
+ )
+ true
+ }
+ )
}
- } catch {
- case ex: TransformNodeFailedException =>
failedFlowsQueue.add(ex.failedNode)
}
- )
+ } catch {
+ case ex: TransformNodeFailedException =>
failedFlowsQueue.add(ex.failedNode)
+ }
)
+ } else if (outstanding > 0) {
+ // Nothing finished and nothing could be scheduled, but tasks are
still running:
+ // block until the next one finishes instead of busy-spinning on
Future.isDone.
+ reap(completionService.take())
Review Comment:
Nit: the `else if (outstanding > 0)` guard is effectively always true when it
is reached. Control only reaches it when the preceding `if` condition is false,
i.e. when `outstanding >= batchSize || toBeResolvedFlows.peekFirst() == null`.
Since `batchSize >= 1`, the first disjunct implies `outstanding > 0`; and if the
queue is empty, the loop invariant guarantees `outstanding > 0`. So could it
just be a plain `else`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]