This is an automated email from the ASF dual-hosted git repository. tgraves pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 8dff711 [SPARK-28213][SQL] Replace ColumnarBatchScan with equivilant from Columnar 8dff711 is described below commit 8dff711ce732d476593a4e235d68e5e1728046cb Author: Robert (Bobby) Evans <bo...@apache.org> AuthorDate: Thu Jul 11 09:03:30 2019 -0500 [SPARK-28213][SQL] Replace ColumnarBatchScan with equivilant from Columnar ## What changes were proposed in this pull request? This is a second part of the https://issues.apache.org/jira/browse/SPARK-27396 and a follow on to #24795 ## How was this patch tested? I did some manual tests and ran/updated the automated tests I did some simple performance tests on a single node to try to verify that there is no performance impact, and I was not able to measure anything beyond noise. Closes #25008 from revans2/columnar-remove-batch-scan. Authored-by: Robert (Bobby) Evans <bo...@apache.org> Signed-off-by: Thomas Graves <tgra...@apache.org> --- .../org/apache/spark/sql/execution/Columnar.scala | 10 +- .../spark/sql/execution/ColumnarBatchScan.scala | 167 --------------------- .../spark/sql/execution/DataSourceScanExec.scala | 60 ++++---- .../sql/execution/WholeStageCodegenExec.scala | 6 +- .../execution/adaptive/AdaptiveSparkPlanExec.scala | 2 + .../execution/columnar/InMemoryTableScanExec.scala | 109 +++++++------- .../execution/datasources/v2/BatchScanExec.scala | 2 +- .../datasources/v2/DataSourceV2ScanExecBase.scala | 34 +++-- .../datasources/v2/MicroBatchScanExec.scala | 2 +- .../org/apache/spark/sql/CachedTableSuite.scala | 2 +- .../scala/org/apache/spark/sql/SubquerySuite.scala | 5 +- .../execution/LogicalPlanTagInSparkPlanSuite.scala | 13 +- .../sql/execution/WholeStageCodegenSuite.scala | 42 ------ .../columnar/InMemoryColumnarQuerySuite.scala | 11 +- .../datasources/parquet/ParquetQuerySuite.scala | 4 +- .../sql/execution/metric/SQLMetricsSuite.scala | 6 +- .../org/apache/spark/sql/test/SQLTestUtils.scala | 2 +- 17 files changed, 142 insertions(+), 335 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala index 315eba6..4385843 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala @@ -53,8 +53,8 @@ class ColumnarRule { * Provides a common executor to translate an [[RDD]] of [[ColumnarBatch]] into an [[RDD]] of * [[InternalRow]]. This is inserted whenever such a transition is determined to be needed. * - * The implementation is based off of similar implementations in [[ColumnarBatchScan]], - * [[org.apache.spark.sql.execution.python.ArrowEvalPythonExec]], and + * The implementation is based off of similar implementations in + * [[org.apache.spark.sql.execution.python.ArrowEvalPythonExec]] and * [[MapPartitionsInRWithArrowExec]]. Eventually this should replace those implementations. */ case class ColumnarToRowExec(child: SparkPlan) @@ -96,9 +96,6 @@ case class ColumnarToRowExec(child: SparkPlan) /** * Generate [[ColumnVector]] expressions for our parent to consume as rows. * This is called once per [[ColumnVector]] in the batch. - * - * This code came unchanged from [[ColumnarBatchScan]] and will hopefully replace it - * at some point. */ private def genCodeColumnVector( ctx: CodegenContext, @@ -130,9 +127,6 @@ case class ColumnarToRowExec(child: SparkPlan) * Produce code to process the input iterator as [[ColumnarBatch]]es. * This produces an [[org.apache.spark.sql.catalyst.expressions.UnsafeRow]] for each row in * each batch. - * - * This code came almost completely unchanged from [[ColumnarBatchScan]] and will - * hopefully replace it at some point. */ override protected def doProduce(ctx: CodegenContext): String = { // PhysicalRDD always just has one input diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala deleted file mode 100644 index b2e9f76..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution - -import org.apache.spark.sql.catalyst.expressions.{BoundReference, UnsafeRow} -import org.apache.spark.sql.catalyst.expressions.codegen._ -import org.apache.spark.sql.catalyst.expressions.codegen.Block._ -import org.apache.spark.sql.execution.metric.SQLMetrics -import org.apache.spark.sql.types.DataType -import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} - - -/** - * Helper trait for abstracting scan functionality using [[ColumnarBatch]]es. - */ -private[sql] trait ColumnarBatchScan extends CodegenSupport { - - protected def supportsBatch: Boolean = true - - override lazy val metrics = Map( - "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), - "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")) - - /** - * Generate [[ColumnVector]] expressions for our parent to consume as rows. - * This is called once per [[ColumnarBatch]]. - */ - private def genCodeColumnVector( - ctx: CodegenContext, - columnVar: String, - ordinal: String, - dataType: DataType, - nullable: Boolean): ExprCode = { - val javaType = CodeGenerator.javaType(dataType) - val value = CodeGenerator.getValueFromVector(columnVar, dataType, ordinal) - val isNullVar = if (nullable) { - JavaCode.isNullVariable(ctx.freshName("isNull")) - } else { - FalseLiteral - } - val valueVar = ctx.freshName("value") - val str = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]" - val code = code"${ctx.registerComment(str)}" + (if (nullable) { - code""" - boolean $isNullVar = $columnVar.isNullAt($ordinal); - $javaType $valueVar = $isNullVar ? ${CodeGenerator.defaultValue(dataType)} : ($value); - """ - } else { - code"$javaType $valueVar = $value;" - }) - ExprCode(code, isNullVar, JavaCode.variable(valueVar, dataType)) - } - - /** - * Produce code to process the input iterator as [[ColumnarBatch]]es. - * This produces an [[UnsafeRow]] for each row in each batch. - */ - // TODO: return ColumnarBatch.Rows instead - override protected def doProduce(ctx: CodegenContext): String = { - // PhysicalRDD always just has one input - val input = ctx.addMutableState("scala.collection.Iterator", "input", - v => s"$v = inputs[0];") - if (supportsBatch) { - produceBatches(ctx, input) - } else { - produceRows(ctx, input) - } - } - - private def produceBatches(ctx: CodegenContext, input: String): String = { - // metrics - val numOutputRows = metricTerm(ctx, "numOutputRows") - val scanTimeMetric = metricTerm(ctx, "scanTime") - val scanTimeTotalNs = - ctx.addMutableState(CodeGenerator.JAVA_LONG, "scanTime") // init as scanTime = 0 - - val columnarBatchClz = classOf[ColumnarBatch].getName - val batch = ctx.addMutableState(columnarBatchClz, "batch") - - val idx = ctx.addMutableState(CodeGenerator.JAVA_INT, "batchIdx") // init as batchIdx = 0 - val columnVectorClzs = vectorTypes.getOrElse( - Seq.fill(output.indices.size)(classOf[ColumnVector].getName)) - val (colVars, columnAssigns) = columnVectorClzs.zipWithIndex.map { - case (columnVectorClz, i) => - val name = ctx.addMutableState(columnVectorClz, s"colInstance$i") - (name, s"$name = ($columnVectorClz) $batch.column($i);") - }.unzip - - val nextBatch = ctx.freshName("nextBatch") - val nextBatchFuncName = ctx.addNewFunction(nextBatch, - s""" - |private void $nextBatch() throws java.io.IOException { - | long getBatchStart = System.nanoTime(); - | if ($input.hasNext()) { - | $batch = ($columnarBatchClz)$input.next(); - | $numOutputRows.add($batch.numRows()); - | $idx = 0; - | ${columnAssigns.mkString("", "\n", "\n")} - | } - | $scanTimeTotalNs += System.nanoTime() - getBatchStart; - |}""".stripMargin) - - ctx.currentVars = null - val rowidx = ctx.freshName("rowIdx") - val columnsBatchInput = (output zip colVars).map { case (attr, colVar) => - genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) - } - val localIdx = ctx.freshName("localIdx") - val localEnd = ctx.freshName("localEnd") - val numRows = ctx.freshName("numRows") - val shouldStop = if (parent.needStopCheck) { - s"if (shouldStop()) { $idx = $rowidx + 1; return; }" - } else { - "// shouldStop check is eliminated" - } - s""" - |if ($batch == null) { - | $nextBatchFuncName(); - |} - |while ($limitNotReachedCond $batch != null) { - | int $numRows = $batch.numRows(); - | int $localEnd = $numRows - $idx; - | for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) { - | int $rowidx = $idx + $localIdx; - | ${consume(ctx, columnsBatchInput).trim} - | $shouldStop - | } - | $idx = $numRows; - | $batch = null; - | $nextBatchFuncName(); - |} - |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000)); - |$scanTimeTotalNs = 0; - """.stripMargin - } - - private def produceRows(ctx: CodegenContext, input: String): String = { - val numOutputRows = metricTerm(ctx, "numOutputRows") - val row = ctx.freshName("row") - - ctx.INPUT_ROW = row - ctx.currentVars = null - s""" - |while ($limitNotReachedCond $input.hasNext()) { - | InternalRow $row = (InternalRow) $input.next(); - | $numOutputRows.add(1); - | ${consume(ctx, null, row).trim} - | if (shouldStop()) return; - |} - """.stripMargin - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 518460d..728ac3a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -37,10 +37,11 @@ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.Utils import org.apache.spark.util.collection.BitSet -trait DataSourceScanExec extends LeafExecNode with CodegenSupport { +trait DataSourceScanExec extends LeafExecNode { val relation: BaseRelation val tableIdentifier: Option[TableIdentifier] @@ -69,6 +70,12 @@ trait DataSourceScanExec extends LeafExecNode with CodegenSupport { private def redact(text: String): String = { Utils.redact(sqlContext.sessionState.conf.stringRedactionPattern, text) } + + /** + * The data being read in. This is to provide input to the tests in a way compatible with + * [[InputRDDCodegen]] which all implementations used to extend. + */ + def inputRDDs(): Seq[RDD[InternalRow]] } /** Physical plan node for scanning data from a relation. */ @@ -141,11 +148,11 @@ case class FileSourceScanExec( optionalBucketSet: Option[BitSet], dataFilters: Seq[Expression], override val tableIdentifier: Option[TableIdentifier]) - extends DataSourceScanExec with ColumnarBatchScan { + extends DataSourceScanExec { // Note that some vals referring the file-based relation are lazy intentionally // so that this plan can be canonicalized on executor side too. See SPARK-23731. - override lazy val supportsBatch: Boolean = { + override lazy val supportsColumnar: Boolean = { relation.fileFormat.supportBatch(relation.sparkSession, schema) } @@ -275,7 +282,7 @@ case class FileSourceScanExec( Map( "Format" -> relation.fileFormat.toString, "ReadSchema" -> requiredSchema.catalogString, - "Batched" -> supportsBatch.toString, + "Batched" -> supportsColumnar.toString, "PartitionFilters" -> seqToString(partitionFilters), "PushedFilters" -> seqToString(pushedDownFilters), "DataFilters" -> seqToString(dataFilters), @@ -302,7 +309,7 @@ case class FileSourceScanExec( withSelectedBucketsCount } - private lazy val inputRDD: RDD[InternalRow] = { + lazy val inputRDD: RDD[InternalRow] = { val readFile: (PartitionedFile) => Iterator[InternalRow] = relation.fileFormat.buildReaderWithPartitionValues( sparkSession = relation.sparkSession, @@ -334,29 +341,30 @@ case class FileSourceScanExec( "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")) protected override def doExecute(): RDD[InternalRow] = { - if (supportsBatch) { - // in the case of fallback, this batched scan should never fail because of: - // 1) only primitive types are supported - // 2) the number of columns should be smaller than spark.sql.codegen.maxFields - WholeStageCodegenExec(this)(codegenStageId = 0).execute() - } else { - val numOutputRows = longMetric("numOutputRows") - - if (needsUnsafeRowConversion) { - inputRDD.mapPartitionsWithIndexInternal { (index, iter) => - val proj = UnsafeProjection.create(schema) - proj.initialize(index) - iter.map( r => { - numOutputRows += 1 - proj(r) - }) - } - } else { - inputRDD.map { r => + val numOutputRows = longMetric("numOutputRows") + + if (needsUnsafeRowConversion) { + inputRDD.mapPartitionsWithIndexInternal { (index, iter) => + val proj = UnsafeProjection.create(schema) + proj.initialize(index) + iter.map( r => { numOutputRows += 1 - r - } + proj(r) + }) } + } else { + inputRDD.map { r => + numOutputRows += 1 + r + } + } + } + + protected override def doExecuteColumnar(): RDD[ColumnarBatch] = { + val numOutputRows = longMetric("numOutputRows") + inputRDD.asInstanceOf[RDD[ColumnarBatch]].map { batch => + numOutputRows += batch.numRows() + batch } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 94a5ede..a0afa9a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -709,11 +709,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int) s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen was disabled " + s"for this plan (id=$codegenStageId). To avoid this, you can raise the limit " + s"`${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}`:\n$treeString") - child match { - // The fallback solution of batch file source scan still uses WholeStageCodegenExec - case f: FileSourceScanExec if f.supportsBatch => // do nothing - case _ => return child.execute() - } + return child.execute() } val references = ctx.references.toArray diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 0708878..61dbc58 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -84,6 +84,8 @@ case class AdaptiveSparkPlanExec( // optimizations should be stage-independent. @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq( ReduceNumShufflePartitions(conf), + ApplyColumnarRulesAndInsertTransitions(session.sessionState.conf, + session.sessionState.columnarRules), CollapseCodegenStages(conf) ) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 06634c1..7a8c6d6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -24,7 +24,8 @@ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.Partitioning -import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, SparkPlan, WholeStageCodegenExec} +import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, WholeStageCodegenExec} +import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.execution.vectorized._ import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} @@ -34,7 +35,10 @@ case class InMemoryTableScanExec( attributes: Seq[Attribute], predicates: Seq[Expression], @transient relation: InMemoryRelation) - extends LeafExecNode with ColumnarBatchScan { + extends LeafExecNode { + + override lazy val metrics = Map( + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) override val nodeName: String = { relation.cacheBuilder.tableName match { @@ -65,7 +69,7 @@ case class InMemoryTableScanExec( * If true, get data from ColumnVector in ColumnarBatch, which are generally faster. * If false, get data from UnsafeRow build from CachedBatch */ - override val supportsBatch: Boolean = { + override val supportsColumnar: Boolean = { // In the initial implementation, for ease of review // support only primitive data types and # of fields is less than wholeStageMaxNumFields conf.cacheVectorizedReaderEnabled && relation.schema.fields.forall(f => f.dataType match { @@ -75,9 +79,6 @@ case class InMemoryTableScanExec( }) && !WholeStageCodegenExec.isTooManyFields(conf, relation.schema) } - // TODO: revisit this. Shall we always turn off whole stage codegen if the output data are rows? - override def supportCodegen: Boolean = supportsBatch - private val columnIndices = attributes.map(a => relation.output.map(o => o.exprId).indexOf(a.exprId)).toArray @@ -108,59 +109,61 @@ case class InMemoryTableScanExec( columnarBatch } + private lazy val columnarInputRDD: RDD[ColumnarBatch] = { + val numOutputRows = longMetric("numOutputRows") + val buffers = filteredCachedBatches() + val offHeapColumnVectorEnabled = conf.offHeapColumnVectorEnabled + buffers + .map(createAndDecompressColumn(_, offHeapColumnVectorEnabled)) + .map(b => { + numOutputRows += b.numRows() + b + }) + } + private lazy val inputRDD: RDD[InternalRow] = { val buffers = filteredCachedBatches() val offHeapColumnVectorEnabled = conf.offHeapColumnVectorEnabled - if (supportsBatch) { - // HACK ALERT: This is actually an RDD[ColumnarBatch]. - // We're taking advantage of Scala's type erasure here to pass these batches along. - buffers - .map(createAndDecompressColumn(_, offHeapColumnVectorEnabled)) - .asInstanceOf[RDD[InternalRow]] - } else { - val numOutputRows = longMetric("numOutputRows") + val numOutputRows = longMetric("numOutputRows") - if (enableAccumulatorsForTest) { - readPartitions.setValue(0) - readBatches.setValue(0) - } + if (enableAccumulatorsForTest) { + readPartitions.setValue(0) + readBatches.setValue(0) + } - // Using these variables here to avoid serialization of entire objects (if referenced - // directly) within the map Partitions closure. - val relOutput: AttributeSeq = relation.output - - filteredCachedBatches().mapPartitionsInternal { cachedBatchIterator => - // Find the ordinals and data types of the requested columns. - val (requestedColumnIndices, requestedColumnDataTypes) = - attributes.map { a => - relOutput.indexOf(a.exprId) -> a.dataType - }.unzip - - // update SQL metrics - val withMetrics = cachedBatchIterator.map { batch => - if (enableAccumulatorsForTest) { - readBatches.add(1) - } - numOutputRows += batch.numRows - batch + // Using these variables here to avoid serialization of entire objects (if referenced + // directly) within the map Partitions closure. + val relOutput: AttributeSeq = relation.output + + filteredCachedBatches().mapPartitionsInternal { cachedBatchIterator => + // Find the ordinals and data types of the requested columns. + val (requestedColumnIndices, requestedColumnDataTypes) = + attributes.map { a => + relOutput.indexOf(a.exprId) -> a.dataType + }.unzip + + // update SQL metrics + val withMetrics = cachedBatchIterator.map { batch => + if (enableAccumulatorsForTest) { + readBatches.add(1) } + numOutputRows += batch.numRows + batch + } - val columnTypes = requestedColumnDataTypes.map { - case udt: UserDefinedType[_] => udt.sqlType - case other => other - }.toArray - val columnarIterator = GenerateColumnAccessor.generate(columnTypes) - columnarIterator.initialize(withMetrics, columnTypes, requestedColumnIndices.toArray) - if (enableAccumulatorsForTest && columnarIterator.hasNext) { - readPartitions.add(1) - } - columnarIterator + val columnTypes = requestedColumnDataTypes.map { + case udt: UserDefinedType[_] => udt.sqlType + case other => other + }.toArray + val columnarIterator = GenerateColumnAccessor.generate(columnTypes) + columnarIterator.initialize(withMetrics, columnTypes, requestedColumnIndices.toArray) + if (enableAccumulatorsForTest && columnarIterator.hasNext) { + readPartitions.add(1) } + columnarIterator } } - override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD) - override def output: Seq[Attribute] = attributes private def updateAttribute(expr: Expression): Expression = { @@ -339,10 +342,10 @@ case class InMemoryTableScanExec( } protected override def doExecute(): RDD[InternalRow] = { - if (supportsBatch) { - WholeStageCodegenExec(this)(codegenStageId = 0).execute() - } else { - inputRDD - } + inputRDD + } + + protected override def doExecuteColumnar(): RDD[ColumnarBatch] = { + columnarInputRDD } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala index 3276ab5..c3cbb9d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala @@ -45,7 +45,7 @@ case class BatchScanExec( override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory() override lazy val inputRDD: RDD[InternalRow] = { - new DataSourceRDD(sparkContext, partitions, readerFactory, supportsBatch) + new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar) } override def doCanonicalize(): BatchScanExec = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala index 9ad683f..c5c902f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala @@ -23,11 +23,16 @@ import org.apache.spark.sql.catalyst.expressions.AttributeMap import org.apache.spark.sql.catalyst.plans.physical import org.apache.spark.sql.catalyst.plans.physical.SinglePartition import org.apache.spark.sql.catalyst.util.truncatedString -import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec} +import org.apache.spark.sql.execution.LeafExecNode +import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReaderFactory, Scan, SupportsReportPartitioning} +import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.Utils -trait DataSourceV2ScanExecBase extends LeafExecNode with ColumnarBatchScan { +trait DataSourceV2ScanExecBase extends LeafExecNode { + + override lazy val metrics = Map( + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) def scan: Scan @@ -52,7 +57,7 @@ trait DataSourceV2ScanExecBase extends LeafExecNode with ColumnarBatchScan { case _ => super.outputPartitioning } - override def supportsBatch: Boolean = { + override def supportsColumnar: Boolean = { require(partitions.forall(readerFactory.supportColumnarReads) || !partitions.exists(readerFactory.supportColumnarReads), "Cannot mix row-based and columnar input partitions.") @@ -62,17 +67,22 @@ trait DataSourceV2ScanExecBase extends LeafExecNode with ColumnarBatchScan { def inputRDD: RDD[InternalRow] - override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD) + def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD) override def doExecute(): RDD[InternalRow] = { - if (supportsBatch) { - WholeStageCodegenExec(this)(codegenStageId = 0).execute() - } else { - val numOutputRows = longMetric("numOutputRows") - inputRDD.map { r => - numOutputRows += 1 - r - } + val numOutputRows = longMetric("numOutputRows") + inputRDD.map { r => + numOutputRows += 1 + r + } + } + + override def doExecuteColumnar(): RDD[ColumnarBatch] = { + val numOutputRows = longMetric("numOutputRows") + inputRDD.asInstanceOf[RDD[ColumnarBatch]].map { + b => + numOutputRows += b.numRows() + b } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala index d2e33d4..a9b0f5b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala @@ -46,6 +46,6 @@ case class MicroBatchScanExec( override lazy val readerFactory: PartitionReaderFactory = stream.createReaderFactory() override lazy val inputRDD: RDD[InternalRow] = { - new DataSourceRDD(sparkContext, partitions, readerFactory, supportsBatch) + new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 6049e89..267f255 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -832,7 +832,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext val df = spark.range(10).cache() df.queryExecution.executedPlan.foreach { case i: InMemoryTableScanExec => - assert(i.supportsBatch == vectorized && i.supportCodegen == vectorized) + assert(i.supportsColumnar == vectorized) case _ => } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index fddc4f6..b2c3868 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.catalyst.expressions.SubqueryExpression import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Sort} -import org.apache.spark.sql.execution.{ExecSubqueryExpression, FileSourceScanExec, ReusedSubqueryExec, ScalarSubquery, SubqueryExec, WholeStageCodegenExec} +import org.apache.spark.sql.execution.{ColumnarToRowExec, ExecSubqueryExpression, FileSourceScanExec, InputAdapter, ReusedSubqueryExec, ScalarSubquery, SubqueryExec, WholeStageCodegenExec} import org.apache.spark.sql.execution.datasources.FileScanRDD import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext @@ -1293,7 +1293,8 @@ class SubquerySuite extends QueryTest with SharedSQLContext { checkAnswer(df, Seq(Row(0, 0), Row(2, 0))) // need to execute the query before we can examine fs.inputRDDs() assert(df.queryExecution.executedPlan match { - case WholeStageCodegenExec(fs @ FileSourceScanExec(_, _, _, partitionFilters, _, _, _)) => + case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter( + fs @ FileSourceScanExec(_, _, _, partitionFilters, _, _, _), _))) => partitionFilters.exists(ExecSubqueryExpression.hasSubquery) && fs.inputRDDs().forall( _.asInstanceOf[FileScanRDD].filePartitions.forall( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala index b35348b..b114348 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala @@ -44,9 +44,14 @@ class LogicalPlanTagInSparkPlanSuite extends TPCDSQuerySuite { } // A scan plan tree is a plan tree that has a leaf node under zero or more Project/Filter nodes. - private def isScanPlanTree(plan: SparkPlan): Boolean = plan match { - case p: ProjectExec => isScanPlanTree(p.child) - case f: FilterExec => isScanPlanTree(f.child) + // Because of how codegen and columnar to row transitions work, we may have InputAdaptors + // and ColumnarToRow transformations in the middle of it, but they will not have the tag + // we want, so skip them if they are the first thing we see + private def isScanPlanTree(plan: SparkPlan, first: Boolean): Boolean = plan match { + case i: InputAdapter if !first => isScanPlanTree(i.child, false) + case c: ColumnarToRowExec if !first => isScanPlanTree(c.child, false) + case p: ProjectExec => isScanPlanTree(p.child, false) + case f: FilterExec => isScanPlanTree(f.child, false) case _: LeafExecNode => true case _ => false } @@ -87,7 +92,7 @@ class LogicalPlanTagInSparkPlanSuite extends TPCDSQuerySuite { case _: SubqueryExec | _: ReusedSubqueryExec => assert(plan.getTagValue(SparkPlan.LOGICAL_PLAN_TAG).isEmpty) - case _ if isScanPlanTree(plan) => + case _ if isScanPlanTree(plan, true) => // The strategies for planning scan can remove or add FilterExec/ProjectExec nodes, // so it's not simple to check. Instead, we only check that the origin LogicalPlan // contains the corresponding leaf node of the SparkPlan. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index 9462ee1..a276e47 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -121,29 +121,6 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext { assert(ds.collect() === Array(("a", 10.0), ("b", 3.0), ("c", 1.0))) } - test("cache for primitive type should be in WholeStageCodegen with InMemoryTableScanExec") { - import testImplicits._ - - val dsInt = spark.range(3).cache() - dsInt.count() - val dsIntFilter = dsInt.filter(_ > 0) - val planInt = dsIntFilter.queryExecution.executedPlan - assert(planInt.collect { - case WholeStageCodegenExec(FilterExec(_, i: InMemoryTableScanExec)) if i.supportsBatch => () - }.length == 1) - assert(dsIntFilter.collect() === Array(1, 2)) - - // cache for string type is not supported for InMemoryTableScanExec - val dsString = spark.range(3).map(_.toString).cache() - dsString.count() - val dsStringFilter = dsString.filter(_ == "1") - val planString = dsStringFilter.queryExecution.executedPlan - assert(planString.collect { - case i: InMemoryTableScanExec if !i.supportsBatch => () - }.length == 1) - assert(dsStringFilter.collect() === Array("1")) - } - test("SPARK-19512 codegen for comparing structs is incorrect") { // this would raise CompileException before the fix spark.range(10) @@ -213,25 +190,6 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext { assert(maxCodeSize2 > SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get) } - ignore("bytecode of batch file scan exceeds the limit of WHOLESTAGE_HUGE_METHOD_LIMIT") { - import testImplicits._ - withTempPath { dir => - val path = dir.getCanonicalPath - val df = spark.range(10).select(Seq.tabulate(201) {i => ('id + i).as(s"c$i")} : _*) - df.write.mode(SaveMode.Overwrite).parquet(path) - - withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "202", - SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key -> "2000") { - // wide table batch scan causes the byte code of codegen exceeds the limit of - // WHOLESTAGE_HUGE_METHOD_LIMIT - val df2 = spark.read.parquet(path) - val fileScan2 = df2.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get - assert(fileScan2.asInstanceOf[FileSourceScanExec].supportsBatch) - checkAnswer(df2, df) - } - } - } - test("Control splitting consume function by operators with config") { import testImplicits._ val df = spark.range(10).select(Seq.tabulate(2) {i => ('id + i).as(s"c$i")} : _*) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index d31e49c..466baf2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.{DataFrame, QueryTest, Row} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, In} import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning -import org.apache.spark.sql.execution.{FilterExec, LocalTableScanExec, WholeStageCodegenExec} +import org.apache.spark.sql.execution.{ColumnarToRowExec, FilterExec, InputAdapter, LocalTableScanExec, WholeStageCodegenExec} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext @@ -486,15 +486,12 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { val df2 = df1.where("y = 3") val planBeforeFilter = df2.queryExecution.executedPlan.collect { - case f: FilterExec => f.child + case FilterExec(_, c: ColumnarToRowExec) => c.child + case WholeStageCodegenExec(FilterExec(_, ColumnarToRowExec(i: InputAdapter))) => i.child } assert(planBeforeFilter.head.isInstanceOf[InMemoryTableScanExec]) - val execPlan = if (codegenEnabled == "true") { - WholeStageCodegenExec(planBeforeFilter.head)(codegenStageId = 0) - } else { - planBeforeFilter.head - } + val execPlan = planBeforeFilter.head assert(execPlan.executeCollectPublic().length == 0) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 7aa0ba7..a6429bf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -924,14 +924,14 @@ class ParquetV1QuerySuite extends ParquetQuerySuite { // donot return batch, because whole stage codegen is disabled for wide table (>200 columns) val df2 = spark.read.parquet(path) val fileScan2 = df2.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get - assert(!fileScan2.asInstanceOf[FileSourceScanExec].supportsBatch) + assert(!fileScan2.asInstanceOf[FileSourceScanExec].supportsColumnar) checkAnswer(df2, df) // return batch val columns = Seq.tabulate(9) {i => s"c$i"} val df3 = df2.selectExpr(columns : _*) val fileScan3 = df3.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get - assert(fileScan3.asInstanceOf[FileSourceScanExec].supportsBatch) + assert(fileScan3.asInstanceOf[FileSourceScanExec].supportsColumnar) checkAnswer(df3, df.selectExpr(columns : _*)) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index a8d2308..b260f5d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -584,19 +584,19 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared sql("CREATE TEMPORARY VIEW inMemoryTable AS SELECT 1 AS c1") sql("CACHE TABLE inMemoryTable") testSparkPlanMetrics(spark.table("inMemoryTable"), 1, - Map(0L -> (("Scan In-memory table `inMemoryTable`", Map.empty))) + Map(1L -> (("Scan In-memory table `inMemoryTable`", Map.empty))) ) sql("CREATE TEMPORARY VIEW ```a``b``` AS SELECT 2 AS c1") sql("CACHE TABLE ```a``b```") testSparkPlanMetrics(spark.table("```a``b```"), 1, - Map(0L -> (("Scan In-memory table ```a``b```", Map.empty))) + Map(1L -> (("Scan In-memory table ```a``b```", Map.empty))) ) } // Show InMemoryTableScan on UI testSparkPlanMetrics(spark.range(1).cache().select("id"), 1, - Map(0L -> (("InMemoryTableScan", Map.empty))) + Map(1L -> (("InMemoryTableScan", Map.empty))) ) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index da0e553..115536d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -392,7 +392,7 @@ private[sql] trait SQLTestUtilsBase */ protected def stripSparkFilter(df: DataFrame): DataFrame = { val schema = df.schema - val withoutFilters = df.queryExecution.sparkPlan.transform { + val withoutFilters = df.queryExecution.executedPlan.transform { case FilterExec(_, child) => child } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org