cloud-fan commented on a change in pull request #24795: [SPARK-27945][SQL] Minimal changes to support columnar processing
URL: https://github.com/apache/spark/pull/24795#discussion_r298873668
 
 

 ##########
 File path: sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
 ##########
 @@ -251,6 +286,371 @@ object MyExtensions {
     (_: Seq[Expression]) => Literal(5, IntegerType))
 }
 
+case class CloseableColumnBatchIterator(itr: Iterator[ColumnarBatch],
+    f: ColumnarBatch => ColumnarBatch) extends Iterator[ColumnarBatch] {
+  var cb: ColumnarBatch = null
+
+  private def closeCurrentBatch(): Unit = {
+    if (cb != null) {
+      cb.close()
+      cb = null
+    }
+  }
+
+  TaskContext.get().addTaskCompletionListener[Unit]((tc: TaskContext) => {
+    closeCurrentBatch()
+  })
+
+  override def hasNext: Boolean = {
+    closeCurrentBatch()
+    itr.hasNext
+  }
+
+  override def next(): ColumnarBatch = {
+    closeCurrentBatch()
+    cb = f(itr.next())
+    cb
+  }
+}
+
+object NoCloseColumnVector extends Logging {
+  def wrapIfNeeded(cv: ColumnVector): NoCloseColumnVector = cv match {
+    case ref: NoCloseColumnVector =>
+      ref
+    case vec => NoCloseColumnVector(vec)
+  }
+}
+
+/**
+ * Provide a ColumnVector so ColumnarExpression can close temporary values without
+ * having to guess what type it really is.
+ */
+case class NoCloseColumnVector(wrapped: ColumnVector) extends ColumnVector(wrapped.dataType) {
+  private var refCount = 1
+
+  /**
+   * Don't actually close the ColumnVector this wraps. The producer of the vector
+   * will take care of that.
+   */
+  override def close(): Unit = {
+    // Empty
+  }
+
+  override def hasNull: Boolean = wrapped.hasNull
+
+  override def numNulls(): Int = wrapped.numNulls
+
+  override def isNullAt(rowId: Int): Boolean = wrapped.isNullAt(rowId)
+
+  override def getBoolean(rowId: Int): Boolean = wrapped.getBoolean(rowId)
+
+  override def getByte(rowId: Int): Byte = wrapped.getByte(rowId)
+
+  override def getShort(rowId: Int): Short = wrapped.getShort(rowId)
+
+  override def getInt(rowId: Int): Int = wrapped.getInt(rowId)
+
+  override def getLong(rowId: Int): Long = wrapped.getLong(rowId)
+
+  override def getFloat(rowId: Int): Float = wrapped.getFloat(rowId)
+
+  override def getDouble(rowId: Int): Double = wrapped.getDouble(rowId)
+
+  override def getArray(rowId: Int): ColumnarArray = wrapped.getArray(rowId)
+
+  override def getMap(ordinal: Int): ColumnarMap = wrapped.getMap(ordinal)
+
+  override def getDecimal(rowId: Int, precision: Int, scale: Int): Decimal =
+    wrapped.getDecimal(rowId, precision, scale)
+
+  override def getUTF8String(rowId: Int): UTF8String = wrapped.getUTF8String(rowId)
+
+  override def getBinary(rowId: Int): Array[Byte] = wrapped.getBinary(rowId)
+
+  override protected def getChild(ordinal: Int): ColumnVector = wrapped.getChild(ordinal)
+}
+
+trait ColumnarExpression extends Expression with Serializable {
+  /**
+   * Returns true if this expression supports columnar processing through [[columnarEval]].
+   */
+  def supportsColumnar: Boolean = true
+
+  /**
+   * Returns the result of evaluating this expression on the entire
+   * [[org.apache.spark.sql.vectorized.ColumnarBatch]]. The result of
+   * calling this may be a single [[org.apache.spark.sql.vectorized.ColumnVector]] or a
+   * scalar value. Scalar values typically happen if they are a part of the expression,
+   * e.g. col("a") + 100.
 
 Review comment:
   I understand that this is in a test, not an API, but other people may look at this test to learn how to implement a columnar operator, and I feel the current example is not that good.
   
   IIUC, the goal is:
   1. Users can write a rule to replace an arbitrary SQL operator with a custom optimized columnar version.
   2. Spark automatically inserts column-to-row and row-to-column operators around the columnar operator (see the registration sketch after this list).
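   
   A minimal sketch of the user-facing side of 2, assuming the `injectColumnar` / `ColumnarRule` hook this PR proposes; `MyColumnarExtensions` and `ReplaceWithMyColumnarOps` are hypothetical names, and the rule body is a no-op placeholder:
   
```scala
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}

// Hypothetical: a real rule would replace supported plan nodes with
// columnar implementations here; this placeholder leaves the plan alone.
case class ReplaceWithMyColumnarOps() extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan
}

// Runs before Spark inserts the row-to-column / column-to-row transitions,
// so the rule only swaps operators and Spark handles the boundaries.
case class MyColumnarRule(pre: Rule[SparkPlan]) extends ColumnarRule {
  override def preColumnarTransitions: Rule[SparkPlan] = pre
}

// Registered via the spark.sql.extensions config, e.g.
// .config("spark.sql.extensions", classOf[MyColumnarExtensions].getName)
class MyColumnarExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(e: SparkSessionExtensions): Unit = {
    e.injectColumnar(_ => MyColumnarRule(ReplaceWithMyColumnarOps()))
  }
}
```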
   
   For 1, I think a pretty simple approach is to take in an expression tree and compile it into a columnar processor that can execute the expression tree in a columnar fashion, roughly as sketched below. We don't need to create a `ColumnarExpression`.
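   
   A rough sketch of that idea, using hypothetical names (`ColumnarCompiler`, `ColumnarProcessor`), handling only a few integer expressions, and omitting null handling:
   
```scala
object ColumnarCompiler {
  import org.apache.spark.sql.catalyst.expressions.{Add, BoundReference, Expression, Literal}
  import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
  import org.apache.spark.sql.types.IntegerType
  import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

  // A compiled processor evaluates a whole batch and yields one result vector.
  type ColumnarProcessor = ColumnarBatch => ColumnVector

  // Walk the ordinary row-based expression tree once, up front, and return a
  // function that runs over ColumnarBatches. Expressions not matched here
  // would fall back to row-based evaluation.
  def compile(expr: Expression): ColumnarProcessor = expr match {
    case BoundReference(ordinal, _, _) =>
      // A column reference just reads the batch's existing vector.
      batch => batch.column(ordinal)

    case Literal(value: Int, IntegerType) =>
      // Materialize an int constant as a vector.
      batch => {
        val v = new OnHeapColumnVector(batch.numRows(), IntegerType)
        (0 until batch.numRows()).foreach(i => v.putInt(i, value))
        v
      }

    case a: Add =>
      // Compile the children once, then combine their vectors row by row.
      val (l, r) = (compile(a.left), compile(a.right))
      batch => {
        val (lv, rv) = (l(batch), r(batch))
        val out = new OnHeapColumnVector(batch.numRows(), IntegerType)
        (0 until batch.numRows()).foreach(i => out.putInt(i, lv.getInt(i) + rv.getInt(i)))
        out
      }
  }
}
```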

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
