amoghmargoor commented on a change in pull request #15178: [SPARK-17556][SQL]
Executor side broadcast for broadcast joins
URL: https://github.com/apache/spark/pull/15178#discussion_r270614033
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
##########
@@ -73,28 +80,25 @@ case class BroadcastExchangeExec(
// with the correct execution.
SQLExecution.withExecutionId(sparkContext, executionId) {
try {
- val beforeCollect = System.nanoTime()
- // Note that we use .executeCollect() because we don't want to convert data to Scala types
- val input: Array[InternalRow] = child.executeCollect()
- if (input.length >= 512000000) {
- throw new SparkException(
- s"Cannot broadcast the table with more than 512 millions rows: ${input.length} rows")
- }
val beforeBuild = System.nanoTime()
- longMetric("collectTime") += (beforeBuild - beforeCollect) / 1000000
- val dataSize = input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
- longMetric("dataSize") += dataSize
- if (dataSize >= (8L << 30)) {
+ // Call persist on the RDD because we want to broadcast the RDD blocks on executors.
+ childRDD = child.execute().mapPartitionsInternal { rowIterator =>
+ rowIterator.map(_.copy())
Review comment:
I know the PR has been closed, but I would like to understand the code.
Why is each row of the `child` RDD copied before persisting?
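
A possible reason, not confirmed anywhere in this thread: Spark operators may reuse one mutable row object for every record an iterator produces, so buffering the rows without copying could leave every buffered entry pointing at the same object. The sketch below illustrates that hazard in plain Scala. `MutableRow`, `RowReuseSketch`, and its methods are hypothetical names made up for illustration; none of them are Spark classes.

```scala
// Hypothetical stand-in for a mutable row; this is NOT a Spark class.
final class MutableRow(var value: Int) {
  def copy(): MutableRow = new MutableRow(value)
}

object RowReuseSketch {
  // Mimics an operator that reuses a single row object for every record.
  def reusingIterator(values: Seq[Int]): Iterator[MutableRow] = {
    val shared = new MutableRow(0)
    values.iterator.map { v =>
      shared.value = v
      shared
    }
  }

  // Buffers the references directly: every slot points at the same object,
  // so only the last value written is visible afterwards.
  def bufferWithoutCopy(values: Seq[Int]): Seq[Int] =
    reusingIterator(values).toArray.map(_.value).toSeq

  // Copies each row as it is produced: every slot holds its own snapshot.
  def bufferWithCopy(values: Seq[Int]): Seq[Int] =
    reusingIterator(values).map(_.copy()).toArray.map(_.value).toSeq
}
```

With the input `Seq(1, 2, 3)`, `bufferWithoutCopy` returns `Seq(3, 3, 3)` while `bufferWithCopy` returns `Seq(1, 2, 3)`. If the same reuse applies to the rows produced by `child.execute()`, persisting without `_.copy()` would store corrupted data in the same way.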