ajithme commented on a change in pull request #27266: [SPARK-22590][SQL] Copy
sparkContext.localProperties to child thread in
BroadcastExchangeExec.executionContext
URL: https://github.com/apache/spark/pull/27266#discussion_r371126600
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
##########
@@ -75,77 +75,74 @@ case class BroadcastExchangeExec(
   private[sql] lazy val relationFuture: Future[broadcast.Broadcast[Any]] = {
     // relationFuture is used in "doExecute". Therefore we can get the execution id correctly here.
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    val task = new Callable[broadcast.Broadcast[Any]]() {
-      override def call(): broadcast.Broadcast[Any] = {
-        // This will run in another thread. Set the execution id so that we can connect these jobs
-        // with the correct execution.
-        SQLExecution.withExecutionId(sqlContext.sparkSession, executionId) {
-          try {
-            // Setup a job group here so later it may get cancelled by groupId if necessary.
-            sparkContext.setJobGroup(runId.toString, s"broadcast exchange (runId $runId)",
-              interruptOnCancel = true)
-            val beforeCollect = System.nanoTime()
-            // Use executeCollect/executeCollectIterator to avoid conversion to Scala types
-            val (numRows, input) = child.executeCollectIterator()
-            if (numRows >= 512000000) {
-              throw new SparkException(
-                s"Cannot broadcast the table with 512 million or more rows: $numRows rows")
-            }
-
-            val beforeBuild = System.nanoTime()
-            longMetric("collectTime") += NANOSECONDS.toMillis(beforeBuild - beforeCollect)
-
-            // Construct the relation.
-            val relation = mode.transform(input, Some(numRows))
-
-            val dataSize = relation match {
-              case map: HashedRelation =>
-                map.estimatedSize
-              case arr: Array[InternalRow] =>
-                arr.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
-              case _ =>
-                throw new SparkException("[BUG] BroadcastMode.transform returned unexpected " +
-                  s"type: ${relation.getClass.getName}")
-            }
-
-            longMetric("dataSize") += dataSize
-            if (dataSize >= (8L << 30)) {
-              throw new SparkException(
-                s"Cannot broadcast the table that is larger than 8GB: ${dataSize >> 30} GB")
-            }
-
-            val beforeBroadcast = System.nanoTime()
-            longMetric("buildTime") += NANOSECONDS.toMillis(beforeBroadcast - beforeBuild)
-
-            // Broadcast the relation
-            val broadcasted = sparkContext.broadcast(relation)
-            longMetric("broadcastTime") += NANOSECONDS.toMillis(
-              System.nanoTime() - beforeBroadcast)
-
-            SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
-            promise.success(broadcasted)
-            broadcasted
-          } catch {
-            // SPARK-24294: To bypass scala bug: https://github.com/scala/bug/issues/9554, we throw
-            // SparkFatalException, which is a subclass of Exception. ThreadUtils.awaitResult
-            // will catch this exception and re-throw the wrapped fatal throwable.
-            case oe: OutOfMemoryError =>
-              val ex = new SparkFatalException(
-                new OutOfMemoryError("Not enough memory to build and broadcast the table to all " +
-                  "worker nodes. As a workaround, you can either disable broadcast by setting " +
-                  s"${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1 or increase the spark " +
-                  s"driver memory by setting ${SparkLauncher.DRIVER_MEMORY} to a higher value.")
-                  .initCause(oe.getCause))
-              promise.failure(ex)
-              throw ex
-            case e if !NonFatal(e) =>
-              val ex = new SparkFatalException(e)
-              promise.failure(ex)
-              throw ex
-            case e: Throwable =>
-              promise.failure(e)
-              throw e
+    val task = SQLExecution.withThreadLocalCaptured[broadcast.Broadcast[Any]](
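
(The hunk is truncated at the new call above; the remaining added lines are not shown here. For context, the replacement relies on a capture-and-restore pattern: read the caller's local properties on the submitting thread, then re-install them in the thread that runs the body. Below is a minimal sketch of that pattern using only public SparkContext API; the helper name `withCapturedProperty` and its shape are illustrative, not Spark's actual implementation.)

```scala
import scala.concurrent.{ExecutionContext, Future}

import org.apache.spark.SparkContext

object ThreadLocalCaptureSketch {
  // Hypothetical helper: copy one local property, such as the SQL execution
  // id, from the submitting thread to the thread that runs `body`.
  def withCapturedProperty[T](sc: SparkContext, key: String)(body: => T)(
      implicit exec: ExecutionContext): Future[T] = {
    // Read the property on the caller's thread, where it is set correctly.
    val value = sc.getLocalProperty(key)
    Future {
      // Re-install it on the worker thread so jobs submitted there are
      // attributed to the same execution.
      sc.setLocalProperty(key, value)
      body
    }
  }
}
```

With such a helper, wrapping the broadcast job in `withCapturedProperty(sc, SQLExecution.EXECUTION_ID_KEY) { ... }` would make jobs started from the pool thread show up under the correct SQL execution even though thread-pool threads do not inherit the caller's properties.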
Review comment:
@hvanhovell Do you mean I should merge SQLExecution.withThreadLocalCaptured and
SQLExecution.withExecutionId into a single method? Please correct me if I have
misunderstood.
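
If that is the intent, a rough sketch of what a merged helper could look like (the name `withCapturedExecutionId` and its shape are my assumption, not existing API):

```scala
import scala.concurrent.{ExecutionContext, Future}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.SQLExecution

object MergedHelperSketch {
  // Hypothetical combined helper: capture the active session and pin the
  // execution id in one place, so callers no longer need a separate
  // withExecutionId call inside the submitted body.
  def withCapturedExecutionId[T](session: SparkSession, executionId: String)(
      body: => T)(implicit exec: ExecutionContext): Future[T] = {
    val sc = session.sparkContext
    Future {
      // Re-establish the caller's session and execution id on this thread.
      SparkSession.setActiveSession(session)
      val oldId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
      sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, executionId)
      try body
      finally sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, oldId)
    }
  }
}
```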