hvanhovell commented on a change in pull request #25264: [SPARK-28213][SQL][followup] code cleanup and bug fix for columnar execution framework
URL: https://github.com/apache/spark/pull/25264#discussion_r309854132
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
 ##########
 @@ -57,40 +57,38 @@ class ColumnarRule {
  * [[org.apache.spark.sql.execution.python.ArrowEvalPythonExec]] and
  * [[MapPartitionsInRWithArrowExec]]. Eventually this should replace those implementations.
  */
-case class ColumnarToRowExec(child: SparkPlan)
-  extends UnaryExecNode with CodegenSupport {
+case class ColumnarToRowExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
+  assert(child.supportsColumnar)
 
   override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
+  // `ColumnarToRowExec` processes the input RDD directly, which is kind of a leaf node in the
+  // codegen stage and needs to do the limit check.
+  protected override def canCheckLimitNotReached: Boolean = true
+
   override lazy val metrics: Map[String, SQLMetric] = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
-    "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of 
input batches"),
-    "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")
+    "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of 
input batches")
   )
 
   override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
     val numInputBatches = longMetric("numInputBatches")
-    val scanTime = longMetric("scanTime")
-    // UnsafeProjection is not serializable so do it on the executor side, which is why it is lazy
-    @transient lazy val outputProject = UnsafeProjection.create(output, output)
-    val batches = child.executeColumnar()
-    batches.flatMap(batch => {
-      val batchStartNs = System.nanoTime()
-      numInputBatches += 1
-      // In order to match the numOutputRows metric in the generated code we update
-      // numOutputRows for each batch. This is less accurate than doing it at output
-      // because it will over count the number of rows output in the case of a limit,
-      // but it is more efficient.
-      numOutputRows += batch.numRows()
-      val ret = batch.rowIterator().asScala
-      scanTime += ((System.nanoTime() - batchStartNs) / (1000 * 1000))
-      ret.map(outputProject)
-    })
+    // This avoids calling `output` in the RDD closure, so that we don't need to include the entire
+    // plan (this) in the closure.
+    val localOutput = this.output
 
 Review comment:
   +1
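
   For context, the pattern being approved here: referencing `output` inside the RDD closure desugars to `this.output`, which would force Spark to serialize the entire plan node into every task closure; copying the field into a local `val` first means only the attribute sequence itself is captured. A minimal, self-contained sketch of the idea outside Spark's own source (the `WholePlanLike` class and every name in it are illustrative stand-ins, not code from this PR):

```scala
import org.apache.spark.sql.SparkSession

object ClosureCaptureSketch {

  // Plays the role of a plan-node-like object that must not be serialized
  // into tasks (it is not Serializable).
  class WholePlanLike(val output: Seq[String]) {

    def run(spark: SparkSession): Array[String] = {
      // Capture the field into a local val first. The lambda below then
      // closes over `localOutput` (a plain Seq[String]) instead of `this`,
      // so only the sequence is shipped to executors.
      val localOutput = this.output

      // Had the lambda referenced `output` directly, it would compile to
      // `this.output` and drag the whole non-serializable `WholePlanLike`
      // instance into the task closure, failing with "Task not serializable".
      spark.sparkContext
        .parallelize(1 to 3)
        .map(i => s"${localOutput.mkString(",")}#$i")
        .collect()
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("closure-sketch").getOrCreate()
    try {
      new WholePlanLike(Seq("a", "b")).run(spark).foreach(println)
    } finally {
      spark.stop()
    }
  }
}
```

   The same localize-before-capture trick shows up twice in the diff above: `doExecute` also reads `numOutputRows` and `numInputBatches` into local vals before they are used in the closure.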
