hvanhovell commented on a change in pull request #25264: [SPARK-28213][SQL][followup] code cleanup and bug fix for columnar execution framework
URL: https://github.com/apache/spark/pull/25264#discussion_r309854132
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -57,40 +57,38 @@ class ColumnarRule {
  * [[org.apache.spark.sql.execution.python.ArrowEvalPythonExec]] and
  * [[MapPartitionsInRWithArrowExec]]. Eventually this should replace those implementations.
  */
-case class ColumnarToRowExec(child: SparkPlan)
-  extends UnaryExecNode with CodegenSupport {
+case class ColumnarToRowExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
+  assert(child.supportsColumnar)
   override def output: Seq[Attribute] = child.output
   override def outputPartitioning: Partitioning = child.outputPartitioning
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  // `ColumnarToRowExec` processes the input RDD directly, which is kind of a leaf node in the
+  // codegen stage and needs to do the limit check.
+  protected override def canCheckLimitNotReached: Boolean = true
+
   override lazy val metrics: Map[String, SQLMetric] = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-    "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches"),
-    "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")
+    "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches")
   )
   override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
     val numInputBatches = longMetric("numInputBatches")
-    val scanTime = longMetric("scanTime")
-    // UnsafeProjection is not serializable so do it on the executor side, which is why it is lazy
-    @transient lazy val outputProject = UnsafeProjection.create(output, output)
-    val batches = child.executeColumnar()
-    batches.flatMap(batch => {
-      val batchStartNs = System.nanoTime()
-      numInputBatches += 1
-      // In order to match the numOutputRows metric in the generated code we update
-      // numOutputRows for each batch. This is less accurate than doing it at output
-      // because it will over count the number of rows output in the case of a limit,
-      // but it is more efficient.
-      numOutputRows += batch.numRows()
-      val ret = batch.rowIterator().asScala
-      scanTime += ((System.nanoTime() - batchStartNs) / (1000 * 1000))
-      ret.map(outputProject)
-    })
+    // This avoids calling `output` in the RDD closure, so that we don't need to include the entire
+    // plan (this) in the closure.
+    val localOutput = this.output
Review comment:
+1
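
For readers following along, here is a minimal sketch of the pattern being endorsed above, outside of Spark's internal plan nodes. `PlanLike` and `doExecuteSketch` are illustrative stand-ins, not the actual `ColumnarToRowExec` code; the point is that copying `output` into a local `val` lets the closure capture a small `Seq[Attribute]` instead of the whole (non-serializable) plan, and that the `UnsafeProjection` is built per partition on the executor because it is not serializable either.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}

    // Illustrative stand-in for a plan node; a real SparkPlan is not
    // serializable, so it must never be dragged into an RDD closure.
    abstract class PlanLike {
      def output: Seq[Attribute]

      def doExecuteSketch(input: RDD[InternalRow]): RDD[InternalRow] = {
        // Copy the field into a local val: the closure below then captures
        // only a small Seq[Attribute]. Referencing `this.output` inside the
        // closure would capture `this`, i.e. the entire plan.
        val localOutput = this.output
        input.mapPartitions { rows =>
          // UnsafeProjection is not serializable, so create it per partition
          // on the executor rather than once on the driver.
          val toUnsafe = UnsafeProjection.create(localOutput, localOutput)
          rows.map(toUnsafe)
        }
      }
    }

The same two serialization concerns motivated the old `@transient lazy val outputProject` in the removed code; pulling the projection creation inside `mapPartitions` makes the executor-side construction explicit instead of relying on lazy initialization.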
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]