revans2 commented on a change in pull request #24795: [SPARK-27945][SQL] 
Minimal changes to support columnar processing
URL: https://github.com/apache/spark/pull/24795#discussion_r299013597
 
 

 ##########
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
 ##########
 @@ -251,6 +286,371 @@ object MyExtensions {
     (_: Seq[Expression]) => Literal(5, IntegerType))
 }
 
+/**
+ * Iterator adapter that applies `f` to every batch produced by `itr` and closes each
+ * resulting batch once the consumer has moved past it.
+ *
+ * The batch returned by `next()` is only valid until the following call to `hasNext` or
+ * `next()`: both of them close it before doing anything else. A batch that is still open
+ * when the task completes is closed by a task completion listener.
+ *
+ * @param itr source of the input batches
+ * @param f transformation applied to each input batch; its result is what gets closed here
+ */
+case class CloseableColumnBatchIterator(itr: Iterator[ColumnarBatch],
+    f: ColumnarBatch => ColumnarBatch) extends Iterator[ColumnarBatch] {
+  // Most recently returned batch that has not been closed yet, or null if there is none.
+  var cb: ColumnarBatch = null
+
+  /** Closes the outstanding batch, if any, and clears the reference so it is closed once. */
+  private def closeCurrentBatch(): Unit = {
+    if (cb != null) {
+      cb.close()
+      cb = null
+    }
+  }
+
+  // TaskContext.get() is null when no task is running on this thread. Guard against that so
+  // the iterator can also be built outside of a task; there is no completion event to hook
+  // into in that case.
+  Option(TaskContext.get()).foreach { context =>
+    context.addTaskCompletionListener[Unit]((_: TaskContext) => closeCurrentBatch())
+  }
+
+  /** Closes the previously returned batch, then reports whether `itr` has more batches. */
+  override def hasNext: Boolean = {
+    closeCurrentBatch()
+    itr.hasNext
+  }
+
+  /** Closes the previously returned batch, then returns `f` applied to the next input. */
+  override def next(): ColumnarBatch = {
+    closeCurrentBatch()
+    cb = f(itr.next())
+    cb
+  }
+}
+
+object NoCloseColumnVector extends Logging {
+  /**
+   * Returns `cv` unchanged when it already is a [[NoCloseColumnVector]]; otherwise wraps it
+   * in a new one.
+   */
+  def wrapIfNeeded(cv: ColumnVector): NoCloseColumnVector = cv match {
+    case alreadyWrapped: NoCloseColumnVector => alreadyWrapped
+    case other => NoCloseColumnVector(other)
+  }
+}
+
+/**
+ * Provide a ColumnVector so ColumnarExpression can close temporary values without
+ * having to guess what type it really is.
+ *
+ * Every accessor delegates to `wrapped`; only `close()` behaves differently, because it
+ * does nothing.
+ *
+ * @param wrapped the vector that accessors are forwarded to; it also supplies the data type
+ */
+case class NoCloseColumnVector(wrapped: ColumnVector) extends ColumnVector(wrapped.dataType) {
+
+  /**
+   * Don't actually close the ColumnVector this wraps.  The producer of the vector will take
+   * care of that.
+   */
+  override def close(): Unit = {
+    // Intentionally empty.
+  }
+
+  override def hasNull: Boolean = wrapped.hasNull
+
+  override def numNulls(): Int = wrapped.numNulls()
+
+  override def isNullAt(rowId: Int): Boolean = wrapped.isNullAt(rowId)
+
+  override def getBoolean(rowId: Int): Boolean = wrapped.getBoolean(rowId)
+
+  override def getByte(rowId: Int): Byte = wrapped.getByte(rowId)
+
+  override def getShort(rowId: Int): Short = wrapped.getShort(rowId)
+
+  override def getInt(rowId: Int): Int = wrapped.getInt(rowId)
+
+  override def getLong(rowId: Int): Long = wrapped.getLong(rowId)
+
+  override def getFloat(rowId: Int): Float = wrapped.getFloat(rowId)
+
+  override def getDouble(rowId: Int): Double = wrapped.getDouble(rowId)
+
+  override def getArray(rowId: Int): ColumnarArray = wrapped.getArray(rowId)
+
+  override def getMap(ordinal: Int): ColumnarMap = wrapped.getMap(ordinal)
+
+  override def getDecimal(rowId: Int, precision: Int, scale: Int): Decimal =
+    wrapped.getDecimal(rowId, precision, scale)
+
+  override def getUTF8String(rowId: Int): UTF8String = wrapped.getUTF8String(rowId)
+
+  override def getBinary(rowId: Int): Array[Byte] = wrapped.getBinary(rowId)
+
+  override protected def getChild(ordinal: Int): ColumnVector = wrapped.getChild(ordinal)
+}
+
+trait ColumnarExpression extends Expression with Serializable {
+  /**
+   * Returns true if this expression supports columnar processing through 
[[columnarEval]].
+   */
+  def supportsColumnar: Boolean = true
+
+  /**
+   * Returns the result of evaluating this expression on the entire
+   * [[org.apache.spark.sql.vectorized.ColumnarBatch]]. The result of
+   * calling this may be a single 
[[org.apache.spark.sql.vectorized.ColumnVector]] or a scalar
+   * value. Scalar values typically happen if they are a part of the 
expression i.e. col("a") + 100.
 
 Review comment:
   The main reason it is this way is that I originally had Expression support 
columnar processing as well, but that changed as part of the review, so I made 
minimal changes to the example to match.
   I do find it simpler from a development standpoint to have a one-to-one 
mapping: I can then write unit tests with just inputs and verify that the 
outputs match exactly. But yes, there are probably simpler ways to do this, 
depending on the columnar library you are using.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to