andygrove commented on a change in pull request #33140:
URL: https://github.com/apache/spark/pull/33140#discussion_r663018333
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -441,52 +455,60 @@ case class RowToColumnarExec(child: SparkPlan) extends
RowToColumnarTransition {
)
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
- val enableOffHeapColumnVector = conf.offHeapColumnVectorEnabled
- val numInputRows = longMetric("numInputRows")
- val numOutputBatches = longMetric("numOutputBatches")
- // Instead of creating a new config we are reusing columnBatchSize. In the
future if we do
- // combine with some of the Arrow conversion tools we will need to unify
some of the configs.
- val numRows = conf.columnBatchSize
- // This avoids calling `schema` in the RDD closure, so that we don't need
to include the entire
- // plan (this) in the closure.
- val localSchema = this.schema
- child.execute().mapPartitionsInternal { rowIterator =>
- if (rowIterator.hasNext) {
- new Iterator[ColumnarBatch] {
- private val converters = new RowToColumnConverter(localSchema)
- private val vectors: Seq[WritableColumnVector] = if
(enableOffHeapColumnVector) {
- OffHeapColumnVector.allocateColumns(numRows, localSchema)
- } else {
- OnHeapColumnVector.allocateColumns(numRows, localSchema)
- }
- private val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray)
-
- TaskContext.get().addTaskCompletionListener[Unit] { _ =>
- cb.close()
- }
-
- override def hasNext: Boolean = {
- rowIterator.hasNext
- }
-
- override def next(): ColumnarBatch = {
- cb.setNumRows(0)
- vectors.foreach(_.reset())
- var rowCount = 0
- while (rowCount < numRows && rowIterator.hasNext) {
- val row = rowIterator.next()
- converters.convert(row, vectors.toArray)
- rowCount += 1
+ child match {
+ case a: AdaptiveSparkPlanExec if a.finalPlanSupportsColumnar() =>
Review comment:
Writing to Parquet is just one example of a case where an adaptive plan can be
contained in another plan; it just happens to be the one we are most
interested in supporting without transitions at the moment. If the plan is
columnar, then we need to execute it as columnar, regardless of whether the
adaptive plan is a subquery or not.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]