HyukjinKwon commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r747959540
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -458,6 +462,34 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
// This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
// plan (this) in the closure.
val localSchema = this.schema
+ if (enableArrowColumnVector) {
+ val maxRecordsPerBatch = SQLConf.get.arrowMaxRecordsPerBatch
+ val timeZoneId = SQLConf.get.sessionLocalTimeZone
+ return child.execute().mapPartitionsInternal { rowIterator =>
+ val context = TaskContext.get()
+ val allocator = ArrowUtils.getDefaultAllocator
+ val bytesIterator = ArrowConverters
.toBatchIterator(rowIterator, localSchema, maxRecordsPerBatch, timeZoneId, context)
Review comment:
Oh, okay. I guess there was a bit of miscommunication here.
I am more worried about symmetry, and I am talking about the SQL physical
plan level. I am thinking about having plans such as:
```scala
RowsToColumnarExec
ColumnarToRowsExec
ArrowToColumnarExec
ColumnarToArrowExec
```
What you want to do, I guess, is use Arrow-backed columnar instances, right?
You could, for example, pattern match on `RowsToColumnarExec(child)` and
replace it with something like `ColumnarToArrowExec(RowsToColumnarExec(child))`
in Spark extensions.
I presume the overhead between `ColumnarBatch` and `ArrowBatch` is trivial.
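For illustration only: the plan node names above (`RowsToColumnarExec`, `ColumnarToArrowExec`, etc.) are hypothetical, and a real extension rule would be registered through `SparkSessionExtensions` rather than written against a toy tree. Still, the shape of the pattern-match rewrite can be sketched with a minimal self-contained plan model:

```scala
// Toy model of the suggested rewrite, with hypothetical node names taken
// from the comment above; this is NOT the real Spark SparkPlan hierarchy.
sealed trait Plan
case class Scan(table: String) extends Plan
case class RowsToColumnarExec(child: Plan) extends Plan
case class ColumnarToArrowExec(child: Plan) extends Plan

object ArrowRewrite {
  // Single bottom-up pass: wherever a RowsToColumnarExec appears, wrap it
  // in ColumnarToArrowExec so downstream operators see Arrow-backed batches.
  def apply(plan: Plan): Plan = plan match {
    case RowsToColumnarExec(child) =>
      ColumnarToArrowExec(RowsToColumnarExec(apply(child)))
    case ColumnarToArrowExec(child) => ColumnarToArrowExec(apply(child))
    case s: Scan                    => s
  }
}
```

The point of the sketch is only the rewrite shape: conversion stays an explicit plan node, so row-to-columnar and columnar-to-Arrow transitions remain symmetric and visible in the physical plan.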
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]