cloud-fan commented on a change in pull request #24795: [SPARK-27945][SQL] Minimal changes to support columnar processing URL: https://github.com/apache/spark/pull/24795#discussion_r298873668
########## File path: sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala ########## @@ -251,6 +286,371 @@ object MyExtensions { (_: Seq[Expression]) => Literal(5, IntegerType)) } +case class CloseableColumnBatchIterator(itr: Iterator[ColumnarBatch], + f: ColumnarBatch => ColumnarBatch) extends Iterator[ColumnarBatch] { + var cb: ColumnarBatch = null + + private def closeCurrentBatch(): Unit = { + if (cb != null) { + cb.close + cb = null + } + } + + TaskContext.get().addTaskCompletionListener[Unit]((tc: TaskContext) => { + closeCurrentBatch() + }) + + override def hasNext: Boolean = { + closeCurrentBatch() + itr.hasNext + } + + override def next(): ColumnarBatch = { + closeCurrentBatch() + cb = f(itr.next()) + cb + } +} + +object NoCloseColumnVector extends Logging { + def wrapIfNeeded(cv: ColumnVector): NoCloseColumnVector = cv match { + case ref: NoCloseColumnVector => + ref + case vec => NoCloseColumnVector(vec) + } +} + +/** + * Provide a ColumnVector so ColumnarExpression can close temporary values without + * having to guess what type it really is. + */ +case class NoCloseColumnVector(wrapped: ColumnVector) extends ColumnVector(wrapped.dataType) { + private var refCount = 1 + + /** + * Don't actually close the ColumnVector this wraps. The producer of the vector will take + * care of that. 
+ */ + override def close(): Unit = { + // Empty + } + + override def hasNull: Boolean = wrapped.hasNull + + override def numNulls(): Int = wrapped.numNulls + + override def isNullAt(rowId: Int): Boolean = wrapped.isNullAt(rowId) + + override def getBoolean(rowId: Int): Boolean = wrapped.getBoolean(rowId) + + override def getByte(rowId: Int): Byte = wrapped.getByte(rowId) + + override def getShort(rowId: Int): Short = wrapped.getShort(rowId) + + override def getInt(rowId: Int): Int = wrapped.getInt(rowId) + + override def getLong(rowId: Int): Long = wrapped.getLong(rowId) + + override def getFloat(rowId: Int): Float = wrapped.getFloat(rowId) + + override def getDouble(rowId: Int): Double = wrapped.getDouble(rowId) + + override def getArray(rowId: Int): ColumnarArray = wrapped.getArray(rowId) + + override def getMap(ordinal: Int): ColumnarMap = wrapped.getMap(ordinal) + + override def getDecimal(rowId: Int, precision: Int, scale: Int): Decimal = + wrapped.getDecimal(rowId, precision, scale) + + override def getUTF8String(rowId: Int): UTF8String = wrapped.getUTF8String(rowId) + + override def getBinary(rowId: Int): Array[Byte] = wrapped.getBinary(rowId) + + override protected def getChild(ordinal: Int): ColumnVector = wrapped.getChild(ordinal) +} + +trait ColumnarExpression extends Expression with Serializable { + /** + * Returns true if this expression supports columnar processing through [[columnarEval]]. + */ + def supportsColumnar: Boolean = true + + /** + * Returns the result of evaluating this expression on the entire + * [[org.apache.spark.sql.vectorized.ColumnarBatch]]. The result of + * calling this may be a single [[org.apache.spark.sql.vectorized.ColumnVector]] or a scalar + * value. Scalar values typically happen if they are a part of the expression i.e. col("a") + 100. 
Review comment: I understand that this is in a test, not an API, but other people may look at this test to learn how to implement a columnar operator, and I feel the current example is not that good. IIUC, the goal is: 1. users can write a rule to replace an arbitrary SQL operator with a custom optimized columnar version 2. Spark automatically inserts column-to-row and row-to-column operators around the columnar operator. For 1, I think a pretty simple approach is to take in an expression tree and compile it to a columnar processor that can execute the expression tree in a columnar fashion. We don't need to create a `ColumnarExpression`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org