Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/19394#discussion_r143093875
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala ---
@@ -73,25 +73,37 @@ case class BroadcastExchangeExec(
     try {
       val beforeCollect = System.nanoTime()
       // Note that we use .executeCollect() because we don't want to convert data to Scala types
-      val input: Array[InternalRow] = child.executeCollect()
-      if (input.length >= 512000000) {
+      val (numRows, input) = child.executeCollectIterator()
+      if (numRows >= 512000000) {
         throw new SparkException(
-          s"Cannot broadcast the table with more than 512 millions rows: ${input.length} rows")
+          s"Cannot broadcast the table with more than 512 millions rows: $numRows rows")
       }
+
       val beforeBuild = System.nanoTime()
       longMetric("collectTime") += (beforeBuild - beforeCollect) / 1000000
-      val dataSize = input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
+
+      // Construct the relation.
+      val relation = mode.transform(input, Some(numRows))
+
+      val dataSize = relation match {
+        case map: HashedRelation =>
+          map.estimatedSize
--- End diff --
The estimated size of the hashed relation may be larger than the accurate data size calculated before. I guess this may be a regression in some cases.
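
To make the comparison concrete, a minimal sketch (not from the PR) of the two measurements being contrasted; `collectedRows` and `relation` are placeholder names standing in for the collected rows and the output of `mode.transform(...)` in the diff above:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    import org.apache.spark.sql.execution.joins.HashedRelation

    // Pre-patch measurement: exact sum of the UnsafeRow payload sizes.
    def exactDataSize(collectedRows: Array[InternalRow]): Long =
      collectedRows.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum

    // Post-patch measurement: the relation's own estimate, which also accounts
    // for the hash map's internal structures, so it can exceed exactDataSize.
    def reportedDataSize(relation: Any, collectedRows: Array[InternalRow]): Long =
      relation match {
        case map: HashedRelation => map.estimatedSize
        case _ => exactDataSize(collectedRows)
      }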
---