revans2 commented on a change in pull request #24795: [SPARK-27945][SQL] Minimal changes to support columnar processing
URL: https://github.com/apache/spark/pull/24795#discussion_r294274613
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -0,0 +1,594 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.{broadcast, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, SpecializedGetters, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+
+/**
+ * Holds a user defined rule that can be used to inject columnar implementations of various
+ * operators in the plan. The [[preColumnarTransitions]] [[Rule]] can be used to replace
+ * [[SparkPlan]] instances with versions that support a columnar implementation. After this
+ * Spark will insert any transitions necessary. This includes transitions from row to columnar
+ * [[RowToColumnarExec]] and from columnar to row [[ColumnarToRowExec]]. At this point the
+ * [[postColumnarTransitions]] [[Rule]] is called to allow replacing any of the implementations
+ * of the transitions or doing cleanup of the plan, like inserting stages to build larger batches
+ * for more efficient processing, or stages that transition the data to/from an accelerator's
+ * memory.
+ */
+class ColumnarRule {
+  def preColumnarTransitions: Rule[SparkPlan] = plan => plan
+  def postColumnarTransitions: Rule[SparkPlan] = plan => plan
+}
+
+/**
+ * Provides a common executor to translate an [[RDD]] of [[ColumnarBatch]] into an [[RDD]] of
+ * [[InternalRow]]. This is inserted whenever such a transition is determined to be needed.
+ *
+ * The implementation is based off of similar implementations in [[ColumnarBatchScan]],
+ * [[org.apache.spark.sql.execution.python.ArrowEvalPythonExec]], and
+ * [[MapPartitionsInRWithArrowExec]]. Eventually this should replace those implementations.
+ */
+case class ColumnarToRowExec(child: SparkPlan)
+  extends UnaryExecNode with CodegenSupport {
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override lazy val metrics: Map[String, SQLMetric] = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+    "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches"),
+    "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")
+  )
+
+  override def doExecute(): RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
+    val numInputBatches = longMetric("numInputBatches")
+    val scanTime = longMetric("scanTime")
+    val batches = child.executeColumnar()
+    batches.mapPartitions { cbIter =>
+      // UnsafeProjection is not serializable so do it on the executor side
+      val outputProject = UnsafeProjection.create(output, output)
+      new Iterator[InternalRow] {
+        var it: java.util.Iterator[InternalRow] = null
+
+        def loadNextBatch: Unit = {
+          if (it != null) {
+            it = null
+          }
+          val batchStartNs = System.nanoTime()
+          if (cbIter.hasNext) {
+            val cb = cbIter.next()
+            it = cb.rowIterator()
+            numInputBatches += 1
+            // In order to match the numOutputRows metric in the generated code we update
+            // numOutputRows for each batch. This is less accurate than doing it at output
+            // because it will over count the number of rows output in the case of a limit,
+            // but it is more efficient.
+            numOutputRows += cb.numRows()
+          }
+          scanTime += ((System.nanoTime() - batchStartNs) / (1000 * 1000))
+        }
+
+        override def hasNext: Boolean = {
+          val itHasNext = it != null && it.hasNext
+          if (!itHasNext) {
+            loadNextBatch
+            it != null && it.hasNext
+          } else {
+            itHasNext
+          }
+        }

+        override def next(): InternalRow = {
+          if (it == null || !it.hasNext) {
+            loadNextBatch
+          }
+          if (it == null) {
+            throw new NoSuchElementException()
+          }
+          it.next()
+        }
+        // This is to convert the InternalRow to an UnsafeRow. Even though the type is
+        // InternalRow some operations downstream operations like collect require it to
+        // be UnsafeRow
+      }.map(outputProject)
+    }
+  }
+
+  /**
+   * Generate [[ColumnVector]] expressions for our parent to consume as rows.
+   * This is called once per [[ColumnVector]] in the batch.
+   *
+   * This code came unchanged from [[ColumnarBatchScan]] and will hopefully replace it
+   * at some point.
+   */
+  private def genCodeColumnVector(
+      ctx: CodegenContext,
+      columnVar: String,
+      ordinal: String,
+      dataType: DataType,
+      nullable: Boolean): ExprCode = {
+    val javaType = CodeGenerator.javaType(dataType)
+    val value = CodeGenerator.getValueFromVector(columnVar, dataType, ordinal)
+    val isNullVar = if (nullable) {
+      JavaCode.isNullVariable(ctx.freshName("isNull"))
+    } else {
+      FalseLiteral
+    }
+    val valueVar = ctx.freshName("value")
+    val str = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]"
+    val code = code"${ctx.registerComment(str)}" + (if (nullable) {
+      code"""
+        boolean $isNullVar = $columnVar.isNullAt($ordinal);
+        $javaType $valueVar = $isNullVar ? ${CodeGenerator.defaultValue(dataType)} : ($value);
+      """
+    } else {
+      code"$javaType $valueVar = $value;"
+    })
+    ExprCode(code, isNullVar, JavaCode.variable(valueVar, dataType))
+  }
+
+  /**
+   * Produce code to process the input iterator as [[ColumnarBatch]]es.
+   * This produces an [[org.apache.spark.sql.catalyst.expressions.UnsafeRow]] for each row in
+   * each batch.
+   *
+   * This code came almost completely unchanged from [[ColumnarBatchScan]] and will
+   * hopefully replace it at some point.
+   */
+  override protected def doProduce(ctx: CodegenContext): String = {
+    // PhysicalRDD always just has one input
+    val input = ctx.addMutableState("scala.collection.Iterator", "input",
+      v => s"$v = inputs[0];")
+
+    // metrics
+    val numOutputRows = metricTerm(ctx, "numOutputRows")
+    val numInputBatches = metricTerm(ctx, "numInputBatches")
+    val scanTimeMetric = metricTerm(ctx, "scanTime")
+    val scanTimeTotalNs =
+      ctx.addMutableState(CodeGenerator.JAVA_LONG, "scanTime") // init as scanTime = 0
+
+    val columnarBatchClz = classOf[ColumnarBatch].getName
+    val batch = ctx.addMutableState(columnarBatchClz, "batch")
+
+    val idx = ctx.addMutableState(CodeGenerator.JAVA_INT, "batchIdx") // init as batchIdx = 0
+    val columnVectorClzs = child.vectorTypes.getOrElse(
+      Seq.fill(output.indices.size)(classOf[ColumnVector].getName))
+    val (colVars, columnAssigns) = columnVectorClzs.zipWithIndex.map {
+      case (columnVectorClz, i) =>
+        val name = ctx.addMutableState(columnVectorClz, s"colInstance$i")
+        (name, s"$name = ($columnVectorClz) $batch.column($i);")
+    }.unzip
+
+    val nextBatch = ctx.freshName("nextBatch")
+    val nextBatchFuncName = ctx.addNewFunction(nextBatch,
+      s"""
+         |private void $nextBatch() throws java.io.IOException {
+         |  long getBatchStart = System.nanoTime();
+         |  if ($input.hasNext()) {
+         |    $batch = ($columnarBatchClz)$input.next();
+         |    $numOutputRows.add($batch.numRows());
+         |    $idx = 0;
+         |    ${columnAssigns.mkString("", "\n", "\n")}
+         |    ${numInputBatches}.add(1);
+         |  }
+         |  $scanTimeTotalNs += System.nanoTime() - getBatchStart;
+         |}""".stripMargin)
+
+    ctx.currentVars = null
+    val rowidx = ctx.freshName("rowIdx")
+    val columnsBatchInput = (output zip colVars).map { case (attr, colVar) =>
+      genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable)
+    }
+    val localIdx = ctx.freshName("localIdx")
+    val localEnd = ctx.freshName("localEnd")
+    val numRows = ctx.freshName("numRows")
+    val shouldStop = if (parent.needStopCheck) {
+      s"if (shouldStop()) { $idx = $rowidx + 1; return; }"
+    } else {
+      "// shouldStop check is eliminated"
+    }
+    s"""
+       |if ($batch == null) {
+       |  $nextBatchFuncName();
+       |}
+       |while ($batch != null) {
+       |  int $numRows = $batch.numRows();
+       |  int $localEnd = $numRows - $idx;
+       |  for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
+       |    int $rowidx = $idx + $localIdx;
+       |    ${consume(ctx, columnsBatchInput).trim}
+       |    $shouldStop
+       |  }
+       |  $idx = $numRows;
+       |  $batch = null;
+       |  $nextBatchFuncName();
+       |}
+       |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
+       |$scanTimeTotalNs = 0;
+     """.stripMargin
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    child.asInstanceOf[CodegenSupport].inputRDDs()
+  }
+}
+
+/**
+ * Provides an optimized set of APIs to append row based data to an array of
+ * [[WritableColumnVector]].
+ */
+private[execution] class RowToColumnConverter(schema: StructType) extends Serializable {
+  private val converters = schema.fields.map {
+    f => RowToColumnConverter.getConverterForType(f.dataType)
+  }
+
+  final def convert(row: InternalRow, vectors: Array[WritableColumnVector]): Unit = {
+    var idx = 0
+    while (idx < row.numFields) {
+      converters(idx).append(row, idx, vectors(idx))
+      idx += 1
+    }
+  }
+}
+
+/**
+ * Provides an optimized set of APIs to extract a column from a row and append it to a
+ * [[WritableColumnVector]].
+ */
+private object RowToColumnConverter {

Review comment:
   That is the eventual goal. In this patch I intentionally avoided replacing any existing code, to keep the possible impact small. I am currently working on a separate patch that replaces ColumnarBatchScan with ColumnarToRowExec. That patch is mostly just updating tests to match the new pattern, but I also want to run a lot of benchmark code to be sure that I didn't regress anything. After that, I was going to start working on the various APIs that use Arrow data behind the scenes to process data (R, Python, etc.). That is likely to have a larger impact because the code that does those transitions is spread across many more files. I also don't want to get too far ahead of myself, since this patch, which all of those follow-ups would be based on, is not even merged yet.
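[Editor's note] For context, here is a minimal sketch of how a plugin might hook into the ColumnarRule extension point quoted above. It assumes a SparkSessionExtensions.injectColumnar registration hook of the shape this work is expected to expose (the final API may differ), and NoopColumnarRule / MyColumnarExtensions are hypothetical names used only for illustration.

    import org.apache.spark.sql.SparkSessionExtensions
    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}

    // Hypothetical rule: a real implementation would replace supported operators in
    // preColumnarTransitions with SparkPlan nodes that provide a columnar execution
    // path, and could adjust the inserted transitions in postColumnarTransitions.
    class NoopColumnarRule extends ColumnarRule {
      override def preColumnarTransitions: Rule[SparkPlan] = plan => {
        // Swap in columnar-capable operators here; returning the plan unchanged
        // means Spark keeps its row-based implementations.
        plan
      }

      override def postColumnarTransitions: Rule[SparkPlan] = plan => {
        // Clean up or coalesce the RowToColumnarExec / ColumnarToRowExec transitions
        // Spark inserted, e.g. to build larger batches or target accelerator memory.
        plan
      }
    }

    // Hypothetical wiring through the session extensions mechanism.
    class MyColumnarExtensions extends (SparkSessionExtensions => Unit) {
      override def apply(extensions: SparkSessionExtensions): Unit = {
        extensions.injectColumnar(session => new NoopColumnarRule)
      }
    }

Registering the extension class through the existing spark.sql.extensions configuration would let the pre/post transition rules run during planning, with Spark itself inserting the row/columnar transitions defined in this file.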
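[Editor's note] And a rough sketch of how the RowToColumnConverter quoted at the end of the hunk would be driven to fill a batch. This is illustrative only: RowToColumnExample, rowsToBatch, and batchSize are made-up names, the converter is private[execution] so the code is placed in that package, and it leans on the existing OnHeapColumnVector.allocateColumns and ColumnarBatch APIs rather than anything new in this PR (the patch's own RowToColumnarExec plays this role for real).

    package org.apache.spark.sql.execution

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.vectorized.ColumnarBatch

    // Illustrative helper (not part of the patch): pull up to batchSize rows from an
    // iterator and append them into freshly allocated on-heap column vectors.
    object RowToColumnExample {
      def rowsToBatch(rows: Iterator[InternalRow], schema: StructType, batchSize: Int): ColumnarBatch = {
        val converter = new RowToColumnConverter(schema)
        // One writable vector per field in the schema.
        val vectors: Seq[WritableColumnVector] =
          OnHeapColumnVector.allocateColumns(batchSize, schema)
        val writable = vectors.toArray
        var rowCount = 0
        while (rows.hasNext && rowCount < batchSize) {
          converter.convert(rows.next(), writable)
          rowCount += 1
        }
        // ColumnarBatch does not copy; it just wraps the vectors that were filled.
        val batch = new ColumnarBatch(vectors.toArray)
        batch.setNumRows(rowCount)
        batch
      }
    }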