GitHub user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/15596#discussion_r85264043
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -27,32 +27,14 @@ import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.util.Utils

 /**
- * Take the first `limit` elements and collect them to a single partition.
- *
- * This operator will be used when a logical `Limit` operation is the final operator in an
- * logical plan, which happens when the user is collecting results back to the driver.
- */
-case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
-  override def output: Seq[Attribute] = child.output
-  override def outputPartitioning: Partitioning = SinglePartition
-  override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
-  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
-  protected override def doExecute(): RDD[InternalRow] = {
-    val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
-    val shuffled = new ShuffledRowRDD(
-      ShuffleExchange.prepareShuffleDependency(
-        locallyLimited, child.output, SinglePartition, serializer))
-    shuffled.mapPartitionsInternal(_.take(limit))
-  }
-}
-
-/**
  * Helper trait which defines methods that are shared by both
  * [[LocalLimitExec]] and [[GlobalLimitExec]].
  */
 trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
   val limit: Int
   override def output: Seq[Attribute] = child.output
+  override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
+  override def executeTake(n: Int): Array[InternalRow] = child.executeTake(limit)
--- End diff --
Thanks @pwoody! Agreed. However, I am inclined not to replace
`CollectLimitExec` with `GlobalLimitExec`; the reason is given in a comment
below. Let's wait for @JoshRosen's response. If we decide to keep
`CollectLimitExec`, your change in #15614 can be applied then.
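
For readers following the diff: the removed `CollectLimitExec.doExecute` applies the limit twice, once per partition before the shuffle and once more after everything lands in a single partition. A minimal plain-Scala sketch of that two-phase idea is below. It does not use Spark; the object name `TwoPhaseLimitSketch`, the helper `collectLimit`, and the sample data are hypothetical, and nested `Seq`s stand in for an RDD's partitions.

```scala
// Sketch only: nested Seqs stand in for RDD partitions (an assumption, not Spark's API).
object TwoPhaseLimitSketch {

  /** Keep at most `limit` rows per partition, then at most `limit` rows overall. */
  def collectLimit[A](partitions: Seq[Seq[A]], limit: Int): Seq[A] = {
    // Phase 1: local limit, so no partition sends more than `limit` rows onward.
    val locallyLimited = partitions.map(_.take(limit))
    // Phase 2: gather into one "partition" (flatten here) and apply the limit again.
    locallyLimited.flatten.take(limit)
  }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq(1, 2, 3, 4), Seq(5, 6), Seq(7, 8, 9))
    println(collectLimit(partitions, 3)) // prints List(1, 2, 3)
  }
}
```

The sketch keeps rows in partition order because `flatten` is deterministic. A real shuffle to a single partition need not preserve that order, so only the row count, not which rows survive, should be read from this example.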