Github user rdblue commented on a diff in the pull request:
https://github.com/apache/spark/pull/19394#discussion_r142474766
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
    @@ -280,13 +280,20 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
         results.toArray
       }
    +  private[spark] def executeCollectIterator(): (Long, Iterator[InternalRow]) = {
    +    val countsAndBytes = getByteArrayRdd().collect()
    --- End diff ---
I agree, but this is better than before. This change only keeps all of the rows
in memory in compressed form, and then streams through the compressed blocks one
at a time. Before this patch, the rows were copied into a buffer row by row while
the compressed blocks were still held, so the rows were in memory both compressed
and uncompressed at the same time. Removing that uncompressed copy is what this
fixes; we can follow up with something better that streams blocks from the
executors instead of collecting them all at once.
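
For reference, the body presumably continues along these lines (the diff above is
truncated here; the exact shape, and the use of decodeUnsafeRows and of the
(count, bytes) pairs returned by getByteArrayRdd, are my sketch of the
surrounding SparkPlan internals, not quoted from the patch):

    private[spark] def executeCollectIterator(): (Long, Iterator[InternalRow]) = {
      // Collect one (row count, compressed bytes) pair per partition.
      val countsAndBytes = getByteArrayRdd().collect()
      // Total number of rows across all partitions.
      val total = countsAndBytes.map(_._1).sum
      // Lazily decompress one block at a time, so only a single block's rows
      // are ever held uncompressed while the caller iterates.
      val rows = countsAndBytes.iterator.flatMap { case (_, bytes) => decodeUnsafeRows(bytes) }
      (total, rows)
    }

The point is that the flatMap over the collected blocks is lazy: the compressed
arrays all stay resident, but decompression happens block by block as the
returned iterator is consumed.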
---