revans2 commented on a change in pull request #24795: [SPARK-27945][SQL] 
Minimal changes to support columnar processing
URL: https://github.com/apache/spark/pull/24795#discussion_r294274613
 
 

 ##########
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
 ##########
 @@ -0,0 +1,594 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.{broadcast, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, 
SpecializedGetters, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, 
OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+
+/**
+ * Holds a user defined rule that can be used to inject columnar 
implementations of various
+ * operators in the plan. The [[preColumnarTransitions]] [[Rule]] can be used 
to replace
+ * [[SparkPlan]] instances with versions that support a columnar 
implementation. After this
+ * Spark will insert any transitions necessary. This includes transitions from 
row to columnar
+ * [[RowToColumnarExec]] and from columnar to row [[ColumnarToRowExec]]. At 
this point the
+ * [[postColumnarTransitions]] [[Rule]] is called to allow replacing any of 
the implementations
+ * of the transitions or doing cleanup of the plan, like inserting stages to 
build larger batches
+ * for more efficient processing, or stages that transition the data to/from 
an accelerator's
+ * memory.
+ */
+class ColumnarRule {
+  def preColumnarTransitions: Rule[SparkPlan] = plan => plan
+  def postColumnarTransitions: Rule[SparkPlan] = plan => plan
+}
+
+/**
+ * Provides a common executor to translate an [[RDD]] of [[ColumnarBatch]] 
into an [[RDD]] of
+ * [[InternalRow]]. This is inserted whenever such a transition is determined 
to be needed.
+ *
+ * The implementation is based off of similar implementations in 
[[ColumnarBatchScan]],
+ * [[org.apache.spark.sql.execution.python.ArrowEvalPythonExec]], and
+ * [[MapPartitionsInRWithArrowExec]]. Eventually this should replace those 
implementations.
+ */
+case class ColumnarToRowExec(child: SparkPlan)
+  extends UnaryExecNode with CodegenSupport {
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override lazy val metrics: Map[String, SQLMetric] = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+    "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of 
input batches"),
+    "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")
+  )
+
+  override def doExecute(): RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
+    val numInputBatches = longMetric("numInputBatches")
+    val scanTime = longMetric("scanTime")
+    val batches = child.executeColumnar()
+    batches.mapPartitions { cbIter =>
+      // UnsafeProjection is not serializable so do it on the executor side
+      val outputProject = UnsafeProjection.create(output, output)
+      new Iterator[InternalRow] {
+        var it: java.util.Iterator[InternalRow] = null
+
+        def loadNextBatch: Unit = {
+          if (it != null) {
+            it = null
+          }
+          val batchStartNs = System.nanoTime()
+          if (cbIter.hasNext) {
+            val cb = cbIter.next()
+            it = cb.rowIterator()
+            numInputBatches += 1
+            // In order to match the numOutputRows metric in the generated 
code we update
+            // numOutputRows for each batch. This is less accurate than doing 
it at output
+            // because it will over count the number of rows output in the 
case of a limit,
+            // but it is more efficient.
+            numOutputRows += cb.numRows()
+          }
+          scanTime += ((System.nanoTime() - batchStartNs) / (1000 * 1000))
+        }
+
+        override def hasNext: Boolean = {
+          val itHasNext = it != null && it.hasNext
+          if (!itHasNext) {
+            loadNextBatch
+            it != null && it.hasNext
+          } else {
+            itHasNext
+          }
+        }
+
+        override def next(): InternalRow = {
+          if (it == null || !it.hasNext) {
+            loadNextBatch
+          }
+          if (it == null) {
+            throw new NoSuchElementException()
+          }
+          it.next()
+        }
+        // This is to convert the InternalRow to an UnsafeRow. Even though the 
type is
+        // InternalRow some operations downstream operations like collect 
require it to
+        // be UnsafeRow
+      }.map(outputProject)
+    }
+  }
+
+  /**
+   * Generate [[ColumnVector]] expressions for our parent to consume as rows.
+   * This is called once per [[ColumnVector]] in the batch.
+   *
+   * This code came unchanged from [[ColumnarBatchScan]] and will hopefully 
replace it
+   * at some point.
+   */
+  private def genCodeColumnVector(
+      ctx: CodegenContext,
+      columnVar: String,
+      ordinal: String,
+      dataType: DataType,
+      nullable: Boolean): ExprCode = {
+    val javaType = CodeGenerator.javaType(dataType)
+    val value = CodeGenerator.getValueFromVector(columnVar, dataType, ordinal)
+    val isNullVar = if (nullable) {
+      JavaCode.isNullVariable(ctx.freshName("isNull"))
+    } else {
+      FalseLiteral
+    }
+    val valueVar = ctx.freshName("value")
+    val str = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]"
+    val code = code"${ctx.registerComment(str)}" + (if (nullable) {
+      code"""
+        boolean $isNullVar = $columnVar.isNullAt($ordinal);
+        $javaType $valueVar = $isNullVar ? 
${CodeGenerator.defaultValue(dataType)} : ($value);
+      """
+    } else {
+      code"$javaType $valueVar = $value;"
+    })
+    ExprCode(code, isNullVar, JavaCode.variable(valueVar, dataType))
+  }
+
+  /**
+   * Produce code to process the input iterator as [[ColumnarBatch]]es.
+   * This produces an [[org.apache.spark.sql.catalyst.expressions.UnsafeRow]] 
for each row in
+   * each batch.
+   *
+   * This code came almost completely unchanged from [[ColumnarBatchScan]] and 
will
+   * hopefully replace it at some point.
+   */
+  override protected def doProduce(ctx: CodegenContext): String = {
+    // PhysicalRDD always just has one input
+    val input = ctx.addMutableState("scala.collection.Iterator", "input",
+      v => s"$v = inputs[0];")
+
+    // metrics
+    val numOutputRows = metricTerm(ctx, "numOutputRows")
+    val numInputBatches = metricTerm(ctx, "numInputBatches")
+    val scanTimeMetric = metricTerm(ctx, "scanTime")
+    val scanTimeTotalNs =
+      ctx.addMutableState(CodeGenerator.JAVA_LONG, "scanTime") // init as 
scanTime = 0
+
+    val columnarBatchClz = classOf[ColumnarBatch].getName
+    val batch = ctx.addMutableState(columnarBatchClz, "batch")
+
+    val idx = ctx.addMutableState(CodeGenerator.JAVA_INT, "batchIdx") // init 
as batchIdx = 0
+    val columnVectorClzs = child.vectorTypes.getOrElse(
+      Seq.fill(output.indices.size)(classOf[ColumnVector].getName))
+    val (colVars, columnAssigns) = columnVectorClzs.zipWithIndex.map {
+      case (columnVectorClz, i) =>
+        val name = ctx.addMutableState(columnVectorClz, s"colInstance$i")
+        (name, s"$name = ($columnVectorClz) $batch.column($i);")
+    }.unzip
+
+    val nextBatch = ctx.freshName("nextBatch")
+    val nextBatchFuncName = ctx.addNewFunction(nextBatch,
+      s"""
+         |private void $nextBatch() throws java.io.IOException {
+         |  long getBatchStart = System.nanoTime();
+         |  if ($input.hasNext()) {
+         |    $batch = ($columnarBatchClz)$input.next();
+         |    $numOutputRows.add($batch.numRows());
+         |    $idx = 0;
+         |    ${columnAssigns.mkString("", "\n", "\n")}
+         |    ${numInputBatches}.add(1);
+         |  }
+         |  $scanTimeTotalNs += System.nanoTime() - getBatchStart;
+         |}""".stripMargin)
+
+    ctx.currentVars = null
+    val rowidx = ctx.freshName("rowIdx")
+    val columnsBatchInput = (output zip colVars).map { case (attr, colVar) =>
+      genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable)
+    }
+    val localIdx = ctx.freshName("localIdx")
+    val localEnd = ctx.freshName("localEnd")
+    val numRows = ctx.freshName("numRows")
+    val shouldStop = if (parent.needStopCheck) {
+      s"if (shouldStop()) { $idx = $rowidx + 1; return; }"
+    } else {
+      "// shouldStop check is eliminated"
+    }
+    s"""
+       |if ($batch == null) {
+       |  $nextBatchFuncName();
+       |}
+       |while ($batch != null) {
+       |  int $numRows = $batch.numRows();
+       |  int $localEnd = $numRows - $idx;
+       |  for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
+       |    int $rowidx = $idx + $localIdx;
+       |    ${consume(ctx, columnsBatchInput).trim}
+       |    $shouldStop
+       |  }
+       |  $idx = $numRows;
+       |  $batch = null;
+       |  $nextBatchFuncName();
+       |}
+       |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
+       |$scanTimeTotalNs = 0;
+     """.stripMargin
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    child.asInstanceOf[CodegenSupport].inputRDDs()
+  }
+}
+
+/**
+ * Provides an optimized set of APIs to append row based data to an array of
+ * [[WritableColumnVector]].
+ */
+private[execution] class RowToColumnConverter(schema: StructType) extends 
Serializable {
+  private val converters = schema.fields.map {
+    f => RowToColumnConverter.getConverterForType(f.dataType)
+  }
+
+  final def convert(row: InternalRow, vectors: Array[WritableColumnVector]): 
Unit = {
+    var idx = 0
+    while (idx < row.numFields) {
+      converters(idx).append(row, idx, vectors(idx))
+      idx += 1
+    }
+  }
+}
+
+/**
+ * Provides an optimized set of APIs to extract a column from a row and append 
it to a
+ * [[WritableColumnVector]].
+ */
+private object RowToColumnConverter {
 
 Review comment:
   That is the eventual goal.  In this patch, I deliberately avoided replacing 
any existing code in order to keep the potential impact small.
   
   I am currently working on a separate patch to replace ColumnarBatchScan with 
ColumnarToRowExec.  The patch is mostly just updating tests to match the new 
pattern, but I also want to run a lot of benchmark code to be sure that I 
didn't regress anything.
   
   After that, I was going to start working on the various APIs that use Arrow 
data behind the scenes to process data (R, Python, etc.).  That is likely to have 
a larger impact because the code that does those transitions is spread across 
a lot more files.  I also don't want to get too far ahead of myself, because this 
patch, which all of that work would be based on, has not even been merged yet.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to