Shockang commented on a change in pull request #24595:
URL: https://github.com/apache/spark/pull/24595#discussion_r669236306
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
##########
@@ -67,68 +70,74 @@ case class BroadcastExchangeExec(
   @transient
   private lazy val relationFuture: Future[broadcast.Broadcast[Any]] = {
-    // broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
+    // relationFuture is used in "doExecute". Therefore we can get the execution id correctly here.
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    Future {
-      // This will run in another thread. Set the execution id so that we can connect these jobs
-      // with the correct execution.
-      SQLExecution.withExecutionId(sqlContext.sparkSession, executionId) {
-        try {
-          val beforeCollect = System.nanoTime()
-          // Use executeCollect/executeCollectIterator to avoid conversion to Scala types
-          val (numRows, input) = child.executeCollectIterator()
-          if (numRows >= 512000000) {
-            throw new SparkException(
-              s"Cannot broadcast the table with 512 million or more rows: $numRows rows")
-          }
-
-          val beforeBuild = System.nanoTime()
-          longMetric("collectTime") += NANOSECONDS.toMillis(beforeBuild - beforeCollect)
-
-          // Construct the relation.
-          val relation = mode.transform(input, Some(numRows))
-
-          val dataSize = relation match {
-            case map: HashedRelation =>
-              map.estimatedSize
-            case arr: Array[InternalRow] =>
-              arr.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
-            case _ =>
-              throw new SparkException("[BUG] BroadcastMode.transform returned unexpected type: " +
-                relation.getClass.getName)
+    val task = new Callable[broadcast.Broadcast[Any]]() {
+      override def call(): broadcast.Broadcast[Any] = {
+        // This will run in another thread. Set the execution id so that we can connect these jobs
+        // with the correct execution.
+        SQLExecution.withExecutionId(sqlContext.sparkSession, executionId) {
+          try {
+            sparkContext.setJobGroup(runId.toString, s"broadcast exchange (runId $runId)",
Review comment:
My idea is that since cancelling the broadcast job is an optimization, and the UI is user-facing, it would be better to add a configuration item, for example:
`spark.broadcast.job.cancelling.enabled`
When it is true, cancelling the broadcast job is supported; the default is false. With the default, a user who sets up a job group and job description will not be surprised by what the UI shows, and a user who opts in to cancelling the broadcast job will not be surprised either, because we can make the behavior clear in the configuration doc. Multiple job groups could also achieve the goal, but I am worried that it would be more disruptive to application development: some existing users may not know how to set up multiple job groups without reading the source code, and multiple job groups would also mean more work and a higher chance of errors.
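
For illustration only, here is a minimal, self-contained sketch of the behavior I have in mind, using only the public SparkContext API. The `BroadcastCancelSketch` object and the way the flag is read are my own assumptions for the example, not code from this PR:

```scala
import java.util.UUID

import org.apache.spark.sql.SparkSession

object BroadcastCancelSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
    val sc = spark.sparkContext

    // Hypothetical flag named after the suggestion above; it defaults to false,
    // so existing applications keep their own job group/description untouched.
    val cancellingEnabled =
      sc.getConf.getBoolean("spark.broadcast.job.cancelling.enabled", defaultValue = false)

    // A per-broadcast run id, analogous to runId in BroadcastExchangeExec.
    val runId = UUID.randomUUID()

    if (cancellingEnabled) {
      // Tag the broadcast's jobs with their own group so they can be cancelled
      // on timeout; this is the step that overrides a user-set group/description.
      sc.setJobGroup(runId.toString, s"broadcast exchange (runId $runId)",
        interruptOnCancel = true)
    }

    // Stand-in for the collect that builds the broadcast relation.
    val rows = sc.parallelize(1 to 1000000).collect()
    println(s"collected ${rows.length} rows")

    // On timeout, the driver could cancel just that group:
    // sc.cancelJobGroup(runId.toString)

    spark.stop()
  }
}
```

If the flag is off, the setJobGroup call is skipped entirely, which is the point of the proposal: the default behavior, and what the user sees in the UI, stays exactly as it is today.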