Thanks Reynold.
Not sure why doExecute is not invoked, since CollectLimit does not support
whole-stage codegen:
case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode {
I will dig further into this.
Zhan Zhang
On Apr 18, 2016, at 10:36 PM, Reynold Xin
Anyway, we can verify this easily. I just added a println to each row and
verified that only limit + 1 rows were printed after the join and before the
limit.
It'd be great if you could do some debugging yourself and see whether it is
going through some other code path.
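A toy version of the println check, in plain Scala rather than Spark (LazyLimitCheck and run are made-up names for illustration). When the consumer pulls rows lazily and stops at the limit, the per-row side effect fires only for the rows that are actually pulled. This toy pulls exactly limit rows; the extra row seen in Spark is not modeled here.

```scala
object LazyLimitCheck {
  // Builds a lazy "joined" stream that counts how many rows were produced,
  // then pulls only `limit` rows from it. Returns (rowsReturned, rowsProduced).
  def run(totalRows: Int, limit: Int): (Int, Int) = {
    var produced = 0
    val joined: Iterator[Int] = Iterator.range(0, totalRows).map { i =>
      produced += 1 // stands in for the println on each row
      i * 2
    }
    // take() stops pulling from `joined` once `limit` rows have been returned.
    val taken = joined.take(limit).toList
    (taken.size, produced)
  }
}
```

For example, run(1000000, 10) returns (10, 10): only ten of the million upstream rows are ever produced.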
On Mon, Apr 18, 2016 at 10:35 PM,
But doExecute is not called?
On Mon, Apr 18, 2016 at 10:32 PM, Zhan Zhang wrote:
Hi Reynold,
I just checked the code for CollectLimit; there is a shuffle that collects
the rows into one partition.
protected override def doExecute(): RDD[InternalRow] = {
  val shuffled = new ShuffledRowRDD(
    ShuffleExchange.prepareShuffleDependency(
      child.execute(), child.output,
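A toy model of the concern above, in plain Scala (ShuffleBeforeLimit is a made-up name, and this is not how Spark implements CollectLimit). It assumes that every upstream partition is fully drained before the limit is applied, as a shuffle map task does when writing its output. Under that assumption every upstream row gets produced, even though only limit rows survive.

```scala
object ShuffleBeforeLimit {
  // Returns (rowsReturned, rowsProduced) for a "drain everything, then limit" plan.
  def run(partitions: Seq[Range], limit: Int): (Int, Int) = {
    var produced = 0
    // Lazily "produce" each row, counting it (stands in for a per-row println).
    val upstream: Seq[Iterator[Int]] =
      partitions.map(p => p.iterator.map { i => produced += 1; i })
    // Assumption: each partition is consumed completely (like a shuffle write)
    // and the results are concatenated into a single partition.
    val singlePartition: Vector[Int] = upstream.flatMap(_.toVector).toVector
    // The limit is applied only after everything has been materialized.
    val taken = singlePartition.take(limit)
    (taken.size, produced)
  }
}
```

With two partitions of 100 rows and a limit of 10, this returns (10, 200): all 200 rows are produced to return 10.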
Unless I'm really missing something, I don't think so. As I said, it goes
through an iterator, and after processing each stream-side row we do a
shouldStop check. The generated code looks like:
protected void processNext() throws java.io.IOException {
  /*** PRODUCE: Project
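To make the shouldStop argument concrete, here is a plain-Scala sketch of a pull-based join iterator that checks a shouldStop-style condition after each stream-side row. This is not Spark's generated code: BufferedJoinIterator, processNext and streamRowsProcessed are hypothetical names, loosely modeled on the buffered-iterator pattern. Assuming the consumer above it pulls one row at a time, a limit sitting above the iterator only causes as many stream rows to be processed as are needed to fill the limit.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of a buffered pull-based join iterator.
// Stream rows are (key, value); the build side maps each key to its matches.
final class BufferedJoinIterator(
    stream: Iterator[(Int, String)],
    build: Map[Int, Seq[String]]
) extends Iterator[(Int, String, String)] {
  private val buffer = mutable.Queue.empty[(Int, String, String)]
  private var processed = 0

  /** Number of stream-side rows consumed so far. */
  def streamRowsProcessed: Int = processed

  // Same idea as shouldStop(): hand control back once output is buffered.
  private def shouldStop(): Boolean = buffer.nonEmpty

  // Consume stream rows, probing the build side, until some output exists.
  private def processNext(): Unit = {
    while (stream.hasNext) {
      val (k, v) = stream.next()
      processed += 1
      build.getOrElse(k, Seq.empty[String]).foreach(b => buffer.enqueue((k, v, b)))
      if (shouldStop()) return // checked after each stream-side row
    }
  }

  override def hasNext: Boolean = {
    if (buffer.isEmpty) processNext()
    buffer.nonEmpty
  }

  override def next(): (Int, String, String) = {
    if (!hasNext) throw new NoSuchElementException("no more joined rows")
    buffer.dequeue()
  }
}
```

For example, if the build side only has even keys, taking 5 joined rows reads stream rows 0 through 8 and then stops.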
From the physical plan, the limit is one level above the WholeStageCodegen,
so I don't think shouldStop would work here. To make it work, the limit has
to be part of the WholeStageCodegen.
Correct me if I am wrong.
Thanks.
Zhan Zhang
On Apr 18, 2016, at 11:09 AM, Reynold Xin
While you can't automatically push the limit *through* the join, we could
push it *into* the join (stop processing after generating 10 records). I
believe that is what Rajesh is suggesting.
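Pushing the limit into the join could look roughly like this in plain Scala. LimitInsideJoin.join is a hypothetical helper, not an existing Spark API. The join loop itself counts emitted rows and stops reading the stream side once limit rows exist, without changing the join's semantics or moving the limit through it.

```scala
object LimitInsideJoin {
  // Inner hash join that stops consuming the stream side as soon as `limit`
  // output rows have been generated.
  // Returns the joined rows and how many stream-side rows were read.
  def join[K, A, B](
      stream: Iterator[(K, A)],
      build: Map[K, Seq[B]],
      limit: Int
  ): (Vector[(K, A, B)], Int) = {
    val out = Vector.newBuilder[(K, A, B)]
    var emitted = 0
    var consumed = 0
    while (emitted < limit && stream.hasNext) {
      val (k, a) = stream.next()
      consumed += 1
      val matches = build.getOrElse(k, Seq.empty[B]).iterator
      // Also stop in the middle of a key's matches once the limit is hit.
      while (emitted < limit && matches.hasNext) {
        out += ((k, a, matches.next()))
        emitted += 1
      }
    }
    (out.result(), consumed)
  }
}
```

For example, with stream keys cycling through 0, 1, 2 and only key 0 on the build side (two matches each), a limit of 10 is reached after reading 13 stream rows.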
On Tue, Apr 12, 2016 at 7:46 AM, Herman van Hövell tot Westerflier <hvanhov...@questtec.nl> wrote:
> I am