[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results
[ https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245063#comment-16245063 ]

Apache Spark commented on SPARK-22211:
--------------------------------------

User 'henryr' has created a pull request for this issue:
https://github.com/apache/spark/pull/19701

> LimitPushDown optimization for FullOuterJoin generates wrong results
> --------------------------------------------------------------------
>
>                 Key: SPARK-22211
>                 URL: https://issues.apache.org/jira/browse/SPARK-22211
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.0
>         Environment: on community.cloud.databricks.com
>                      Runtime Version 3.2 (includes Apache Spark 2.2.0, Scala 2.11)
>            Reporter: Benyi Wang
>            Assignee: Henry Robinson
>             Fix For: 2.2.1, 2.3.0
>
> LimitPushDown pushes LocalLimit to one side of a FullOuterJoin, but this may
> generate a wrong result.
> Assume we use limit(1) and the LocalLimit is pushed to the left side, and the
> row with id=999 is selected; if the right side has 100K rows including id=999,
> the result will be:
> - one row of (999, 999)
> - the remaining rows of (null, xxx)
> Once you call show(), the row (999, 999) has only a 1/10th chance of being
> selected by CollectLimit.
> The actual optimization might be to:
> - push down the limit,
> - but convert the join to a broadcast LeftOuterJoin or RightOuterJoin.
> Here is my notebook:
> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/349451637617406/2750346983121008/656075277290/latest.html
> {code:java}
> import scala.util.Random._
> val dl = shuffle(1 to 100000).toDF("id")
> val dr = shuffle(1 to 100000).toDF("id")
> println("data frame dl:")
> dl.explain
> println("data frame dr:")
> dr.explain
> val j = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)
> j.explain
> j.show(false)
> {code}
> {code}
> data frame dl:
> == Physical Plan ==
> LocalTableScan [id#10]
> data frame dr:
> == Physical Plan ==
> LocalTableScan [id#16]
> == Physical Plan ==
> CollectLimit 1
> +- SortMergeJoin [id#10], [id#16], FullOuter
>    :- *Sort [id#10 ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(id#10, 200)
>    :     +- *LocalLimit 1
>    :        +- LocalTableScan [id#10]
>    +- *Sort [id#16 ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(id#16, 200)
>          +- LocalTableScan [id#16]
> import scala.util.Random._
> dl: org.apache.spark.sql.DataFrame = [id: int]
> dr: org.apache.spark.sql.DataFrame = [id: int]
> j: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, id: int]
> +----+---+
> |id  |id |
> +----+---+
> |null|148|
> +----+---+
> {code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
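The failure mode in the report above can be sketched without Spark. The following is a minimal Python model (all function names are illustrative, not Spark APIs) of a full outer join with the limit applied either above the join, as written, or pushed below the left input, as the buggy plan does:

```python
def full_outer_join(left, right):
    """Toy full outer join of two lists of keys on equality."""
    matched_right = set()
    out = []
    for l in left:
        if l in right:
            matched_right.add(l)
            out.append((l, l))
        else:
            out.append((l, None))
    out += [(None, r) for r in right if r not in matched_right]
    return out

left = list(range(1, 11))    # small stand-ins for the ~100K-row inputs
right = list(range(1, 11))

# Correct plan: join first, then CollectLimit 1 -- always a matched row here.
correct = full_outer_join(left, right)[:1]

# Buggy plan: LocalLimit 1 pushed below the left side of the join.
buggy = full_outer_join(left[:1], right)

print(correct)                                # [(1, 1)]
print(len(buggy))                             # 10
print(sum(1 for l, r in buggy if l is None))  # 9
```

With the limit pushed down, CollectLimit picks one of the ten rows the join emits, and nine of those are (null, x) rows that do not appear in the true result at all, matching the reporter's observation.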
[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results
[ https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245029#comment-16245029 ]

Dongjoon Hyun commented on SPARK-22211:
---------------------------------------

Hi, All. This breaks `branch-2.2`.
- SBT: https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.2-test-sbt-hadoop-2.7/408/
- MAVEN: https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.2-test-maven-hadoop-2.7/423
[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results
[ https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238281#comment-16238281 ]

Henry Robinson commented on SPARK-22211:
----------------------------------------

Sounds good, thanks both.
[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results
[ https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237825#comment-16237825 ]

Xiao Li commented on SPARK-22211:
---------------------------------

We should merge it to master and the previous releases first.
[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results
[ https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237822#comment-16237822 ]

Sean Owen commented on SPARK-22211:
-----------------------------------

In the name of correctness, still worth disabling for now, and then fixing later, right?
[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results
[ https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237797#comment-16237797 ]

Xiao Li commented on SPARK-22211:
---------------------------------

The Join operator should be limit aware. Anyway, we can do it later.
[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results
[ https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237778#comment-16237778 ]

Henry Robinson commented on SPARK-22211:
----------------------------------------

[~smilegator] - sounds good! What will your approach be? I wasn't able to see a safe way to push the limit through the join without either a more invasive rewrite or restricting the set of join operators for FOJ.
[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results
[ https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237133#comment-16237133 ]

Xiao Li commented on SPARK-22211:
---------------------------------

Will submit a PR based on my previous PR: https://github.com/apache/spark/pull/10454
[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results
[ https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236861#comment-16236861 ]

Apache Spark commented on SPARK-22211:
--------------------------------------

User 'henryr' has created a pull request for this issue:
https://github.com/apache/spark/pull/19647
[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results
[ https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16225816#comment-16225816 ]

Henry Robinson commented on SPARK-22211:
----------------------------------------

Thinking about it a bit more, I think the optimization that's currently implemented works as long as a) the limit is pushed to the streaming side of the join, and b) the physical join implementation guarantees that it emits rows with non-null streaming-side values before any rows whose streaming side is null.

That is: say we've got a build side of (A, C) and a streaming side of (A, B). If we do a full outer join of these two inputs, the result should be some ordering of (A, A), (null, B), (C, null). Now do a FOJ with LIMIT 1 pushed to the streaming side, and imagine the limited streaming side returns (B). It's an error if the join operator sees that A on the build side has no match and emits (A, null) before it sees that B has no match and emits (null, B). But if it emits (null, B) first, the limit above it kicks in and no further rows are emitted.

It seems a bit fragile to rely on this behaviour from all join implementations, and it has some implications for other transformations (e.g. it would not be safe to flip the join order for a FOJ with a pushed-down limit, but it would be OK for a non-pushed-down one). However, it's also a bit concerning to remove an optimization that's probably a big win for some queries, even if it's incorrect.

There are rewrites that would work, e.g.:

{{x.join(y, x("bar") === y("bar"), "outer").limit(10)}}
==>
{{x.sort.limit(10).join(y.sort.limit(10), x("bar") === y("bar"), "outer").sort.limit(10)}}

seems like it would be correct. But for now, how about we disable the push-down optimization in {{LimitPushDown}} and see if there's a need to investigate more complicated optimizations after that?
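The emit-order dependence described in the comment above can be modelled in plain Python (toy lists, not Spark; `joined_rows` and its `streaming_first` flag are hypothetical names). Build side (A, C), streaming side reduced to just (B) by the pushed-down LIMIT 1:

```python
def joined_rows(build, streaming, streaming_first):
    """FOJ output when only `streaming` survived the pushed-down limit.
    `streaming_first` controls whether unmatched streaming rows are
    emitted before unmatched build rows."""
    matched = [(s, s) for s in streaming if s in build]
    null_stream = [(None, s) for s in streaming if s not in build]
    null_build = [(b, None) for b in build if b not in set(streaming)]
    return (null_stream + matched + null_build) if streaming_first \
        else (null_build + matched + null_stream)

build = ['A', 'C']
streaming_after_limit = ['B']     # LIMIT 1 happened to keep B

# True FOJ result of build (A, C) vs. the full streaming side (A, B):
true_result = {('A', 'A'), (None, 'B'), ('C', None)}

# Streaming-side-unmatched-first emit order: LIMIT 1 takes (None, 'B'),
# which is in the true result.
good = joined_rows(build, streaming_after_limit, streaming_first=True)[:1]

# Build-side-unmatched-first emit order: LIMIT 1 takes ('A', None), which
# is NOT in the true result -- A's real match was discarded by the limit.
bad = joined_rows(build, streaming_after_limit, streaming_first=False)[:1]

print(good[0] in true_result)   # True
print(bad[0] in true_result)    # False
```

The first ordering happens to return a row of the true result; the second returns (A, null) even though A has a match, which is exactly the per-implementation guarantee the comment argues is too fragile to rely on.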
[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results
[ https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16223073#comment-16223073 ]

Henry Robinson commented on SPARK-22211:
----------------------------------------

I think the optimization proposed works only for a self-join. If you transform a FOJ into a one-sided outer join in general, you might omit rows from the result that should be there.

Assume that the idea is to transform the full outer join into a right outer join, with the limit pushed to either the LHS or RHS. One problem comes if the limit is pushed to the RHS but is larger than the number of rows in the RHS. For example, if you have tables T1 = (1, 2) and T2 = (1), consider the following query: {{SELECT * FROM T1 a FULL OUTER JOIN T2 b ON a.col = b.col LIMIT 2}}. If the limit is pushed into T2's scan (and the FOJ is changed to a ROJ), the query would emit only the tuple (1, 1) - and omit (2, null), which should be included. (If the limit is pushed into T1's scan, there are other bugs if the limit is _less_ than the number of rows, by a similar argument.)

I checked, and I don't think Postgres tries to push limits into a FOJ either. Impala doesn't.
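The T1/T2 counterexample above is easy to check mechanically. A small Python sketch (toy join functions, not Spark) applying the unsafe FOJ-to-ROJ rewrite:

```python
def full_outer_join(left, right):
    """Toy full outer join on key equality."""
    matched_right = set()
    out = []
    for l in left:
        if l in right:
            matched_right.add(l)
            out.append((l, l))
        else:
            out.append((l, None))
    out += [(None, r) for r in right if r not in matched_right]
    return out

def right_outer_join(left, right):
    """Toy right outer join: keep every right row, null left if unmatched."""
    return [(r, r) if r in left else (None, r) for r in right]

t1, t2 = [1, 2], [1]

# Correct: FULL OUTER JOIN first, then LIMIT 2 on the result.
correct = full_outer_join(t1, t2)[:2]

# Unsafe rewrite: LIMIT 2 pushed into T2's scan, FOJ converted to ROJ.
rewritten = right_outer_join(t1, t2[:2])[:2]

print(correct)    # [(1, 1), (2, None)]
print(rewritten)  # [(1, 1)] -- the (2, null) row is lost
```

As the comment says, the rewrite emits only (1, 1) and silently drops (2, null), because a right outer join can never produce the left-only rows that the full outer join preserves.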
[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results
[ https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202863#comment-16202863 ] Takeshi Yamamuro commented on SPARK-22211: -- Aha, I misunderstood. But I think case 3 is not acceptable (even though I know we can get big performance gains), because a transformation of relational expressions must not change the result. IIUC, in case 3 the push-down changes the result, right?
[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results
[ https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202278#comment-16202278 ] Benyi Wang commented on SPARK-22211: I think my suggested solution is correct. || Case || Left join key || Right join key || Full outer join || | 1 | Y | N | {{(left(\*), null)}} | | 2 | Y | Y | {{(left(\*), right(\*))}} | | 3 | N | Y | {{(null, right(\*))}} | | 4 | N | N | Not applied | If LimitPushDown pushes the limit to the left side, then regardless of the limit value and the size of the left-side table, some left rows are always selected; in other words, their join keys always exist, so only cases 1 and 2 can occur, and the join is effectively a left outer join. Symmetrically, pushing down to the right side makes it equivalent to a right outer join. The only problem with this method is that case 3 rows can never be shown when pushing down to the left side, and case 1 rows when pushing down to the right side. I would say this is not a big issue, because we usually just want to see a sample of the join result, and the benefit is huge. If we want to see left-only or right-only rows, we can add a WHERE clause.
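The case analysis above proposes rewriting the limited full outer join as a one-sided outer join with the limit pushed to that side. A hypothetical sketch of that proposal, reusing the {{dl}}/{{dr}} frames from the notebook (this illustrates the commenter's suggestion, not actual Spark behaviour):

{code:java}
// Proposed rewrite of  dl FOJ dr LIMIT 1  when pushing the limit to the left:
val dlLimited = dl.limit(1)

// Every row of dlLimited survives the left outer join (matched, or
// null-extended on the right), i.e. only cases 1 and 2 from the table occur;
// case 3 rows -- (null, right(*)) -- are intentionally dropped.
val proposed = dlLimited
  .join(dr, dlLimited("id") === dr("id"), "left")
  .limit(1)
proposed.show(false)
{code}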
[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results
[ https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16194170#comment-16194170 ] Takeshi Yamamuro commented on SPARK-22211: -- The suggested solution probably does not work when both tables are small and the limit value is large. IMHO, one option is simply to stop the push-down for full outer joins (because I couldn't find a smarter way to solve this...).
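Stopping the push-down for full outer joins, as suggested above, would amount to removing the FullOuter handling from Catalyst's LimitPushDown rule. A purely illustrative sketch of that shape (rule structure and helper names here are assumptions approximating Catalyst's conventions; this is not the actual patch):

{code:java}
// Illustrative sketch only: leave FullOuter plans untouched in LimitPushDown.
case LocalLimit(exp, join @ Join(left, right, joinType, condition)) =>
  val newJoin = joinType match {
    case RightOuter => join.copy(right = maybePushLimit(exp, right))
    case LeftOuter  => join.copy(left = maybePushLimit(exp, left))
    // FullOuter deliberately omitted: pushing the limit to either side can
    // change the join result, so the plan is left unchanged.
    case _ => join
  }
  LocalLimit(exp, newJoin)
{code}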