Github user CodingCat commented on a diff in the pull request:
https://github.com/apache/spark/pull/19810#discussion_r154844579
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
---
@@ -52,6 +52,68 @@ object InMemoryRelation {
private[columnar]
case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
+private[columnar] class CachedBatchIterator(
+ rowIterator: Iterator[InternalRow],
+ output: Seq[Attribute],
+ batchSize: Int,
+ useCompression: Boolean,
+ batchStats: LongAccumulator,
+ singleBatchPerPartition: Boolean) extends Iterator[CachedBatch] {
+
+ def next(): CachedBatch = {
+ val columnBuilders = output.map { attribute =>
+ ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression)
+ }.toArray
+
+ var rowCount = 0
+ var totalSize = 0L
+
+ val terminateLoop = (singleBatch: Boolean, rowIter: Iterator[InternalRow],
--- End diff --
To make collecting partition stats easier, we construct only one CachedBatch
per partition when the functionality proposed in this PR is enabled.
`singleBatch` distinguishes the two scenarios (functionality enabled or
disabled) by selecting a different while-loop termination condition for
each, so the rest of the code can be reused in both cases.
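A minimal, Spark-free sketch of this idea, assuming rows are plain `Int`s counted as 4 bytes each. `SimpleBatch`, `SimpleBatchIterator`, `shouldContinue` and `maxBatchBytes` are hypothetical names for illustration, not the code in this PR. The loop body is shared. Only the continuation predicate changes with `singleBatchPerPartition`: bounded by row count and byte size in the normal case, or draining the whole partition into one batch otherwise.

```scala
// Hypothetical, simplified model of a batching iterator (not Spark's API).
// Rows are Ints and each row is assumed to occupy 4 bytes.
final case class SimpleBatch(numRows: Int, rows: Vector[Int])

class SimpleBatchIterator(
    rowIterator: Iterator[Int],
    batchSize: Int,
    maxBatchBytes: Long,
    singleBatchPerPartition: Boolean) extends Iterator[SimpleBatch] {

  // The only mode-dependent piece: when to keep adding rows to the batch.
  private def shouldContinue(singleBatch: Boolean, rowCount: Int, totalSize: Long): Boolean =
    if (singleBatch) rowIterator.hasNext // one batch holds the whole partition
    else rowIterator.hasNext && rowCount < batchSize && totalSize < maxBatchBytes

  override def hasNext: Boolean = rowIterator.hasNext

  override def next(): SimpleBatch = {
    if (!hasNext) throw new NoSuchElementException("no more rows")
    val builder = Vector.newBuilder[Int]
    var rowCount = 0
    var totalSize = 0L
    // Shared loop body, reused unchanged by both modes.
    while (shouldContinue(singleBatchPerPartition, rowCount, totalSize)) {
      builder += rowIterator.next()
      rowCount += 1
      totalSize += 4L // assumed fixed per-row size
    }
    SimpleBatch(rowCount, builder.result())
  }
}

object SimpleBatchDemo {
  def main(args: Array[String]): Unit = {
    val multi = new SimpleBatchIterator((1 to 10).iterator, 4, 1024L, false).toList
    val single = new SimpleBatchIterator((1 to 10).iterator, 4, 1024L, true).toList
    println(multi.map(_.numRows))  // List(4, 4, 2)
    println(single.map(_.numRows)) // List(10)
  }
}
```

With the flag off, ten rows are split into batches of at most four; with it on, the same rows end up in a single batch, so per-partition stats can be read from that one batch.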