cloud-fan commented on code in PR #45234:
URL: https://github.com/apache/spark/pull/45234#discussion_r1684101953


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala:
##########
@@ -47,6 +47,15 @@ import org.apache.spark.util.random.XORShiftRandom
  */
 trait ShuffleExchangeLike extends Exchange {
 
+  /**
+   * The asynchronous job that materializes the shuffle. It also does the preparations work,
+   * such as waiting for the subqueries.
+   */
+  @transient private lazy val shuffleFuture: Future[MapOutputStatistics] = executeQuery {
+    materializationStarted.set(true)

Review Comment:
   I think we need a bit of synchronization here. The shuffle node should have two fields: an `isCancelled` flag and the shuffle job `Future`.
   - When we cancel a shuffle, we lock on the shuffle node and set the `isCancelled` flag to true. Then, if the shuffle job `Future` is present, we cancel it.
   - When we are about to submit a shuffle, we lock on the shuffle node. Then, if the `isCancelled` flag is true, we fail immediately; otherwise, we submit the shuffle job and set the `Future` field.
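A minimal sketch of the locking protocol proposed above, written in plain Scala so it runs outside Spark. It assumes a `java.util.concurrent.ExecutorService` stands in for Spark's shuffle job submission. `CancellableShuffleNode`, `submit`, and `cancel` are hypothetical names, not Spark APIs, and this is not the code that was merged in the PR.

```scala
import java.util.concurrent.{Callable, CancellationException, ExecutorService, Executors, Future}

// HYPOTHETICAL stand-in for a shuffle exchange node (not a Spark class).
// `job` plays the role of the shuffle map stage; a plain ExecutorService
// stands in for Spark's job submission.
final class CancellableShuffleNode[T](executor: ExecutorService, job: Callable[T]) {
  // Both fields are guarded by the node's own monitor (`this`).
  private var isCancelled: Boolean = false
  private var shuffleFuture: Option[Future[T]] = None

  /** Sets the flag, then cancels the job if it has already been submitted. */
  def cancel(): Unit = this.synchronized {
    isCancelled = true
    shuffleFuture.foreach(_.cancel(true)) // interrupts the job if it is running
  }

  /**
   * Fails immediately if the node was cancelled; otherwise submits the job
   * (at most once) and records its future.
   */
  def submit(): Future[T] = this.synchronized {
    if (isCancelled) {
      throw new CancellationException("shuffle was cancelled before submission")
    }
    shuffleFuture.getOrElse {
      val f = executor.submit(job)
      shuffleFuture = Some(f)
      f
    }
  }
}

object ShuffleCancelDemo {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newSingleThreadExecutor()
    try {
      // 1. Submit without cancelling: the job runs normally.
      val ok = new CancellableShuffleNode[Int](pool, () => 42)
      println(ok.submit().get()) // prints 42

      // 2. Cancel first: a later submit fails fast instead of launching the job.
      val early = new CancellableShuffleNode[Int](pool, () => 42)
      early.cancel()
      val rejected =
        try { early.submit(); false }
        catch { case _: CancellationException => true }
      println(rejected) // prints true

      // 3. Submit, then cancel: the in-flight future is cancelled.
      val late = new CancellableShuffleNode[Int](pool, () => { Thread.sleep(10000); 1 })
      val f = late.submit()
      late.cancel()
      println(f.isCancelled) // prints true
    } finally {
      pool.shutdownNow()
    }
  }
}
```

Because `cancel` and `submit` hold the same monitor, a cancellation cannot slip in between `submit`'s flag check and its assignment of `shuffleFuture`. Without the lock, a cancel arriving in that window would find no future to cancel, and the job would then be submitted anyway.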



-- 
This is an automated message from the Apache Git Service.