viirya commented on a change in pull request #26809: [SPARK-30185][SQL] 
Implement Dataset.tail API
URL: https://github.com/apache/spark/pull/26809#discussion_r355746293
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
 ##########
 @@ -65,6 +65,50 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) 
extends LimitExec {
   }
 }
 
+/**
+ * Take the last `limit` elements and collect them to a single partition.
+ *
+ * This operator will be used when a logical `Tail` operation is the final 
operator in an
+ * logical plan, which happens when the user is collecting results back to the 
driver.
+ */
+case class CollectTailExec(limit: Int, child: SparkPlan) extends LimitExec {
+  override def output: Seq[Attribute] = child.output
+  override def outputPartitioning: Partitioning = SinglePartition
+  override def executeCollect(): Array[InternalRow] = child.executeTail(limit)
+  private val serializer: Serializer = new 
UnsafeRowSerializer(child.output.size)
+  private lazy val writeMetrics =
+    SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+  private lazy val readMetrics =
+    SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+  override lazy val metrics = readMetrics ++ writeMetrics
+  protected override def doExecute(): RDD[InternalRow] = {
+    val locallyLimited = child.execute().mapPartitionsInternal { iter =>
+      var last: Seq[InternalRow] = Seq.empty[InternalRow]
+      if (limit > 0) {
+        val slidingIter = iter.sliding(limit)
+        while (slidingIter.hasNext) { last = slidingIter.next() }
+      }
+      last.toIterator
+    }
+    val shuffled = new ShuffledRowRDD(
+      ShuffleExchangeExec.prepareShuffleDependency(
+        locallyLimited,
+        child.output,
+        SinglePartition,
+        serializer,
+        writeMetrics),
+      readMetrics)
+    shuffled.mapPartitionsInternal { iter =>
 
 Review comment:
   hmm, once we shuffle, do we still keep original order? `Tail` is a 
`OrderPreservingUnaryNode`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to