hvanhovell commented on a change in pull request #27266: [SPARK-22590][SQL] 
Copy sparkContext.localproperties to child thread in 
BroadcastExchangeExec.executionContext
URL: https://github.com/apache/spark/pull/27266#discussion_r371153263
 
 

 ##########
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
 ##########
 @@ -75,77 +75,74 @@ case class BroadcastExchangeExec(
   private[sql] lazy val relationFuture: Future[broadcast.Broadcast[Any]] = {
     // relationFuture is used in "doExecute". Therefore we can get the 
execution id correctly here.
     val executionId = 
sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    val task = new Callable[broadcast.Broadcast[Any]]() {
-      override def call(): broadcast.Broadcast[Any] = {
-        // This will run in another thread. Set the execution id so that we 
can connect these jobs
-        // with the correct execution.
-        SQLExecution.withExecutionId(sqlContext.sparkSession, executionId) {
-          try {
-            // Setup a job group here so later it may get cancelled by groupId 
if necessary.
-            sparkContext.setJobGroup(runId.toString, s"broadcast exchange 
(runId $runId)",
-              interruptOnCancel = true)
-            val beforeCollect = System.nanoTime()
-            // Use executeCollect/executeCollectIterator to avoid conversion 
to Scala types
-            val (numRows, input) = child.executeCollectIterator()
-            if (numRows >= 512000000) {
-              throw new SparkException(
-                s"Cannot broadcast the table with 512 million or more rows: 
$numRows rows")
-            }
-
-            val beforeBuild = System.nanoTime()
-            longMetric("collectTime") += NANOSECONDS.toMillis(beforeBuild - 
beforeCollect)
-
-            // Construct the relation.
-            val relation = mode.transform(input, Some(numRows))
-
-            val dataSize = relation match {
-              case map: HashedRelation =>
-                map.estimatedSize
-              case arr: Array[InternalRow] =>
-                arr.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
-              case _ =>
-                throw new SparkException("[BUG] BroadcastMode.transform 
returned unexpected " +
-                  s"type: ${relation.getClass.getName}")
-            }
-
-            longMetric("dataSize") += dataSize
-            if (dataSize >= (8L << 30)) {
-              throw new SparkException(
-                s"Cannot broadcast the table that is larger than 8GB: 
${dataSize >> 30} GB")
-            }
-
-            val beforeBroadcast = System.nanoTime()
-            longMetric("buildTime") += NANOSECONDS.toMillis(beforeBroadcast - 
beforeBuild)
-
-            // Broadcast the relation
-            val broadcasted = sparkContext.broadcast(relation)
-            longMetric("broadcastTime") += NANOSECONDS.toMillis(
-              System.nanoTime() - beforeBroadcast)
-
-            SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, 
metrics.values.toSeq)
-            promise.success(broadcasted)
-            broadcasted
-          } catch {
-            // SPARK-24294: To bypass scala bug: 
https://github.com/scala/bug/issues/9554, we throw
-            // SparkFatalException, which is a subclass of Exception. 
ThreadUtils.awaitResult
-            // will catch this exception and re-throw the wrapped fatal 
throwable.
-            case oe: OutOfMemoryError =>
-              val ex = new SparkFatalException(
-                new OutOfMemoryError("Not enough memory to build and broadcast 
the table to all " +
-                  "worker nodes. As a workaround, you can either disable 
broadcast by setting " +
-                  s"${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1 or 
increase the spark " +
-                  s"driver memory by setting ${SparkLauncher.DRIVER_MEMORY} to 
a higher value.")
-                  .initCause(oe.getCause))
-              promise.failure(ex)
-              throw ex
-            case e if !NonFatal(e) =>
-              val ex = new SparkFatalException(e)
-              promise.failure(ex)
-              throw ex
-            case e: Throwable =>
-              promise.failure(e)
-              throw e
+    val task = SQLExecution.withThreadLocalCaptured[broadcast.Broadcast[Any]](
 
 Review comment:
   That is expected. There should already be an active query execution here; if 
there is, then the configurations should also be set.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to