Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/18747#discussion_r144757619
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -23,21 +23,53 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
-import org.apache.spark.sql.execution.LeafExecNode
-import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode}
+import org.apache.spark.sql.execution.vectorized._
 import org.apache.spark.sql.types.UserDefinedType

 case class InMemoryTableScanExec(
     attributes: Seq[Attribute],
     predicates: Seq[Expression],
     @transient relation: InMemoryRelation)
-  extends LeafExecNode {
+  extends LeafExecNode with ColumnarBatchScan {

   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren

-  override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+  override def vectorTypes: Option[Seq[String]] =
+    Option(Seq.fill(attributes.length)(classOf[OnHeapColumnVector].getName))
+
+  override val columnIndexes =
+    attributes.map(a => relation.output.map(o => o.exprId).indexOf(a.exprId)).toArray
+
+  override val supportCodegen: Boolean = relation.useColumnarBatches
+
+  private def createAndDecompressColumn(cachedColumnarBatch: CachedBatch): ColumnarBatch = {
+    val rowCount = cachedColumnarBatch.numRows
+    val schema = cachedColumnarBatch.schema
+    val columnVectors = OnHeapColumnVector.allocateColumns(rowCount, schema)
+    val columnarBatch = new ColumnarBatch(
+      schema, columnVectors.asInstanceOf[Array[ColumnVector]], rowCount)
+    columnarBatch.setNumRows(rowCount)
+
+    for (i <- 0 until cachedColumnarBatch.buffers.length) {
+      ColumnAccessor.decompress(
+        cachedColumnarBatch.buffers(i), columnarBatch.column(i).asInstanceOf[WritableColumnVector],
+        schema.fields(i).dataType, rowCount)
+    }
+    return columnarBatch
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    if (supportCodegen) {
+      val buffers = relation.cachedColumnBuffers
+      // HACK ALERT: This is actually an RDD[ColumnarBatch].
+      // We're taking advantage of Scala's type erasure here to pass these batches along.
+      Seq(buffers.map(createAndDecompressColumn(_)).asInstanceOf[RDD[InternalRow]])
--- End diff ---
I think we need to apply column pruning here, instead of adding `columnIndexes` and asking `ColumnarBatchScan` to do it.
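
For illustration, a minimal, self-contained sketch of what pruning inside the scan could look like. `CachedBatch`, `ColumnVector`, and `decompress` below are simplified stand-ins rather than the real Spark internals; the point is only that the scan resolves the requested attributes to ordinals in the cached layout once and decompresses just those columns, so `ColumnarBatchScan` never needs a `columnIndexes` concept.

```scala
// Simplified sketch: column pruning done inside the scan itself.
// These types are stand-ins for the real Spark classes of the same purpose.
case class Attr(name: String, exprId: Long)
case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]])
case class ColumnVector(values: Array[Any])
case class ColumnarBatch(numRows: Int, columns: Array[ColumnVector])

object PrunedScanSketch {
  // Hypothetical decompression of one cached buffer into a column vector.
  private def decompress(buffer: Array[Byte], numRows: Int): ColumnVector =
    ColumnVector(Array.fill[Any](numRows)(null))

  def createPrunedBatch(
      cached: CachedBatch,
      relationOutput: Seq[Attr],   // all columns stored in the cache
      requested: Seq[Attr]): ColumnarBatch = {
    // Resolve the requested attributes to ordinals in the cached layout once.
    val ordinals = requested.map(a => relationOutput.indexWhere(_.exprId == a.exprId))
    // Decompress only the requested columns; the other buffers are never touched.
    val columns = ordinals.map(i => decompress(cached.buffers(i), cached.numRows))
    ColumnarBatch(cached.numRows, columns.toArray)
  }
}
```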
---
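
As an aside on the `HACK ALERT` in the quoted diff: the cast from an `RDD[ColumnarBatch]` to an `RDD[InternalRow]` only compiles and runs because the type parameter is erased at runtime. A tiny plain-Scala example (using `List` instead of `RDD`, purely for illustration):

```scala
object ErasureSketch extends App {
  val ints: List[Int] = List(1, 2, 3)
  // Compiles: the element type of List is erased, so this cast is unchecked at runtime.
  val disguised: List[String] = ints.asInstanceOf[List[String]]
  println(disguised.size)   // fine: nothing here reads an element as a String
  // disguised.head.length  // would throw ClassCastException once an element is used as a String
}
```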