kiszk commented on a change in pull request #24795: [SPARK-27945][SQL] Minimal changes to support columnar processing URL: https://github.com/apache/spark/pull/24795#discussion_r295862760
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala ########## @@ -0,0 +1,538 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import scala.collection.JavaConverters._ + +import org.apache.spark.{broadcast, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, SpecializedGetters, UnsafeProjection} +import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} + +/** + * Holds a user defined rule that can be used to inject columnar implementations of various + * operators in the plan. 
The [[preColumnarTransitions]] [[Rule]] can be used to replace + * [[SparkPlan]] instances with versions that support a columnar implementation. After this, + * Spark will insert any transitions necessary. This includes transitions from row to columnar + * ([[RowToColumnarExec]]) and from columnar to row ([[ColumnarToRowExec]]). At this point the + * [[postColumnarTransitions]] [[Rule]] is called to allow replacing any of the implementations + * of the transitions or doing cleanup of the plan, like inserting stages to build larger batches + * for more efficient processing, or stages that transition the data to/from an accelerator's + * memory. + */ +class ColumnarRule { + def preColumnarTransitions: Rule[SparkPlan] = plan => plan + def postColumnarTransitions: Rule[SparkPlan] = plan => plan +} + +/** + * Provides a common executor to translate an [[RDD]] of [[ColumnarBatch]] into an [[RDD]] of + * [[InternalRow]]. This is inserted whenever such a transition is determined to be needed. + * + * The implementation is based on similar implementations in [[ColumnarBatchScan]], + * [[org.apache.spark.sql.execution.python.ArrowEvalPythonExec]], and + * [[MapPartitionsInRWithArrowExec]]. Eventually this should replace those implementations. 
+ */ +case class ColumnarToRowExec(child: SparkPlan) + extends UnaryExecNode with CodegenSupport { + + override def output: Seq[Attribute] = child.output + + override def outputPartitioning: Partitioning = child.outputPartitioning + + override def outputOrdering: Seq[SortOrder] = child.outputOrdering + + override lazy val metrics: Map[String, SQLMetric] = Map( + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches"), + "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time") + ) + + override def doExecute(): RDD[InternalRow] = { + val numOutputRows = longMetric("numOutputRows") + val numInputBatches = longMetric("numInputBatches") + val scanTime = longMetric("scanTime") + // UnsafeProjection is not serializable so do it on the executor side, which is why it is lazy + @transient lazy val outputProject = UnsafeProjection.create(output, output) + val batches = child.executeColumnar() + batches.flatMap(batch => { + val batchStartNs = System.nanoTime() + numInputBatches += 1 + // In order to match the numOutputRows metric in the generated code we update + // numOutputRows for each batch. This is less accurate than doing it at output + // because it will over count the number of rows output in the case of a limit, + // but it is more efficient. + numOutputRows += batch.numRows() + val ret = batch.rowIterator().asScala + scanTime += ((System.nanoTime() - batchStartNs) / (1000 * 1000)) + ret.map(outputProject) + }) + } + + /** + * Generate [[ColumnVector]] expressions for our parent to consume as rows. + * This is called once per [[ColumnVector]] in the batch. + * + * This code came unchanged from [[ColumnarBatchScan]] and will hopefully replace it + * at some point. 
+ */ + private def genCodeColumnVector( + ctx: CodegenContext, + columnVar: String, + ordinal: String, + dataType: DataType, + nullable: Boolean): ExprCode = { + val javaType = CodeGenerator.javaType(dataType) + val value = CodeGenerator.getValueFromVector(columnVar, dataType, ordinal) + val isNullVar = if (nullable) { + JavaCode.isNullVariable(ctx.freshName("isNull")) + } else { + FalseLiteral + } + val valueVar = ctx.freshName("value") + val str = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]" + val code = code"${ctx.registerComment(str)}" + (if (nullable) { + code""" + boolean $isNullVar = $columnVar.isNullAt($ordinal); + $javaType $valueVar = $isNullVar ? ${CodeGenerator.defaultValue(dataType)} : ($value); + """ + } else { + code"$javaType $valueVar = $value;" + }) + ExprCode(code, isNullVar, JavaCode.variable(valueVar, dataType)) + } + + /** + * Produce code to process the input iterator as [[ColumnarBatch]]es. + * This produces an [[org.apache.spark.sql.catalyst.expressions.UnsafeRow]] for each row in + * each batch. + * + * This code came almost completely unchanged from [[ColumnarBatchScan]] and will + * hopefully replace it at some point. 
+ */ + override protected def doProduce(ctx: CodegenContext): String = { + // PhysicalRDD always just has one input + val input = ctx.addMutableState("scala.collection.Iterator", "input", + v => s"$v = inputs[0];") + + // metrics + val numOutputRows = metricTerm(ctx, "numOutputRows") + val numInputBatches = metricTerm(ctx, "numInputBatches") + val scanTimeMetric = metricTerm(ctx, "scanTime") + val scanTimeTotalNs = + ctx.addMutableState(CodeGenerator.JAVA_LONG, "scanTime") // init as scanTime = 0 + + val columnarBatchClz = classOf[ColumnarBatch].getName + val batch = ctx.addMutableState(columnarBatchClz, "batch") + + val idx = ctx.addMutableState(CodeGenerator.JAVA_INT, "batchIdx") // init as batchIdx = 0 + val columnVectorClzs = child.vectorTypes.getOrElse( + Seq.fill(output.indices.size)(classOf[ColumnVector].getName)) + val (colVars, columnAssigns) = columnVectorClzs.zipWithIndex.map { + case (columnVectorClz, i) => + val name = ctx.addMutableState(columnVectorClz, s"colInstance$i") + (name, s"$name = ($columnVectorClz) $batch.column($i);") + }.unzip + + val nextBatch = ctx.freshName("nextBatch") + val nextBatchFuncName = ctx.addNewFunction(nextBatch, + s""" + |private void $nextBatch() throws java.io.IOException { + | long getBatchStart = System.nanoTime(); + | if ($input.hasNext()) { + | $batch = ($columnarBatchClz)$input.next(); + | $numOutputRows.add($batch.numRows()); + | $idx = 0; + | ${columnAssigns.mkString("", "\n", "\n")} + | ${numInputBatches}.add(1); + | } + | $scanTimeTotalNs += System.nanoTime() - getBatchStart; + |}""".stripMargin) + + ctx.currentVars = null + val rowidx = ctx.freshName("rowIdx") + val columnsBatchInput = (output zip colVars).map { case (attr, colVar) => + genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) + } + val localIdx = ctx.freshName("localIdx") + val localEnd = ctx.freshName("localEnd") + val numRows = ctx.freshName("numRows") + val shouldStop = if (parent.needStopCheck) { + s"if (shouldStop()) { $idx = 
$rowidx + 1; return; }" + } else { + "// shouldStop check is eliminated" + } + s""" + |if ($batch == null) { + | $nextBatchFuncName(); + |} + |while ($batch != null) { + | int $numRows = $batch.numRows(); + | int $localEnd = $numRows - $idx; + | for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) { + | int $rowidx = $idx + $localIdx; + | ${consume(ctx, columnsBatchInput).trim} + | $shouldStop + | } + | $idx = $numRows; + | $batch = null; + | $nextBatchFuncName(); + |} + |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000)); + |$scanTimeTotalNs = 0; + """.stripMargin + } + + override def inputRDDs(): Seq[RDD[InternalRow]] = { + child.asInstanceOf[CodegenSupport].inputRDDs() + } +} + +/** + * Provides an optimized set of APIs to append row based data to an array of + * [[WritableColumnVector]]. + */ +private[execution] class RowToColumnConverter(schema: StructType) extends Serializable { + private val converters = schema.fields.map { + f => RowToColumnConverter.getConverterForType(f.dataType, f.nullable) + } + + final def convert(row: InternalRow, vectors: Array[WritableColumnVector]): Unit = { + var idx = 0 + while (idx < row.numFields) { + converters(idx).append(row, idx, vectors(idx)) + idx += 1 + } + } +} + +/** + * Provides an optimized set of APIs to extract a column from a row and append it to a + * [[WritableColumnVector]]. 
+ */ +private object RowToColumnConverter { + private abstract class TypeConverter extends Serializable { + def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit + } + + private final case class BasicNullableTypeConverter(base: TypeConverter) extends TypeConverter { + override def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit = { + if (row.isNullAt(column)) { + cv.appendNull + } else { + base.append(row, column, cv) + } + } + } + + private final case class StructNullableTypeConverter(base: TypeConverter) extends TypeConverter { + override def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit = { + if (row.isNullAt(column)) { + cv.appendStruct(true) + } else { + base.append(row, column, cv) + } + } + } + + private def getConverterForType(dataType: DataType, nullable: Boolean): TypeConverter = { + val core = dataType match { + case BooleanType => BooleanConverter + case ByteType => ByteConverter + case ShortType => ShortConverter + case IntegerType | DateType => IntConverter + case FloatType => FloatConverter + case LongType | TimestampType => LongConverter + case DoubleType => DoubleConverter + case StringType => StringConverter + case CalendarIntervalType => CalendarConverter + case at: ArrayType => new ArrayConverter(getConverterForType(at.elementType, nullable)) + case st: StructType => new StructConverter(st.fields.map( + (f) => getConverterForType(f.dataType, f.nullable))) + case dt: DecimalType => new DecimalConverter(dt) + case mt: MapType => new MapConverter(getConverterForType(mt.keyType, nullable), + getConverterForType(mt.valueType, nullable)) + case unknown => throw new UnsupportedOperationException( + s"Type $unknown not supported") + } + + if (nullable) { + dataType match { + case CalendarIntervalType => new StructNullableTypeConverter(core) + case st: StructType => new StructNullableTypeConverter(core) + case _ => new BasicNullableTypeConverter(core) + } + } else { + 
core + } + } + + private object BooleanConverter extends TypeConverter { + override def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit = + cv.appendBoolean(row.getBoolean(column)) + } + + private object ByteConverter extends TypeConverter { + override def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit = + cv.appendByte(row.getByte(column)) + } + + private object ShortConverter extends TypeConverter { + override def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit = + cv.appendShort(row.getShort(column)) + } + + private object IntConverter extends TypeConverter { + override def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit = + cv.appendInt(row.getInt(column)) + } + + private object FloatConverter extends TypeConverter { + override def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit = + cv.appendFloat(row.getFloat(column)) + } + + private object LongConverter extends TypeConverter { + override def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit = + cv.appendLong(row.getLong(column)) + } + + private object DoubleConverter extends TypeConverter { + override def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit = + cv.appendDouble(row.getDouble(column)) + } + + private object StringConverter extends TypeConverter { + override def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit = { + val data = row.getUTF8String(column).getBytes + cv.appendByteArray(data, 0, data.length) + } + } + + private object CalendarConverter extends TypeConverter { + override def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit = { + val c = row.getInterval(column) + cv.appendStruct(false) + cv.getChild(0).appendInt(c.months) + cv.getChild(1).appendLong(c.microseconds) + } + } + + private case class ArrayConverter(childConverter: 
TypeConverter) extends TypeConverter { + override def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit = { + val values = row.getArray(column) + val numElements = values.numElements() + cv.appendArray(numElements) + val arrData = cv.arrayData() + for (i <- 0 until numElements) { + childConverter.append(values, i, arrData) + } + } + } + + private case class StructConverter(childConverters: Array[TypeConverter]) extends TypeConverter { + override def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit = { + cv.appendStruct(false) + val data = row.getStruct(column, childConverters.length) + for (i <- 0 until childConverters.length) { + childConverters(i).append(data, i, cv.getChild(i)) + } + } + } + + private case class DecimalConverter(dt: DecimalType) extends TypeConverter { + override def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit = { + val d = row.getDecimal(column, dt.precision, dt.scale) + if (dt.precision <= Decimal.MAX_INT_DIGITS) { + cv.appendInt(d.toUnscaledLong.toInt) + } else if (dt.precision <= Decimal.MAX_LONG_DIGITS) { + cv.appendLong(d.toUnscaledLong) + } else { + val integer = d.toJavaBigDecimal.unscaledValue + val bytes = integer.toByteArray + cv.appendByteArray(bytes, 0, bytes.length) + } + } + } + + private case class MapConverter(keyConverter: TypeConverter, valueConverter: TypeConverter) + extends TypeConverter { + override def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit = { + val m = row.getMap(column) + val keys = cv.getChild(0) + val values = cv.getChild(1) + val numElements = m.numElements() + cv.appendArray(numElements) + + val srcKeys = m.keyArray() + val srcValues = m.valueArray() + + for (i <- 0 until numElements) { + keyConverter.append(srcKeys, i, keys) + valueConverter.append(srcValues, i, values) + } + } + } +} + +/** + * Provides a common executor to translate an [[RDD]] of [[InternalRow]] into an [[RDD]] of + * 
[[ColumnarBatch]]. This is inserted whenever such a transition is determined to be needed. + * + * This is similar to some of the code in ArrowConverters.scala and + * [[org.apache.spark.sql.execution.arrow.ArrowWriter]]. That code is more specialized + * to convert [[InternalRow]] to Arrow formatted data, but in the future if we make + * [[OffHeapColumnVector]] internally Arrow formatted we may be able to replace much of that code. + * + * This is also similar to + * [[org.apache.spark.sql.execution.vectorized.ColumnVectorUtils.populate()]] and + * [[org.apache.spark.sql.execution.vectorized.ColumnVectorUtils.toBatch()]]. toBatch is only ever + * called from tests and can probably be removed, but populate is used by both ORC and Parquet + * to initialize partition and missing columns. There is some chance that we could replace + * populate with [[RowToColumnConverter]], but the performance requirements are different and it + * would only be to reduce code. + */ +case class RowToColumnarExec(child: SparkPlan) extends UnaryExecNode { + override def output: Seq[Attribute] = child.output + + override def outputPartitioning: Partitioning = child.outputPartitioning + + override def outputOrdering: Seq[SortOrder] = child.outputOrdering + + override def doExecute(): RDD[InternalRow] = { + child.execute() + } + + override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = { + child.doExecuteBroadcast() + } + + override def supportsColumnar: Boolean = true + + override lazy val metrics: Map[String, SQLMetric] = Map( + "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), + "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches") + ) + + override def doExecuteColumnar(): RDD[ColumnarBatch] = { + val enableOffHeapColumnVector = sqlContext.conf.offHeapColumnVectorEnabled + val numInputRows = longMetric("numInputRows") + val numOutputBatches = longMetric("numOutputBatches") + // Instead of creating a new config we are 
reusing columnBatchSize. In the future if we do + // combine with some of the Arrow conversion tools we will need to unify some of the configs. + val numRows = conf.columnBatchSize + val converters = new RowToColumnConverter(schema) + val rowBased = child.execute() + rowBased.mapPartitions(rowIterator => { + new Iterator[ColumnarBatch] { + var cb: ColumnarBatch = null + + TaskContext.get().addTaskCompletionListener[Unit] { _ => + if (cb != null) { + cb.close Review comment: nit: `cb.close()`? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
