[jira] [Comment Edited] (SPARK-20073) Unexpected Cartesian product when using eqNullSafe in join with a derived table

2017-05-29 Thread Everett Anderson (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-20073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028561#comment-16028561 ]

Everett Anderson edited comment on SPARK-20073 at 5/29/17 6:45 PM:
---

[~maropu] Thanks for working on this.

I think having a warning message is great, but I'd like to go further and suggest 
that in many of these cases we shouldn't be doing a Cartesian product at all.

For example, suppose table A goes through two separate transformations fn1(A) and 
fn2(A) to create tables B and C, respectively. If B and C are then joined on col1 
and col2, that should not produce a Cartesian product join. This is especially 
true if fn1() or fn2() changed the number of rows relative to A.

I imagine the reason this is happening is that Spark isn't updating the Column 
equality check in these cases to say that B[col1] and C[col1] are no longer the 
same as A[col1]. But they should be considered different. Is that right?

Would it be hard to instead track that these Columns are no longer the same?
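
For concreteness, here's a minimal, hypothetical sketch of that B-and-C-derived-from-A scenario (the data, column names, and transformations are invented for illustration and are not taken from the reproduction in this issue; whether this exact snippet plans a Cartesian product may depend on the Spark version):

{code}
// Hypothetical sketch only: B and C are independent derivations of A, joined on col1.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

// Allow the plan to be produced even if it does degenerate into a cross join.
spark.conf.set("spark.sql.crossJoin.enabled", "true")

// Table A (illustrative data; col1 is nullable via Option).
val a = Seq((Some("x"), 1), (Some("x"), 2), (Option.empty[String], 3)).toDF("col1", "col2")

// fn1(A) and fn2(A): two unrelated transformations, one of which changes the row count.
val b = a.groupBy("col1").count()   // B = fn1(A)
val c = a.filter($"col2" > 1)       // C = fn2(A)

// Join the derived tables null-safely on col1. Because b("col1") and c("col1")
// still resolve to A's original col1 attribute, the condition can collapse into a
// trivially true predicate and get planned as a Cartesian product.
val joined = b.join(c, b("col1") <=> c("col1"))
joined.explain()
{code}

In a case like this it seems like the planner should keep B's col1 and C's col1 distinct on its own, rather than requiring a manual alias.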


was (Author: everett):
[~maropu] Thanks for working on this.

I think having a warning message is great, but I'd like to suggest more that in 
many of these cases we shouldn't be doing a Cartesian product at all.

For example, if we have table A that goes through 2 separate transformations 
fn1(A) and fn2(A) to create tables B and C, respectively, if B and C happen to 
be joined on col1 and col2 that should not produce a Cartesian product join. 
This is especially true if fn1() or fn2() changed the number of rows in A.

I imagine that the reason this is happening is that the Spark libraries aren't 
updating the Column equality check in these cases to say that B[col1] and 
C[col1] are no longer different from A[col1]. But they should be considered 
different. Is that right?

Would it be hard to instead track that these Columns are no longer the same?

> Unexpected Cartesian product when using eqNullSafe in join with a derived 
> table
> ---
>
> Key: SPARK-20073
> URL: https://issues.apache.org/jira/browse/SPARK-20073
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Everett Anderson
>  Labels: correctness
>
> It appears that if you try to join tables A and B when B is derived from A 
> and you use the eqNullSafe / <=> operator for the join condition, Spark 
> performs a Cartesian product.
> However, if you perform the join on tables of the same data when they don't 
> have a relationship, the expected non-Cartesian product join occurs.
> {noformat}
> // Create some fake data.
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.Dataset
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.functions
> val peopleRowsRDD = sc.parallelize(Seq(
> Row("Fred", 8, 1),
> Row("Fred", 8, 2),
> Row(null, 10, 3),
> Row(null, 10, 4),
> Row("Amy", 12, 5),
> Row("Amy", 12, 6)))
> 
> val peopleSchema = StructType(Seq(
> StructField("name", StringType, nullable = true),
> StructField("group", IntegerType, nullable = true),
> StructField("data", IntegerType, nullable = true)))
> 
> val people = spark.createDataFrame(peopleRowsRDD, peopleSchema)
> people.createOrReplaceTempView("people")
> scala> people.show
> +----+-----+----+
> |name|group|data|
> +----+-----+----+
> |Fred|    8|   1|
> |Fred|    8|   2|
> |null|   10|   3|
> |null|   10|   4|
> | Amy|   12|   5|
> | Amy|   12|   6|
> +----+-----+----+
> // Now create a derived table from that table. It doesn't matter much what.
> val variantCounts = spark.sql("select name, count(distinct(name, group, data)) as variant_count from people group by name having variant_count > 1")
> variantCounts.show
> +----+-------------+
> |name|variant_count|
> +----+-------------+
> |Fred|            2|
> |null|            2|
> | Amy|            2|
> +----+-------------+
> // Now try an inner join using the regular equalTo that drops nulls. This works fine.
> val innerJoinEqualTo = variantCounts.join(people, variantCounts("name").equalTo(people("name")))
> innerJoinEqualTo.show
> +----+-------------+----+-----+----+
> |name|variant_count|name|group|data|
> +----+-------------+----+-----+----+
> |Fred|            2|Fred|    8|   1|
> |Fred|            2|Fred|    8|   2|
> | Amy|            2| Amy|   12|   5|
> | Amy|            2| Amy|   12|   6|
> +----+-------------+----+-----+----+
> // Okay now lets switch to the <=> operator
> //
> // If you haven't set spark.sql.crossJoin.enabled=true, you'll get an error like
> // "Cartesian joins could be prohibitively expensive and are disabled by default.
> // To explicitly enable them, please set spark.sql.crossJoin.enabled = true;"

[jira] [Comment Edited] (SPARK-20073) Unexpected Cartesian product when using eqNullSafe in join with a derived table

2017-03-23 Thread Takeshi Yamamuro (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-20073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15939822#comment-15939822 ]

Takeshi Yamamuro edited comment on SPARK-20073 at 3/24/17 5:16 AM:
---

I think this is a known issue, and you need to assign an alias in this case, like this:
{code}

scala> sql("""SET spark.sql.crossJoin.enabled = true""")
scala> val origDf = Seq(("a", 0), ("b", 0), (null, 1)).toDF("key", "value")
scala> origDf.createOrReplaceTempView("origTable")
scala> // Add an alias for `key`
scala> val aggDf = sql("""SELECT key AS k, COUNT(value) FROM origTable GROUP BY key""")

scala> aggDf.join(origDf, aggDf("k") <=> origDf("key")).show
+----+------------+----+-----+
|   k|count(value)| key|value|
+----+------------+----+-----+
|null|           1|null|    1|
|   b|           1|   b|    0|
|   a|           1|   a|    0|
+----+------------+----+-----+
{code}
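
(If I understand the mechanism correctly, the alias helps because `key AS k` produces a new column reference, so the join condition no longer resolves both sides to the same underlying attribute and is planned as a regular equi-join rather than a cross join.)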


was (Author: maropu):
I think this is the known issue and you need to assign an alias for name in 
`variantCounts`;
This is like;
{code}

scala> sql("""SET spark.sql.crossJoin.enabled = true""")
scala> val origDf = Seq(("a", 0), ("b", 0), (null, 1)).toDF("key", "value")
scala> origDf.createOrReplaceTempView("origTable")
scala> // Add an alias for `key`
scala> val aggDf = sql("""SELECT key AS k, COUNT(value) FROM origTable GROUP BY key""")

scala> aggDf.join(origDf, aggDf("k") <=> origDf("key")).show
+----+------------+----+-----+
|   k|count(value)| key|value|
+----+------------+----+-----+
|null|           1|null|    1|
|   b|           1|   b|    0|
|   a|           1|   a|    0|
+----+------------+----+-----+
{code}

> Unexpected Cartesian product when using eqNullSafe in join with a derived 
> table
> ---
>
> Key: SPARK-20073
> URL: https://issues.apache.org/jira/browse/SPARK-20073
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Everett Anderson
>  Labels: correctness
>
> It appears that if you try to join tables A and B when B is derived from A 
> and you use the eqNullSafe / <=> operator for the join condition, Spark 
> performs a Cartesian product.
> However, if you perform the join on tables of the same data when they don't 
> have a relationship, the expected non-Cartesian product join occurs.
> {noformat}
> // Create some fake data.
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.Dataset
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.functions
> val peopleRowsRDD = sc.parallelize(Seq(
> Row("Fred", 8, 1),
> Row("Fred", 8, 2),
> Row(null, 10, 3),
> Row(null, 10, 4),
> Row("Amy", 12, 5),
> Row("Amy", 12, 6)))
> 
> val peopleSchema = StructType(Seq(
> StructField("name", StringType, nullable = true),
> StructField("group", IntegerType, nullable = true),
> StructField("data", IntegerType, nullable = true)))
> 
> val people = spark.createDataFrame(peopleRowsRDD, peopleSchema)
> people.createOrReplaceTempView("people")
> scala> people.show
> +----+-----+----+
> |name|group|data|
> +----+-----+----+
> |Fred|    8|   1|
> |Fred|    8|   2|
> |null|   10|   3|
> |null|   10|   4|
> | Amy|   12|   5|
> | Amy|   12|   6|
> +----+-----+----+
> // Now create a derived table from that table. It doesn't matter much what.
> val variantCounts = spark.sql("select name, count(distinct(name, group, data)) as variant_count from people group by name having variant_count > 1")
> variantCounts.show
> +----+-------------+
> |name|variant_count|
> +----+-------------+
> |Fred|            2|
> |null|            2|
> | Amy|            2|
> +----+-------------+
> // Now try an inner join using the regular equalTo that drops nulls. This works fine.
> val innerJoinEqualTo = variantCounts.join(people, variantCounts("name").equalTo(people("name")))
> innerJoinEqualTo.show
> +----+-------------+----+-----+----+
> |name|variant_count|name|group|data|
> +----+-------------+----+-----+----+
> |Fred|            2|Fred|    8|   1|
> |Fred|            2|Fred|    8|   2|
> | Amy|            2| Amy|   12|   5|
> | Amy|            2| Amy|   12|   6|
> +----+-------------+----+-----+----+
> // Okay now lets switch to the <=> operator
> //
> // If you haven't set spark.sql.crossJoin.enabled=true, you'll get an error like
> // "Cartesian joins could be prohibitively expensive and are disabled by default.
> // To explicitly enable them, please set spark.sql.crossJoin.enabled = true;"
> //
> // if you have enabled them, you'll get the table below.
> //
> // 

[jira] [Comment Edited] (SPARK-20073) Unexpected Cartesian product when using eqNullSafe in join with a derived table

2017-03-23 Thread Takeshi Yamamuro (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-20073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15939822#comment-15939822 ]

Takeshi Yamamuro edited comment on SPARK-20073 at 3/24/17 5:15 AM:
---

I think this is a known issue, and you need to assign an alias for name in 
`variantCounts`, like this:
{code}

scala> sql("""SET spark.sql.crossJoin.enabled = true""")
scala> val origDf = Seq(("a", 0), ("b", 0), (null, 1)).toDF("key", "value")
scala> origDf.createOrReplaceTempView("origTable")
scala> // Add an alias for `key`
scala> val aggDf = sql("""SELECT key AS k, COUNT(value) FROM origTable GROUP BY key""")

scala> aggDf.join(origDf, aggDf("k") <=> origDf("key")).show
+----+------------+----+-----+
|   k|count(value)| key|value|
+----+------------+----+-----+
|null|           1|null|    1|
|   b|           1|   b|    0|
|   a|           1|   a|    0|
+----+------------+----+-----+
{code}


was (Author: maropu):
I think this is the known issue and you need to assign an alias for name in 
`variantCounts`;
This is like;
{code}

scala> sql("""SET spark.sql.crossJoin.enabled = true""")

scala> val origDf = Seq(("a", 0), ("b", 0), (null, 1)).toDF("key", "value")
scala> origDf.createOrReplaceTempView("origTable")

scala> // Add an alias for `key`
scala> val aggDf = sql("""SELECT key AS k, COUNT(value) FROM origTable GROUP BY key""")

scala> aggDf.join(origDf, aggDf("k") <=> origDf("key")).show
+----+------------+----+-----+
|   k|count(value)| key|value|
+----+------------+----+-----+
|null|           1|null|    1|
|   b|           1|   b|    0|
|   a|           1|   a|    0|
+----+------------+----+-----+
{code}

> Unexpected Cartesian product when using eqNullSafe in join with a derived 
> table
> ---
>
> Key: SPARK-20073
> URL: https://issues.apache.org/jira/browse/SPARK-20073
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Everett Anderson
>  Labels: correctness
>
> It appears that if you try to join tables A and B when B is derived from A 
> and you use the eqNullSafe / <=> operator for the join condition, Spark 
> performs a Cartesian product.
> However, if you perform the join on tables of the same data when they don't 
> have a relationship, the expected non-Cartesian product join occurs.
> {noformat}
> // Create some fake data.
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.Dataset
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.functions
> val peopleRowsRDD = sc.parallelize(Seq(
> Row("Fred", 8, 1),
> Row("Fred", 8, 2),
> Row(null, 10, 3),
> Row(null, 10, 4),
> Row("Amy", 12, 5),
> Row("Amy", 12, 6)))
> 
> val peopleSchema = StructType(Seq(
> StructField("name", StringType, nullable = true),
> StructField("group", IntegerType, nullable = true),
> StructField("data", IntegerType, nullable = true)))
> 
> val people = spark.createDataFrame(peopleRowsRDD, peopleSchema)
> people.createOrReplaceTempView("people")
> scala> people.show
> +----+-----+----+
> |name|group|data|
> +----+-----+----+
> |Fred|    8|   1|
> |Fred|    8|   2|
> |null|   10|   3|
> |null|   10|   4|
> | Amy|   12|   5|
> | Amy|   12|   6|
> +----+-----+----+
> // Now create a derived table from that table. It doesn't matter much what.
> val variantCounts = spark.sql("select name, count(distinct(name, group, data)) as variant_count from people group by name having variant_count > 1")
> variantCounts.show
> +----+-------------+
> |name|variant_count|
> +----+-------------+
> |Fred|            2|
> |null|            2|
> | Amy|            2|
> +----+-------------+
> // Now try an inner join using the regular equalTo that drops nulls. This works fine.
> val innerJoinEqualTo = variantCounts.join(people, variantCounts("name").equalTo(people("name")))
> innerJoinEqualTo.show
> +----+-------------+----+-----+----+
> |name|variant_count|name|group|data|
> +----+-------------+----+-----+----+
> |Fred|            2|Fred|    8|   1|
> |Fred|            2|Fred|    8|   2|
> | Amy|            2| Amy|   12|   5|
> | Amy|            2| Amy|   12|   6|
> +----+-------------+----+-----+----+
> // Okay now lets switch to the <=> operator
> //
> // If you haven't set spark.sql.crossJoin.enabled=true, you'll get an error like
> // "Cartesian joins could be prohibitively expensive and are disabled by default.
> // To explicitly enable them, please set spark.sql.crossJoin.enabled = true;"
> //
> // if you have enabled them, you'll get the table below.