viirya commented on a change in pull request #26809: [SPARK-30185][SQL] Implement Dataset.tail API
URL: https://github.com/apache/spark/pull/26809#discussion_r355755337
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
##########
@@ -65,6 +65,50 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends LimitExec {
}
}
+/**
+ * Take the last `limit` elements and collect them to a single partition.
+ *
+ * This operator will be used when a logical `Tail` operation is the final operator in a
+ * logical plan, which happens when the user is collecting results back to the driver.
+ */
+case class CollectTailExec(limit: Int, child: SparkPlan) extends LimitExec {
+ override def output: Seq[Attribute] = child.output
+ override def outputPartitioning: Partitioning = SinglePartition
+ override def executeCollect(): Array[InternalRow] = child.executeTail(limit)
+ private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
+ private lazy val writeMetrics =
+ SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+ private lazy val readMetrics =
+ SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+ override lazy val metrics = readMetrics ++ writeMetrics
+ protected override def doExecute(): RDD[InternalRow] = {
Review comment:
It looks like `executeTail` scans partitions from the end until it returns the last
`n` rows. This `doExecute` looks different from that, doesn't it?
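For context, my understanding of "scan partitions from the end" is roughly the sketch below. This is only an illustration against the plain RDD API with a hypothetical `tailSketch` helper, not the actual `SparkPlan.executeTail` implementation:

```scala
// Rough sketch (not Spark's actual code): collect the last `n` elements by
// launching a job per partition, starting from the last partition.
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

def tailSketch[T: ClassTag](rdd: RDD[T], n: Int): Array[T] = {
  var collected = Vector.empty[T]        // rows gathered so far, in original order
  var p = rdd.getNumPartitions - 1       // start from the last partition
  while (collected.length < n && p >= 0) {
    // Run a job over just this one partition and pull its rows to the driver.
    val rows = rdd.sparkContext
      .runJob(rdd, (it: Iterator[T]) => it.toArray, Seq(p))
      .head
    // Keep only as many trailing rows from this partition as are still needed.
    collected = rows.takeRight(n - collected.length).toVector ++ collected
    p -= 1
  }
  collected.toArray
}
```

In contrast, the `doExecute` here presumably goes through a shuffle, given the `UnsafeRowSerializer` and the shuffle read/write metrics it sets up, which would be a different code path from that driver-side scan.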