cloud-fan commented on a change in pull request #24595: [SPARK-20774][CORE] Cancel the running broadcast execution on BroadcastTimeout
URL: https://github.com/apache/spark/pull/24595#discussion_r283702745
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
##########
@@ -143,14 +152,20 @@ case class BroadcastExchangeExec(
override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
try {
- ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]
+ relationFuture.get(timeout.toSeconds, TimeUnit.SECONDS).asInstanceOf[broadcast.Broadcast[T]]
} catch {
case ex: TimeoutException =>
- logError(s"Could not execute broadcast in ${timeout.toSeconds} secs.", ex)
+ if (!relationFuture.isDone) {
+ sparkContext.cancelJobGroup(runId.toString)
+ relationFuture.cancel(true)
+ }
throw new SparkException(s"Could not execute broadcast in ${timeout.toSeconds} secs. " +
s"You can increase the timeout for broadcasts via ${SQLConf.BROADCAST_TIMEOUT.key} or " +
s"disable broadcast join by setting ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1",
ex)
+ case NonFatal(ex) =>
Review comment:
Previously we threw the original exception; now we wrap it in
`SparkException`. Is there a reason for this change? This happens on the
driver side, and I think `SparkException` is for executor-side errors?
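To make the trade-off concrete, here is a minimal, Spark-free sketch of the pattern in the diff, assuming only `java.util.concurrent`. It waits with a bounded `Future.get`. On timeout it cancels the task with interruption and throws a wrapped exception. Any other failure is either left to propagate or wrapped, depending on a flag. `BroadcastFailure`, `TimedAwait`, `awaitOrCancel` and `observedFailure` are hypothetical names: `BroadcastFailure` stands in for `SparkException`, and the `cancelJobGroup` step is omitted. One detail of the Java API is worth keeping in mind. When a task fails, `Future.get` reports the failure as an `ExecutionException` whose `getCause` is the original exception, so a caller that lets failures "propagate unchanged" still sees that wrapper type.

```scala
import java.util.concurrent.{Callable, ExecutionException, Executors, Future, TimeUnit, TimeoutException}
import scala.util.control.NonFatal

// Hypothetical stand-in for SparkException (not Spark's class).
final class BroadcastFailure(msg: String, cause: Throwable) extends Exception(msg, cause)

object TimedAwait {
  // Bounded wait mirroring the diff: on timeout, cancel the task with
  // interruption (if it is still running) and throw a wrapped exception.
  // Other non-fatal failures are wrapped only when `wrapOthers` is true;
  // otherwise they propagate exactly as Future.get threw them.
  def awaitOrCancel[T](future: Future[T], timeoutSecs: Long, wrapOthers: Boolean): T =
    try {
      future.get(timeoutSecs, TimeUnit.SECONDS)
    } catch {
      case ex: TimeoutException =>
        if (!future.isDone) future.cancel(true)
        throw new BroadcastFailure(s"Could not finish in $timeoutSecs secs.", ex)
      case NonFatal(ex) if wrapOthers =>
        throw new BroadcastFailure(s"Task failed: ${ex.getMessage}", ex)
    }

  // Hypothetical helper: runs `body` on a single-thread pool and reports
  // which kind of exception the waiting (driver-like) thread observes.
  def observedFailure(body: () => Int, timeoutSecs: Long, wrapOthers: Boolean): String = {
    val pool = Executors.newSingleThreadExecutor()
    try {
      val fut = pool.submit(new Callable[Int] { def call(): Int = body() })
      awaitOrCancel(fut, timeoutSecs, wrapOthers)
      "none"
    } catch {
      case _: BroadcastFailure => "wrapped"
      // Future.get reports a task failure as ExecutionException; the
      // original exception is available via getCause.
      case _: ExecutionException => "ExecutionException"
    } finally {
      pool.shutdownNow()
    }
  }
}
```

With a task that sleeps past the timeout, `observedFailure` reports `"wrapped"`. With a task that throws, it reports `"ExecutionException"` unless `wrapOthers` is set, in which case the caller only sees the wrapper and must inspect `getCause` to recover the original error.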
----------------------------------------------------------------
This is an automated message from the Apache Git Service.