[ 
https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225816#comment-16225816
 ] 

Henry Robinson edited comment on SPARK-22211 at 10/30/17 9:59 PM:
------------------------------------------------------------------

Thinking about it a bit more, I think the optimization that's currently implemented 
works as long as a) the limit is pushed to the streaming side of the join and 
b) the physical join implementation guarantees that it emits rows whose 
streaming-side (RHS) values are non-null before any rows with a null RHS.

That is: say the build side is a single column containing the rows A and C, and 
the streaming side contains the rows A and B. A full outer join of these two 
inputs should produce some ordering of (A, A), (null, B), (C, null). Now do the 
same FOJ with LIMIT 1 pushed to the streaming side, and imagine the limited 
streaming side returns only B. It's an error if the join operator sees that A on 
the build side has no match and emits (A, null) before it sees that B has no 
match and emits (null, B). But if it emits (null, B) first, the limit above the 
join kicks in and no further rows are emitted. 
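
To make that concrete, here's a small spark-shell sketch of the same scenario. 
The column names and the hand-written {{limit(1)}} on the streaming input are 
just my illustration of what the pushed-down LocalLimit amounts to, not the 
optimizer's actual behaviour:

{code}
// Sketch only: assumes a spark-shell session where `spark` and its implicits
// are available.
import spark.implicits._

// Build side: rows A and C; streaming side: rows A and B.
val build  = Seq("A", "C").toDF("l")
val stream = Seq("A", "B").toDF("r")

// Full outer join with no limit: some ordering of (A, A), (null, B), (C, null).
build.join(stream, build("l") === stream("r"), "outer").show(false)

// Simulate the pushed-down limit by limiting the streaming input by hand.
// Suppose the one surviving streaming row is B (which row survives isn't
// deterministic in general). The only correct single-row output is then
// (null, B); emitting (A, null) would be wrong, because (A, A) exists in the
// un-limited result.
build.join(stream.limit(1), build("l") === stream("r"), "outer").limit(1).show(false)
{code}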

It seems a bit fragile to rely on this behaviour from all join implementations, 
and it has implications for other transformations (e.g. it would not be safe to 
flip the join order for a FOJ with a pushed-down limit, but it would be fine for 
one without). However, it's also a bit concerning to remove an optimization 
that's probably a big win for some queries, even though it's incorrect in cases 
like this one. There are rewrites that would work, e.g.:

{code}x.join(y, x('bar') === y('bar'), "outer").limit(10) ==> 
x.sort.limit(10).join(y.sort.limit(10), x('bar') === y('bar'), 
"outer").sort.limit(10) {code}

That rewrite seems like it would be correct. But for now, how about we disable 
the push-down optimization in {{LimitPushDown}} and see whether there's a need 
to investigate more complicated optimizations after that?
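
For illustration, here's roughly what that rewrite looks like written out by 
hand against the DataFrame API. The inputs {{x}} and {{y}}, the column {{bar}} 
and the limit of 10 are just the placeholders from the pseudocode above, and 
I've sorted by {{bar}} since the pseudocode's {{sort}} doesn't name a key; this 
is a sketch of the idea, not a proposed implementation:

{code}
// Sketch only: assumes a spark-shell session where `spark` and its implicits
// are available.
import spark.implicits._
val x = (1 to 100000).toDF("bar")
val y = (1 to 100000).toDF("bar")

// Limit each input (after a sort, so which rows survive is well defined),
// full-outer join the limited inputs, then sort and limit the joined result
// again so at most 10 rows come out.
val rewritten = x.sort("bar").limit(10)
  .join(y.sort("bar").limit(10), x("bar") === y("bar"), "outer")
  .sort(x("bar"))
  .limit(10)

rewritten.show(false)
{code}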



> LimitPushDown optimization for FullOuterJoin generates wrong results
> --------------------------------------------------------------------
>
>                 Key: SPARK-22211
>                 URL: https://issues.apache.org/jira/browse/SPARK-22211
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.0
>         Environment: on community.cloude.databrick.com 
> Runtime Version 3.2 (includes Apache Spark 2.2.0, Scala 2.11)
>            Reporter: Benyi Wang
>
> LimitPushDown pushes the LocalLimit to one side of a FullOuterJoin, but this may 
> generate a wrong result:
> Assume we use limit(1) and the LocalLimit is pushed to the left side, and the row 
> with id=999 is the one it keeps, while the right side has 100K rows including 999. 
> The join result will be
> - one row of (999, 999)
> - the rest of the rows as (null, xxx)
> Once you call show(), the row (999, 999) has only a 1-in-100,000 chance of being 
> selected by CollectLimit.
> A correct optimization might be to
> - push down the limit,
> - but convert the join to a broadcast LeftOuterJoin or RightOuterJoin.
> Here is my notebook:
> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/349451637617406/2750346983121008/6888856075277290/latest.html
> {code:java}
> import scala.util.Random._
> val dl = shuffle(1 to 100000).toDF("id")
> val dr = shuffle(1 to 100000).toDF("id")
> println("data frame dl:")
> dl.explain
> println("data frame dr:")
> dr.explain
> val j = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)
> j.explain
> j.show(false)
> {code}
> {code}
> data frame dl:
> == Physical Plan ==
> LocalTableScan [id#10]
> data frame dr:
> == Physical Plan ==
> LocalTableScan [id#16]
> == Physical Plan ==
> CollectLimit 1
> +- SortMergeJoin [id#10], [id#16], FullOuter
>    :- *Sort [id#10 ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(id#10, 200)
>    :     +- *LocalLimit 1
>    :        +- LocalTableScan [id#10]
>    +- *Sort [id#16 ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(id#16, 200)
>          +- LocalTableScan [id#16]
> import scala.util.Random._
> dl: org.apache.spark.sql.DataFrame = [id: int]
> dr: org.apache.spark.sql.DataFrame = [id: int]
> j: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, id: int]
> +----+---+
> |id  |id |
> +----+---+
> |null|148|
> +----+---+
> {code}


