Github user kiszk commented on a diff in the pull request:
https://github.com/apache/spark/pull/18747#discussion_r145424996
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -23,21 +23,72 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
-import org.apache.spark.sql.execution.LeafExecNode
-import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.types.UserDefinedType
+import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.vectorized._
+import org.apache.spark.sql.types._

 case class InMemoryTableScanExec(
     attributes: Seq[Attribute],
     predicates: Seq[Expression],
     @transient relation: InMemoryRelation)
-  extends LeafExecNode {
+  extends LeafExecNode with ColumnarBatchScan {

   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren

-  override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+  override def vectorTypes: Option[Seq[String]] =
+    Option(Seq.fill(attributes.length)(classOf[OnHeapColumnVector].getName))
+
+  /**
+   * If true, get data from ColumnVector in ColumnarBatch, which is generally faster.
+   * If false, get data from UnsafeRow built from ColumnVector.
+   */
+  override val supportCodegen: Boolean = {
+    // In the initial implementation, for ease of review,
+    // support only primitive data types and # of fields less than wholeStageMaxNumFields
+    val schema = StructType.fromAttributes(relation.output)
+    schema.fields.find(f => f.dataType match {
+      case BooleanType | ByteType | ShortType | IntegerType | LongType |
+           FloatType | DoubleType => false
+      case _ => true
+    }).isEmpty &&
+      !WholeStageCodegenExec.isTooManyFields(conf, relation.schema) &&
+      children.find(p => WholeStageCodegenExec.isTooManyFields(conf, p.schema)).isEmpty
+  }
+
+  private val columnIndices =
+    attributes.map(a => relation.output.map(o => o.exprId).indexOf(a.exprId)).toArray
+
+  private val relationSchema = relation.schema.toArray
+
+  private lazy val columnarBatchSchema = new StructType(columnIndices.map(i => relationSchema(i)))
+
+  private def createAndDecompressColumn(cachedColumnarBatch: CachedBatch): ColumnarBatch = {
+    val rowCount = cachedColumnarBatch.numRows
+    val columnVectors = OnHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
--- End diff ---
I agree that we could improve efficiency if we could reuse the `OnHeapColumnVector`.
However, I think it is not easy to reuse the `OnHeapColumnVector` between different cached batches.
IIUC there is no point in the code where we know that a cached batch will no longer be referenced, so we delegate lifetime management to the GC by creating a new `OnHeapColumnVector` every time.
If we reused the `OnHeapColumnVector` (i.e. kept a reference to it), the GC could not dispose of it even after the generated code no longer uses it. That means the uncompressed (huge) data would stay alive for a long time. If we knew the point where a cached batch is no longer referenced, we could set `data` in `OnHeapColumnVector` to null.
Thus, I currently create a new `OnHeapColumnVector` each time. What do you think?
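
To make the trade-off concrete, here is a minimal sketch of the two lifetime strategies. This is not code from this PR: `LifetimeSketch`, `perBatch`, `withReuse`, and `decompressInto` are illustrative names, and `decompressInto` stands in for the real decompression of a `CachedBatch` into the vectors.

```scala
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.StructType

class LifetimeSketch(schema: StructType) {

  // Stand-in for decompressing a CachedBatch into the given vectors.
  private def decompressInto(vectors: Array[OnHeapColumnVector]): Unit = {}

  // Strategy in this PR: allocate fresh vectors per cached batch. As soon as
  // the generated code drops the returned array, the GC can reclaim the
  // uncompressed data.
  def perBatch(rowCount: Int): Array[OnHeapColumnVector] = {
    val vectors = OnHeapColumnVector.allocateColumns(rowCount, schema)
    decompressInto(vectors)
    vectors
  }

  // Hypothetical reuse strategy: this long-lived field keeps the vectors (and
  // the uncompressed data they hold) reachable, so the GC cannot free them
  // even after the consumer has finished with the previous batch.
  private var reused: Array[OnHeapColumnVector] = _

  def withReuse(rowCount: Int): Array[OnHeapColumnVector] = {
    if (reused == null) {
      reused = OnHeapColumnVector.allocateColumns(rowCount, schema)
    } else {
      reused.foreach(_.reset())  // capacity-growth handling omitted for brevity
    }
    decompressInto(reused)
    reused
  }
}
```

In `withReuse`, the `reused` field pins the uncompressed arrays until the next batch overwrites them; that is exactly the long-lived-data problem described above.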
---