Github user liancheng commented on a diff in the pull request:
https://github.com/apache/spark/pull/233#discussion_r10984569
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala ---
@@ -48,37 +49,59 @@ case class Filter(condition: Expression, child:
SparkPlan) extends UnaryNode {
case class Sample(fraction: Double, withReplacement: Boolean, seed: Int,
child: SparkPlan)
extends UnaryNode {
- def output = child.output
+ override def output = child.output
// TODO: How to pick seed?
- def execute() = child.execute().sample(withReplacement, fraction, seed)
+ override def execute() = child.execute().sample(withReplacement,
fraction, seed)
}
case class Union(children: Seq[SparkPlan])(@transient sc: SparkContext)
extends SparkPlan {
// TODO: attributes output by union should be distinct for nullability
purposes
- def output = children.head.output
- def execute() = sc.union(children.map(_.execute()))
+ override def output = children.head.output
+ override def execute() = sc.union(children.map(_.execute()))
override def otherCopyArgs = sc :: Nil
}
-case class StopAfter(limit: Int, child: SparkPlan)(@transient sc:
SparkContext) extends UnaryNode {
+/**
+ * Take the first limit elements. Note that the implementation is
different depending on whether
+ * this is a terminal operator or not. If it is terminal and is invoked
using executeCollect,
+ * this operator uses Spark's take method on the Spark driver. If it is
not terminal or is
+ * invoked using execute, we first take the limit on each partition, and
then repartition all the
+ * data to a single partition to compute the global limit.
+ */
+case class Limit(limit: Int, child: SparkPlan)(@transient sc:
SparkContext) extends UnaryNode {
+ // TODO: Implement a partition local limit, and use a strategy to
generate the proper limit plan:
+ // partition local limit -> exchange into one partition -> partition
local limit again
+
override def otherCopyArgs = sc :: Nil
- def output = child.output
+ override def output = child.output
override def executeCollect() = child.execute().map(_.copy()).take(limit)
- // TODO: Terminal split should be implemented differently from
non-terminal split.
- // TODO: Pick num splits based on |limit|.
- def execute() = sc.makeRDD(executeCollect(), 1)
+ override def execute() = {
+ val rdd = child.execute().mapPartitions { iter =>
+ val mutablePair = new MutablePair[Boolean, Row]()
+ iter.take(limit).map(row => mutablePair.update(false, row))
+ }
+ val part = new HashPartitioner(1)
+ val shuffled = new ShuffledRDD[Boolean, Row, MutablePair[Boolean,
Row]](rdd, part)
+ shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
--- End diff --
This line causes failure of the `udf_map` test case. Also tried
`JavaSerializer` and `KryoSerializer`, neither helps... But since this one
fails, similar logic in `Execute.execute()` should also fail.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---