[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results

2017-11-08 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245063#comment-16245063
 ] 

Apache Spark commented on SPARK-22211:
--

User 'henryr' has created a pull request for this issue:
https://github.com/apache/spark/pull/19701

> LimitPushDown optimization for FullOuterJoin generates wrong results
> 
>
> Key: SPARK-22211
> URL: https://issues.apache.org/jira/browse/SPARK-22211
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
> Environment: on community.cloude.databrick.com 
> Runtime Version 3.2 (includes Apache Spark 2.2.0, Scala 2.11)
>Reporter: Benyi Wang
>Assignee: Henry Robinson
> Fix For: 2.2.1, 2.3.0
>
>
> LimitPushDown pushes LocalLimit to one side for FullOuterJoin, but this may 
> generate a wrong result:
> Assume we use limit(1) and LocalLimit will be pushed to left side, and id=999 
> is selected, but at right side we have 100K rows including 999, the result 
> will be
> - one row is (999, 999)
> - the rest rows are (null, xxx)
> Once you call show(), the row (999,999) has only 1/10th chance to be 
> selected by CollectLimit.
> The actual optimization might be, 
> - push down limit
> - but convert the join to Broadcast LeftOuterJoin or RightOuterJoin.
> Here is my notebook:
> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/349451637617406/2750346983121008/656075277290/latest.html
> {code:java}
> import scala.util.Random._
> val dl = shuffle(1 to 10).toDF("id")
> val dr = shuffle(1 to 10).toDF("id")
> println("data frame dl:")
> dl.explain
> println("data frame dr:")
> dr.explain
> val j = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)
> j.explain
> j.show(false)
> {code}
> {code}
> data frame dl:
> == Physical Plan ==
> LocalTableScan [id#10]
> data frame dr:
> == Physical Plan ==
> LocalTableScan [id#16]
> == Physical Plan ==
> CollectLimit 1
> +- SortMergeJoin [id#10], [id#16], FullOuter
>:- *Sort [id#10 ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(id#10, 200)
>: +- *LocalLimit 1
>:+- LocalTableScan [id#10]
>+- *Sort [id#16 ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(id#16, 200)
>  +- LocalTableScan [id#16]
> import scala.util.Random._
> dl: org.apache.spark.sql.DataFrame = [id: int]
> dr: org.apache.spark.sql.DataFrame = [id: int]
> j: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, id: int]
> ++---+
> |id  |id |
> ++---+
> |null|148|
> ++---+
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results

2017-11-08 Thread Dongjoon Hyun (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245029#comment-16245029
 ] 

Dongjoon Hyun commented on SPARK-22211:
---

Hi, All.
This breaks `branch-2.2`.
- SBT: 
https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.2-test-sbt-hadoop-2.7/408/
- MAVEN: 
https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.2-test-maven-hadoop-2.7/423

> LimitPushDown optimization for FullOuterJoin generates wrong results
> 
>
> Key: SPARK-22211
> URL: https://issues.apache.org/jira/browse/SPARK-22211
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
> Environment: on community.cloude.databrick.com 
> Runtime Version 3.2 (includes Apache Spark 2.2.0, Scala 2.11)
>Reporter: Benyi Wang
>Assignee: Henry Robinson
> Fix For: 2.2.1, 2.3.0
>
>
> LimitPushDown pushes LocalLimit to one side for FullOuterJoin, but this may 
> generate a wrong result:
> Assume we use limit(1) and LocalLimit will be pushed to left side, and id=999 
> is selected, but at right side we have 100K rows including 999, the result 
> will be
> - one row is (999, 999)
> - the rest rows are (null, xxx)
> Once you call show(), the row (999,999) has only 1/10th chance to be 
> selected by CollectLimit.
> The actual optimization might be, 
> - push down limit
> - but convert the join to Broadcast LeftOuterJoin or RightOuterJoin.
> Here is my notebook:
> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/349451637617406/2750346983121008/656075277290/latest.html
> {code:java}
> import scala.util.Random._
> val dl = shuffle(1 to 10).toDF("id")
> val dr = shuffle(1 to 10).toDF("id")
> println("data frame dl:")
> dl.explain
> println("data frame dr:")
> dr.explain
> val j = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)
> j.explain
> j.show(false)
> {code}
> {code}
> data frame dl:
> == Physical Plan ==
> LocalTableScan [id#10]
> data frame dr:
> == Physical Plan ==
> LocalTableScan [id#16]
> == Physical Plan ==
> CollectLimit 1
> +- SortMergeJoin [id#10], [id#16], FullOuter
>:- *Sort [id#10 ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(id#10, 200)
>: +- *LocalLimit 1
>:+- LocalTableScan [id#10]
>+- *Sort [id#16 ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(id#16, 200)
>  +- LocalTableScan [id#16]
> import scala.util.Random._
> dl: org.apache.spark.sql.DataFrame = [id: int]
> dr: org.apache.spark.sql.DataFrame = [id: int]
> j: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, id: int]
> ++---+
> |id  |id |
> ++---+
> |null|148|
> ++---+
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results

2017-11-03 Thread Henry Robinson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238281#comment-16238281
 ] 

Henry Robinson commented on SPARK-22211:


Sounds good, thanks both.

> LimitPushDown optimization for FullOuterJoin generates wrong results
> 
>
> Key: SPARK-22211
> URL: https://issues.apache.org/jira/browse/SPARK-22211
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
> Environment: on community.cloude.databrick.com 
> Runtime Version 3.2 (includes Apache Spark 2.2.0, Scala 2.11)
>Reporter: Benyi Wang
>Priority: Major
>
> LimitPushDown pushes LocalLimit to one side for FullOuterJoin, but this may 
> generate a wrong result:
> Assume we use limit(1) and LocalLimit will be pushed to left side, and id=999 
> is selected, but at right side we have 100K rows including 999, the result 
> will be
> - one row is (999, 999)
> - the rest rows are (null, xxx)
> Once you call show(), the row (999,999) has only 1/10th chance to be 
> selected by CollectLimit.
> The actual optimization might be, 
> - push down limit
> - but convert the join to Broadcast LeftOuterJoin or RightOuterJoin.
> Here is my notebook:
> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/349451637617406/2750346983121008/656075277290/latest.html
> {code:java}
> import scala.util.Random._
> val dl = shuffle(1 to 10).toDF("id")
> val dr = shuffle(1 to 10).toDF("id")
> println("data frame dl:")
> dl.explain
> println("data frame dr:")
> dr.explain
> val j = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)
> j.explain
> j.show(false)
> {code}
> {code}
> data frame dl:
> == Physical Plan ==
> LocalTableScan [id#10]
> data frame dr:
> == Physical Plan ==
> LocalTableScan [id#16]
> == Physical Plan ==
> CollectLimit 1
> +- SortMergeJoin [id#10], [id#16], FullOuter
>:- *Sort [id#10 ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(id#10, 200)
>: +- *LocalLimit 1
>:+- LocalTableScan [id#10]
>+- *Sort [id#16 ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(id#16, 200)
>  +- LocalTableScan [id#16]
> import scala.util.Random._
> dl: org.apache.spark.sql.DataFrame = [id: int]
> dr: org.apache.spark.sql.DataFrame = [id: int]
> j: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, id: int]
> ++---+
> |id  |id |
> ++---+
> |null|148|
> ++---+
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results

2017-11-03 Thread Xiao Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237825#comment-16237825
 ] 

Xiao Li commented on SPARK-22211:
-

We should merge it to the master and the previous releases at first.

> LimitPushDown optimization for FullOuterJoin generates wrong results
> 
>
> Key: SPARK-22211
> URL: https://issues.apache.org/jira/browse/SPARK-22211
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
> Environment: on community.cloude.databrick.com 
> Runtime Version 3.2 (includes Apache Spark 2.2.0, Scala 2.11)
>Reporter: Benyi Wang
>Priority: Major
>
> LimitPushDown pushes LocalLimit to one side for FullOuterJoin, but this may 
> generate a wrong result:
> Assume we use limit(1) and LocalLimit will be pushed to left side, and id=999 
> is selected, but at right side we have 100K rows including 999, the result 
> will be
> - one row is (999, 999)
> - the rest rows are (null, xxx)
> Once you call show(), the row (999,999) has only 1/10th chance to be 
> selected by CollectLimit.
> The actual optimization might be, 
> - push down limit
> - but convert the join to Broadcast LeftOuterJoin or RightOuterJoin.
> Here is my notebook:
> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/349451637617406/2750346983121008/656075277290/latest.html
> {code:java}
> import scala.util.Random._
> val dl = shuffle(1 to 10).toDF("id")
> val dr = shuffle(1 to 10).toDF("id")
> println("data frame dl:")
> dl.explain
> println("data frame dr:")
> dr.explain
> val j = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)
> j.explain
> j.show(false)
> {code}
> {code}
> data frame dl:
> == Physical Plan ==
> LocalTableScan [id#10]
> data frame dr:
> == Physical Plan ==
> LocalTableScan [id#16]
> == Physical Plan ==
> CollectLimit 1
> +- SortMergeJoin [id#10], [id#16], FullOuter
>:- *Sort [id#10 ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(id#10, 200)
>: +- *LocalLimit 1
>:+- LocalTableScan [id#10]
>+- *Sort [id#16 ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(id#16, 200)
>  +- LocalTableScan [id#16]
> import scala.util.Random._
> dl: org.apache.spark.sql.DataFrame = [id: int]
> dr: org.apache.spark.sql.DataFrame = [id: int]
> j: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, id: int]
> ++---+
> |id  |id |
> ++---+
> |null|148|
> ++---+
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results

2017-11-03 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237822#comment-16237822
 ] 

Sean Owen commented on SPARK-22211:
---

In the name of correctness, still worth disabling for now, and then fixing 
later right?

> LimitPushDown optimization for FullOuterJoin generates wrong results
> 
>
> Key: SPARK-22211
> URL: https://issues.apache.org/jira/browse/SPARK-22211
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
> Environment: on community.cloude.databrick.com 
> Runtime Version 3.2 (includes Apache Spark 2.2.0, Scala 2.11)
>Reporter: Benyi Wang
>Priority: Major
>
> LimitPushDown pushes LocalLimit to one side for FullOuterJoin, but this may 
> generate a wrong result:
> Assume we use limit(1) and LocalLimit will be pushed to left side, and id=999 
> is selected, but at right side we have 100K rows including 999, the result 
> will be
> - one row is (999, 999)
> - the rest rows are (null, xxx)
> Once you call show(), the row (999,999) has only 1/10th chance to be 
> selected by CollectLimit.
> The actual optimization might be, 
> - push down limit
> - but convert the join to Broadcast LeftOuterJoin or RightOuterJoin.
> Here is my notebook:
> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/349451637617406/2750346983121008/656075277290/latest.html
> {code:java}
> import scala.util.Random._
> val dl = shuffle(1 to 10).toDF("id")
> val dr = shuffle(1 to 10).toDF("id")
> println("data frame dl:")
> dl.explain
> println("data frame dr:")
> dr.explain
> val j = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)
> j.explain
> j.show(false)
> {code}
> {code}
> data frame dl:
> == Physical Plan ==
> LocalTableScan [id#10]
> data frame dr:
> == Physical Plan ==
> LocalTableScan [id#16]
> == Physical Plan ==
> CollectLimit 1
> +- SortMergeJoin [id#10], [id#16], FullOuter
>:- *Sort [id#10 ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(id#10, 200)
>: +- *LocalLimit 1
>:+- LocalTableScan [id#10]
>+- *Sort [id#16 ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(id#16, 200)
>  +- LocalTableScan [id#16]
> import scala.util.Random._
> dl: org.apache.spark.sql.DataFrame = [id: int]
> dr: org.apache.spark.sql.DataFrame = [id: int]
> j: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, id: int]
> ++---+
> |id  |id |
> ++---+
> |null|148|
> ++---+
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results

2017-11-03 Thread Xiao Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237797#comment-16237797
 ] 

Xiao Li commented on SPARK-22211:
-

The Join operator should be limit aware. Anyway, we can do it later.

> LimitPushDown optimization for FullOuterJoin generates wrong results
> 
>
> Key: SPARK-22211
> URL: https://issues.apache.org/jira/browse/SPARK-22211
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
> Environment: on community.cloude.databrick.com 
> Runtime Version 3.2 (includes Apache Spark 2.2.0, Scala 2.11)
>Reporter: Benyi Wang
>Priority: Major
>
> LimitPushDown pushes LocalLimit to one side for FullOuterJoin, but this may 
> generate a wrong result:
> Assume we use limit(1) and LocalLimit will be pushed to left side, and id=999 
> is selected, but at right side we have 100K rows including 999, the result 
> will be
> - one row is (999, 999)
> - the rest rows are (null, xxx)
> Once you call show(), the row (999,999) has only 1/10th chance to be 
> selected by CollectLimit.
> The actual optimization might be, 
> - push down limit
> - but convert the join to Broadcast LeftOuterJoin or RightOuterJoin.
> Here is my notebook:
> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/349451637617406/2750346983121008/656075277290/latest.html
> {code:java}
> import scala.util.Random._
> val dl = shuffle(1 to 10).toDF("id")
> val dr = shuffle(1 to 10).toDF("id")
> println("data frame dl:")
> dl.explain
> println("data frame dr:")
> dr.explain
> val j = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)
> j.explain
> j.show(false)
> {code}
> {code}
> data frame dl:
> == Physical Plan ==
> LocalTableScan [id#10]
> data frame dr:
> == Physical Plan ==
> LocalTableScan [id#16]
> == Physical Plan ==
> CollectLimit 1
> +- SortMergeJoin [id#10], [id#16], FullOuter
>:- *Sort [id#10 ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(id#10, 200)
>: +- *LocalLimit 1
>:+- LocalTableScan [id#10]
>+- *Sort [id#16 ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(id#16, 200)
>  +- LocalTableScan [id#16]
> import scala.util.Random._
> dl: org.apache.spark.sql.DataFrame = [id: int]
> dr: org.apache.spark.sql.DataFrame = [id: int]
> j: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, id: int]
> ++---+
> |id  |id |
> ++---+
> |null|148|
> ++---+
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results

2017-11-03 Thread Henry Robinson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237778#comment-16237778
 ] 

Henry Robinson commented on SPARK-22211:


[~smilegator] - sounds good! What will your approach be? I wasn't able to see a 
safe way to push the limit through the join without either a more invasive 
rewrite or restricting the set of join operators for FOJ. 

> LimitPushDown optimization for FullOuterJoin generates wrong results
> 
>
> Key: SPARK-22211
> URL: https://issues.apache.org/jira/browse/SPARK-22211
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
> Environment: on community.cloude.databrick.com 
> Runtime Version 3.2 (includes Apache Spark 2.2.0, Scala 2.11)
>Reporter: Benyi Wang
>Priority: Major
>
> LimitPushDown pushes LocalLimit to one side for FullOuterJoin, but this may 
> generate a wrong result:
> Assume we use limit(1) and LocalLimit will be pushed to left side, and id=999 
> is selected, but at right side we have 100K rows including 999, the result 
> will be
> - one row is (999, 999)
> - the rest rows are (null, xxx)
> Once you call show(), the row (999,999) has only 1/10th chance to be 
> selected by CollectLimit.
> The actual optimization might be, 
> - push down limit
> - but convert the join to Broadcast LeftOuterJoin or RightOuterJoin.
> Here is my notebook:
> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/349451637617406/2750346983121008/656075277290/latest.html
> {code:java}
> import scala.util.Random._
> val dl = shuffle(1 to 10).toDF("id")
> val dr = shuffle(1 to 10).toDF("id")
> println("data frame dl:")
> dl.explain
> println("data frame dr:")
> dr.explain
> val j = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)
> j.explain
> j.show(false)
> {code}
> {code}
> data frame dl:
> == Physical Plan ==
> LocalTableScan [id#10]
> data frame dr:
> == Physical Plan ==
> LocalTableScan [id#16]
> == Physical Plan ==
> CollectLimit 1
> +- SortMergeJoin [id#10], [id#16], FullOuter
>:- *Sort [id#10 ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(id#10, 200)
>: +- *LocalLimit 1
>:+- LocalTableScan [id#10]
>+- *Sort [id#16 ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(id#16, 200)
>  +- LocalTableScan [id#16]
> import scala.util.Random._
> dl: org.apache.spark.sql.DataFrame = [id: int]
> dr: org.apache.spark.sql.DataFrame = [id: int]
> j: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, id: int]
> ++---+
> |id  |id |
> ++---+
> |null|148|
> ++---+
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results

2017-11-03 Thread Xiao Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237133#comment-16237133
 ] 

Xiao Li commented on SPARK-22211:
-

Will submit a PR based on my previous PR 
https://github.com/apache/spark/pull/10454

> LimitPushDown optimization for FullOuterJoin generates wrong results
> 
>
> Key: SPARK-22211
> URL: https://issues.apache.org/jira/browse/SPARK-22211
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
> Environment: on community.cloude.databrick.com 
> Runtime Version 3.2 (includes Apache Spark 2.2.0, Scala 2.11)
>Reporter: Benyi Wang
>Priority: Major
>
> LimitPushDown pushes LocalLimit to one side for FullOuterJoin, but this may 
> generate a wrong result:
> Assume we use limit(1) and LocalLimit will be pushed to left side, and id=999 
> is selected, but at right side we have 100K rows including 999, the result 
> will be
> - one row is (999, 999)
> - the rest rows are (null, xxx)
> Once you call show(), the row (999,999) has only 1/10th chance to be 
> selected by CollectLimit.
> The actual optimization might be, 
> - push down limit
> - but convert the join to Broadcast LeftOuterJoin or RightOuterJoin.
> Here is my notebook:
> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/349451637617406/2750346983121008/656075277290/latest.html
> {code:java}
> import scala.util.Random._
> val dl = shuffle(1 to 10).toDF("id")
> val dr = shuffle(1 to 10).toDF("id")
> println("data frame dl:")
> dl.explain
> println("data frame dr:")
> dr.explain
> val j = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)
> j.explain
> j.show(false)
> {code}
> {code}
> data frame dl:
> == Physical Plan ==
> LocalTableScan [id#10]
> data frame dr:
> == Physical Plan ==
> LocalTableScan [id#16]
> == Physical Plan ==
> CollectLimit 1
> +- SortMergeJoin [id#10], [id#16], FullOuter
>:- *Sort [id#10 ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(id#10, 200)
>: +- *LocalLimit 1
>:+- LocalTableScan [id#10]
>+- *Sort [id#16 ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(id#16, 200)
>  +- LocalTableScan [id#16]
> import scala.util.Random._
> dl: org.apache.spark.sql.DataFrame = [id: int]
> dr: org.apache.spark.sql.DataFrame = [id: int]
> j: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, id: int]
> ++---+
> |id  |id |
> ++---+
> |null|148|
> ++---+
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results

2017-11-02 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236861#comment-16236861
 ] 

Apache Spark commented on SPARK-22211:
--

User 'henryr' has created a pull request for this issue:
https://github.com/apache/spark/pull/19647

> LimitPushDown optimization for FullOuterJoin generates wrong results
> 
>
> Key: SPARK-22211
> URL: https://issues.apache.org/jira/browse/SPARK-22211
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
> Environment: on community.cloude.databrick.com 
> Runtime Version 3.2 (includes Apache Spark 2.2.0, Scala 2.11)
>Reporter: Benyi Wang
>Priority: Major
>
> LimitPushDown pushes LocalLimit to one side for FullOuterJoin, but this may 
> generate a wrong result:
> Assume we use limit(1) and LocalLimit will be pushed to left side, and id=999 
> is selected, but at right side we have 100K rows including 999, the result 
> will be
> - one row is (999, 999)
> - the rest rows are (null, xxx)
> Once you call show(), the row (999,999) has only 1/10th chance to be 
> selected by CollectLimit.
> The actual optimization might be, 
> - push down limit
> - but convert the join to Broadcast LeftOuterJoin or RightOuterJoin.
> Here is my notebook:
> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/349451637617406/2750346983121008/656075277290/latest.html
> {code:java}
> import scala.util.Random._
> val dl = shuffle(1 to 10).toDF("id")
> val dr = shuffle(1 to 10).toDF("id")
> println("data frame dl:")
> dl.explain
> println("data frame dr:")
> dr.explain
> val j = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)
> j.explain
> j.show(false)
> {code}
> {code}
> data frame dl:
> == Physical Plan ==
> LocalTableScan [id#10]
> data frame dr:
> == Physical Plan ==
> LocalTableScan [id#16]
> == Physical Plan ==
> CollectLimit 1
> +- SortMergeJoin [id#10], [id#16], FullOuter
>:- *Sort [id#10 ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(id#10, 200)
>: +- *LocalLimit 1
>:+- LocalTableScan [id#10]
>+- *Sort [id#16 ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(id#16, 200)
>  +- LocalTableScan [id#16]
> import scala.util.Random._
> dl: org.apache.spark.sql.DataFrame = [id: int]
> dr: org.apache.spark.sql.DataFrame = [id: int]
> j: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, id: int]
> ++---+
> |id  |id |
> ++---+
> |null|148|
> ++---+
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results

2017-10-30 Thread Henry Robinson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16225816#comment-16225816
 ] 

Henry Robinson commented on SPARK-22211:


Thinking about it a more, I think the optimization that's currently implemented 
works as long as a) the limit is pushed to the streaming side of the join and 
b) the physical join implementation guarantees that it will emit rows that have 
non-null RHSs from the streaming side before any that have a null RHS.

That is: say we've got a build-side of one row, (A,C), and a streaming-side of 
(A,B). If we do a full outer-join of these two inputs, the result should be 
some ordering of (A, A), (null, B), (C, null). If we do a FOJ with LIMIT 1 
pushed to the streaming side, imagine it returns (B). It's an error if the join 
operator sees that A on the build-side has no match, and emits that (A, null) 
before it sees that B has no match and emits (null, B). But if it emits (null, 
B) first, the limit above it should kick in and no further rows will be 
emitted. 

It seems a bit fragile to rely on this behaviour from all join implementations, 
and it has some implications for other transformations (e.g. it would not be 
safe to flip the join order for a FOJ with a pushed-down limit - but would be 
ok for a non-pushed-down one). However, it's also a bit concerning to remove an 
optimization that's probably a big win for some queries, even if it's 
incorrect. There are rewrites that would work, e.g.:

{{ x.join(y, x('bar') === y('bar'), "outer").limit(10) ==> 
x.sort.limit(10).join(y.sort.limit(10), x('bar') === y('bar'), 
"outer").sort.limit(10) }}

seems like it would be correct. But for now, how about we disable the push-down 
optimization in {{LimitPushDown}} and see if there's a need to investigate more 
complicated optimizations after that?

> LimitPushDown optimization for FullOuterJoin generates wrong results
> 
>
> Key: SPARK-22211
> URL: https://issues.apache.org/jira/browse/SPARK-22211
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
> Environment: on community.cloude.databrick.com 
> Runtime Version 3.2 (includes Apache Spark 2.2.0, Scala 2.11)
>Reporter: Benyi Wang
>
> LimitPushDown pushes LocalLimit to one side for FullOuterJoin, but this may 
> generate a wrong result:
> Assume we use limit(1) and LocalLimit will be pushed to left side, and id=999 
> is selected, but at right side we have 100K rows including 999, the result 
> will be
> - one row is (999, 999)
> - the rest rows are (null, xxx)
> Once you call show(), the row (999,999) has only 1/10th chance to be 
> selected by CollectLimit.
> The actual optimization might be, 
> - push down limit
> - but convert the join to Broadcast LeftOuterJoin or RightOuterJoin.
> Here is my notebook:
> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/349451637617406/2750346983121008/656075277290/latest.html
> {code:java}
> import scala.util.Random._
> val dl = shuffle(1 to 10).toDF("id")
> val dr = shuffle(1 to 10).toDF("id")
> println("data frame dl:")
> dl.explain
> println("data frame dr:")
> dr.explain
> val j = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)
> j.explain
> j.show(false)
> {code}
> {code}
> data frame dl:
> == Physical Plan ==
> LocalTableScan [id#10]
> data frame dr:
> == Physical Plan ==
> LocalTableScan [id#16]
> == Physical Plan ==
> CollectLimit 1
> +- SortMergeJoin [id#10], [id#16], FullOuter
>:- *Sort [id#10 ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(id#10, 200)
>: +- *LocalLimit 1
>:+- LocalTableScan [id#10]
>+- *Sort [id#16 ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(id#16, 200)
>  +- LocalTableScan [id#16]
> import scala.util.Random._
> dl: org.apache.spark.sql.DataFrame = [id: int]
> dr: org.apache.spark.sql.DataFrame = [id: int]
> j: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, id: int]
> ++---+
> |id  |id |
> ++---+
> |null|148|
> ++---+
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results

2017-10-27 Thread Henry Robinson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16223073#comment-16223073
 ] 

Henry Robinson commented on SPARK-22211:


I think the optimization proposed works only for a self-join. If you transform 
a FOJ into a one-sided outer join in general, you might omit rows from the 
result that should be there. 

Assume that the idea is to transform the full outer-join into a right 
outer-join, with the limit pushed to either the LHS or RHS. One problem comes 
if the limit is pushed to the RHS, but is larger than the number of rows in the 
RHS. For example, if you have tables T1 = (1,2) and T2 = (1), consider the 
following query:

{{SELECT * FROM T1 a FULL OUTER JOIN T2 b on a.col = b.col LIMIT 2}}.

If the limit is pushed into T2's scan (and the FOJ is changed to a ROJ), the 
query would emit only the tuple (1,1) - and omit (2,null) which should be 
included.

(If the limit is pushed into T1's scan there are other bugs if the limit is 
_less_ than the number of rows, by a similar argument).

I checked and I don't think Postgres tries to push limits into a FOJ either. 
Impala doesn't. 


> LimitPushDown optimization for FullOuterJoin generates wrong results
> 
>
> Key: SPARK-22211
> URL: https://issues.apache.org/jira/browse/SPARK-22211
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
> Environment: on community.cloude.databrick.com 
> Runtime Version 3.2 (includes Apache Spark 2.2.0, Scala 2.11)
>Reporter: Benyi Wang
>
> LimitPushDown pushes LocalLimit to one side for FullOuterJoin, but this may 
> generate a wrong result:
> Assume we use limit(1) and LocalLimit will be pushed to left side, and id=999 
> is selected, but at right side we have 100K rows including 999, the result 
> will be
> - one row is (999, 999)
> - the rest rows are (null, xxx)
> Once you call show(), the row (999,999) has only 1/10th chance to be 
> selected by CollectLimit.
> The actual optimization might be, 
> - push down limit
> - but convert the join to Broadcast LeftOuterJoin or RightOuterJoin.
> Here is my notebook:
> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/349451637617406/2750346983121008/656075277290/latest.html
> {code:java}
> import scala.util.Random._
> val dl = shuffle(1 to 10).toDF("id")
> val dr = shuffle(1 to 10).toDF("id")
> println("data frame dl:")
> dl.explain
> println("data frame dr:")
> dr.explain
> val j = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)
> j.explain
> j.show(false)
> {code}
> {code}
> data frame dl:
> == Physical Plan ==
> LocalTableScan [id#10]
> data frame dr:
> == Physical Plan ==
> LocalTableScan [id#16]
> == Physical Plan ==
> CollectLimit 1
> +- SortMergeJoin [id#10], [id#16], FullOuter
>:- *Sort [id#10 ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(id#10, 200)
>: +- *LocalLimit 1
>:+- LocalTableScan [id#10]
>+- *Sort [id#16 ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(id#16, 200)
>  +- LocalTableScan [id#16]
> import scala.util.Random._
> dl: org.apache.spark.sql.DataFrame = [id: int]
> dr: org.apache.spark.sql.DataFrame = [id: int]
> j: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, id: int]
> ++---+
> |id  |id |
> ++---+
> |null|148|
> ++---+
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results

2017-10-12 Thread Takeshi Yamamuro (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202863#comment-16202863
 ] 

Takeshi Yamamuro commented on SPARK-22211:
--

Aha, I misunderstood. But, I think the case 3 is not acceptable (I know we can 
get big performance gains though...) because the transformation of relational 
expressions must not change results. IIUC, in the case 3, the push-down changes 
the result, right?

> LimitPushDown optimization for FullOuterJoin generates wrong results
> 
>
> Key: SPARK-22211
> URL: https://issues.apache.org/jira/browse/SPARK-22211
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
> Environment: on community.cloude.databrick.com 
> Runtime Version 3.2 (includes Apache Spark 2.2.0, Scala 2.11)
>Reporter: Benyi Wang
>
> LimitPushDown pushes LocalLimit to one side for FullOuterJoin, but this may 
> generate a wrong result:
> Assume we use limit(1) and LocalLimit will be pushed to left side, and id=999 
> is selected, but at right side we have 100K rows including 999, the result 
> will be
> - one row is (999, 999)
> - the rest rows are (null, xxx)
> Once you call show(), the row (999,999) has only 1/10th chance to be 
> selected by CollectLimit.
> The actual optimization might be, 
> - push down limit
> - but convert the join to Broadcast LeftOuterJoin or RightOuterJoin.
> Here is my notebook:
> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/349451637617406/2750346983121008/656075277290/latest.html
> {code:java}
> import scala.util.Random._
> val dl = shuffle(1 to 10).toDF("id")
> val dr = shuffle(1 to 10).toDF("id")
> println("data frame dl:")
> dl.explain
> println("data frame dr:")
> dr.explain
> val j = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)
> j.explain
> j.show(false)
> {code}
> {code}
> data frame dl:
> == Physical Plan ==
> LocalTableScan [id#10]
> data frame dr:
> == Physical Plan ==
> LocalTableScan [id#16]
> == Physical Plan ==
> CollectLimit 1
> +- SortMergeJoin [id#10], [id#16], FullOuter
>:- *Sort [id#10 ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(id#10, 200)
>: +- *LocalLimit 1
>:+- LocalTableScan [id#10]
>+- *Sort [id#16 ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(id#16, 200)
>  +- LocalTableScan [id#16]
> import scala.util.Random._
> dl: org.apache.spark.sql.DataFrame = [id: int]
> dr: org.apache.spark.sql.DataFrame = [id: int]
> j: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, id: int]
> ++---+
> |id  |id |
> ++---+
> |null|148|
> ++---+
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results

2017-10-12 Thread Benyi Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202278#comment-16202278
 ] 

Benyi Wang commented on SPARK-22211:


I think my suggestion solution is correct.

|| Case || Left join key || Right join key || Full outer join ||
| 1 | Y | N | {{(left(\*), null)}} |
| 2 | Y | Y | {{(left(\*), right(\*))}} |
| 3 | N | Y | {{(null, right(\*))}} |
| 4 | N | N | Not applied |

If LimitPushDown pushes limit to the left side, whatever a limit value is and 
how big of left side table, you will always select some rows, in other words, 
the join keys are always exists, and only case 1 and 2 will happen, so it is 
actually a Left-join instead. It is equivalent to right-join when pushing down 
to  the right side.

The only problem of this method is: case 3 has no chance to be shown while 
pushing down to the left side, and case 1 for the right side. I would say this 
is not a big issue because we just want to see some samples of the join result, 
but the benefit is huge. If we want to see left-only or right-only, we might 
add where clause.  

> LimitPushDown optimization for FullOuterJoin generates wrong results
> 
>
> Key: SPARK-22211
> URL: https://issues.apache.org/jira/browse/SPARK-22211
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
> Environment: on community.cloude.databrick.com 
> Runtime Version 3.2 (includes Apache Spark 2.2.0, Scala 2.11)
>Reporter: Benyi Wang
>
> LimitPushDown pushes LocalLimit to one side for FullOuterJoin, but this may 
> generate a wrong result:
> Assume we use limit(1) and LocalLimit will be pushed to left side, and id=999 
> is selected, but at right side we have 100K rows including 999, the result 
> will be
> - one row is (999, 999)
> - the rest rows are (null, xxx)
> Once you call show(), the row (999,999) has only 1/10th chance to be 
> selected by CollectLimit.
> The actual optimization might be, 
> - push down limit
> - but convert the join to Broadcast LeftOuterJoin or RightOuterJoin.
> Here is my notebook:
> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/349451637617406/2750346983121008/656075277290/latest.html
> {code:java}
> import scala.util.Random._
> val dl = shuffle(1 to 10).toDF("id")
> val dr = shuffle(1 to 10).toDF("id")
> println("data frame dl:")
> dl.explain
> println("data frame dr:")
> dr.explain
> val j = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)
> j.explain
> j.show(false)
> {code}
> {code}
> data frame dl:
> == Physical Plan ==
> LocalTableScan [id#10]
> data frame dr:
> == Physical Plan ==
> LocalTableScan [id#16]
> == Physical Plan ==
> CollectLimit 1
> +- SortMergeJoin [id#10], [id#16], FullOuter
>:- *Sort [id#10 ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(id#10, 200)
>: +- *LocalLimit 1
>:+- LocalTableScan [id#10]
>+- *Sort [id#16 ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(id#16, 200)
>  +- LocalTableScan [id#16]
> import scala.util.Random._
> dl: org.apache.spark.sql.DataFrame = [id: int]
> dr: org.apache.spark.sql.DataFrame = [id: int]
> j: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, id: int]
> ++---+
> |id  |id |
> ++---+
> |null|148|
> ++---+
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results

2017-10-05 Thread Takeshi Yamamuro (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16194170#comment-16194170
 ] 

Takeshi Yamamuro commented on SPARK-22211:
--

Probably, the suggested solution does not work when both-side tables are small 
and a limit value is high. IMHO one option to solve this is just to stop the 
pushdown for full-outer joins (because I couldn't find better solutions to 
solve this smartly...).

> LimitPushDown optimization for FullOuterJoin generates wrong results
> 
>
> Key: SPARK-22211
> URL: https://issues.apache.org/jira/browse/SPARK-22211
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
> Environment: on community.cloude.databrick.com 
> Runtime Version 3.2 (includes Apache Spark 2.2.0, Scala 2.11)
>Reporter: Benyi Wang
>
> LimitPushDown pushes LocalLimit to one side for FullOuterJoin, but this may 
> generate a wrong result:
> Assume we use limit(1) and LocalLimit will be pushed to left side, and id=999 
> is selected, but at right side we have 100K rows including 999, the result 
> will be
> - one row is (999, 999)
> - the rest rows are (null, xxx)
> Once you call show(), the row (999,999) has only 1/10th chance to be 
> selected by CollectLimit.
> The actual optimization might be, 
> - push down limit
> - but convert the join to Broadcast LeftOuterJoin or RightOuterJoin.
> Here is my notebook:
> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/349451637617406/2750346983121008/656075277290/latest.html
> {code:java}
> import scala.util.Random._
> val dl = shuffle(1 to 10).toDF("id")
> val dr = shuffle(1 to 10).toDF("id")
> println("data frame dl:")
> dl.explain
> println("data frame dr:")
> dr.explain
> val j = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)
> j.explain
> j.show(false)
> {code}
> {code}
> data frame dl:
> == Physical Plan ==
> LocalTableScan [id#10]
> data frame dr:
> == Physical Plan ==
> LocalTableScan [id#16]
> == Physical Plan ==
> CollectLimit 1
> +- SortMergeJoin [id#10], [id#16], FullOuter
>:- *Sort [id#10 ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(id#10, 200)
>: +- *LocalLimit 1
>:+- LocalTableScan [id#10]
>+- *Sort [id#16 ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(id#16, 200)
>  +- LocalTableScan [id#16]
> import scala.util.Random._
> dl: org.apache.spark.sql.DataFrame = [id: int]
> dr: org.apache.spark.sql.DataFrame = [id: int]
> j: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, id: int]
> ++---+
> |id  |id |
> ++---+
> |null|148|
> ++---+
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org