[jira] [Comment Edited] (SPARK-20073) Unexpected Cartesian product when using eqNullSafe in join with a derived table
[ https://issues.apache.org/jira/browse/SPARK-20073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16028561#comment-16028561 ]

Everett Anderson edited comment on SPARK-20073 at 5/29/17 6:45 PM:
---

[~maropu] Thanks for working on this. I think having a warning message is great, but I'd like to go further and suggest that in many of these cases we shouldn't be doing a Cartesian product at all.

For example, suppose table A goes through two separate transformations, fn1(A) and fn2(A), to create tables B and C, respectively. If B and C are then joined on col1 and col2, that should not produce a Cartesian product join. This is especially true if fn1() or fn2() changed the number of rows in A.

I imagine the reason this is happening is that Spark isn't updating the Column equality check in these cases to recognize that B[col1] and C[col1] are no longer the same as A[col1]. But they should be considered different. Is that right? Would it be hard to instead track that these Columns are no longer the same?
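The failure mode described above can be sketched with a toy model (plain Python, not Spark's actual analyzer): if both sides of the join condition resolve to the same underlying column, a null-safe self-equality check is trivially true, so the "join" keeps every pair of rows, i.e., a Cartesian product. All names here are illustrative.

```python
def eq_null_safe(a, b):
    """SQL's <=> operator: NULL <=> NULL is true, NULL <=> x is false."""
    if a is None and b is None:
        return True
    if a is None or b is None:
        return False
    return a == b

def join(left, right, pred):
    """Naive nested-loop join over lists of dicts."""
    return [(l, r) for l in left for r in right if pred(l, r)]

b = [{"name": "Fred"}, {"name": None}]
c = [{"name": "Fred"}, {"name": None}, {"name": "Amy"}]

# Intended condition: compare B's name column with C's name column.
correct = join(b, c, lambda l, r: eq_null_safe(l["name"], r["name"]))

# If both column references collapse to the same lineage, the condition
# effectively compares a column with itself and is always true.
degenerate = join(b, c, lambda l, r: eq_null_safe(l["name"], l["name"]))

print(len(correct))     # 2 matching pairs
print(len(degenerate))  # 6 = 2 * 3, a Cartesian product
```

This is only a model of the symptom; the real question in the comment is whether Spark's analyzer can keep the two references distinct.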
> Unexpected Cartesian product when using eqNullSafe in join with a derived table
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-20073
>                 URL: https://issues.apache.org/jira/browse/SPARK-20073
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 2.0.2, 2.1.0
>            Reporter: Everett Anderson
>              Labels: correctness
>
> It appears that if you try to join tables A and B when B is derived from A
> and you use the eqNullSafe / <=> operator for the join condition, Spark
> performs a Cartesian product.
>
> However, if you perform the join on tables of the same data when they don't
> have a relationship, the expected non-Cartesian product join occurs.
> {noformat}
> // Create some fake data.
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.Dataset
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.functions
>
> val peopleRowsRDD = sc.parallelize(Seq(
>     Row("Fred", 8, 1),
>     Row("Fred", 8, 2),
>     Row(null, 10, 3),
>     Row(null, 10, 4),
>     Row("Amy", 12, 5),
>     Row("Amy", 12, 6)))
>
> val peopleSchema = StructType(Seq(
>     StructField("name", StringType, nullable = true),
>     StructField("group", IntegerType, nullable = true),
>     StructField("data", IntegerType, nullable = true)))
>
> val people = spark.createDataFrame(peopleRowsRDD, peopleSchema)
> people.createOrReplaceTempView("people")
>
> scala> people.show
> +----+-----+----+
> |name|group|data|
> +----+-----+----+
> |Fred|    8|   1|
> |Fred|    8|   2|
> |null|   10|   3|
> |null|   10|   4|
> | Amy|   12|   5|
> | Amy|   12|   6|
> +----+-----+----+
>
> // Now create a derived table from that table. It doesn't matter much what.
> val variantCounts = spark.sql("select name, count(distinct(name, group, data)) as variant_count from people group by name having variant_count > 1")
>
> variantCounts.show
> +----+-------------+
> |name|variant_count|
> +----+-------------+
> |Fred|            2|
> |null|            2|
> | Amy|            2|
> +----+-------------+
>
> // Now try an inner join using the regular equalTo that drops nulls. This works fine.
> val innerJoinEqualTo = variantCounts.join(people, variantCounts("name").equalTo(people("name")))
> innerJoinEqualTo.show
> +----+-------------+----+-----+----+
> |name|variant_count|name|group|data|
> +----+-------------+----+-----+----+
> |Fred|            2|Fred|    8|   1|
> |Fred|            2|Fred|    8|   2|
> | Amy|            2| Amy|   12|   5|
> | Amy|            2| Amy|   12|   6|
> +----+-------------+----+-----+----+
>
> // Okay, now let's switch to the <=> operator.
> //
> // If you haven't set spark.sql.crossJoin.enabled=true, you'll get an error like
> // "Cartesian joins could be prohibitively expensive and are disabled by default.
> // To explicitly enable them, please set spark.sql.crossJoin.enabled = true;"
> //
> // If you have enabled them, you'll get the table below.
> {noformat}
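The difference between the two join conditions in the bug report can be sketched in plain Python (a toy model of SQL semantics, not Spark itself), using the same people / variantCounts data: `equalTo` follows SQL three-valued logic and drops NULL keys, while `<=>` (eqNullSafe) treats NULL as equal to NULL.

```python
people = [
    ("Fred", 8, 1), ("Fred", 8, 2),
    (None, 10, 3), (None, 10, 4),
    ("Amy", 12, 5), ("Amy", 12, 6),
]
variant_counts = [("Fred", 2), (None, 2), ("Amy", 2)]

def equal_to(a, b):
    # SQL '=': any comparison involving NULL is not true
    return a is not None and b is not None and a == b

def eq_null_safe(a, b):
    # SQL '<=>': NULL <=> NULL is true
    if a is None or b is None:
        return a is None and b is None
    return a == b

inner_equal_to = [(vc, p) for vc in variant_counts for p in people
                  if equal_to(vc[0], p[0])]
inner_null_safe = [(vc, p) for vc in variant_counts for p in people
                   if eq_null_safe(vc[0], p[0])]

print(len(inner_equal_to))   # 4 rows: the null group is dropped
print(len(inner_null_safe))  # 6 rows: the null rows also match
```

The expected (non-Cartesian) `<=>` join would return 6 rows, not the 18-row (3 × 6) Cartesian product the bug report describes.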
[jira] [Comment Edited] (SPARK-20073) Unexpected Cartesian product when using eqNullSafe in join with a derived table
[ https://issues.apache.org/jira/browse/SPARK-20073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15939822#comment-15939822 ]

Takeshi Yamamuro edited comment on SPARK-20073 at 3/24/17 5:16 AM:
---

I think this is a known issue, and you need to assign an alias in this case, like this:
{code}
scala> sql("""SET spark.sql.crossJoin.enabled = true""")
scala> val origDf = Seq(("a", 0), ("b", 0), (null, 1)).toDF("key", "value")
scala> origDf.createOrReplaceTempView("origTable")
scala> // Add an alias for `key`
scala> val aggDf = sql("""SELECT key AS k, COUNT(value) FROM origTable GROUP BY key""")
scala> aggDf.join(origDf, aggDf("k") <=> origDf("key")).show
+----+------------+----+-----+
|   k|count(value)| key|value|
+----+------------+----+-----+
|null|           1|null|    1|
|   b|           1|   b|    0|
|   a|           1|   a|    0|
+----+------------+----+-----+
{code}
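Why does the alias help? A toy model (plain Python, not Spark's analyzer; all names are illustrative): think of each column reference as a name plus a lineage id. A join condition comparing a reference with itself is trivially true, which is what degenerates into a Cartesian product; aliasing mints a fresh id, so the two sides of the condition stay distinct.

```python
import itertools

_ids = itertools.count()

def column(name):
    """A column reference: (name, lineage_id)."""
    return (name, next(_ids))

def alias(col, new_name):
    """Aliasing mints a fresh lineage id for the reference."""
    return (new_name, next(_ids))

orig_key = column("key")
derived_key = orig_key              # derived table reuses the same reference
aliased_key = alias(orig_key, "k")  # aliased table gets a distinct one

def trivially_true(left, right):
    # Same lineage id => the condition compares a column with itself.
    return left[1] == right[1]

print(trivially_true(derived_key, orig_key))  # True  -> Cartesian product
print(trivially_true(aliased_key, orig_key))  # False -> a real join key
```

This is only a mental model of the workaround; the open question in the thread is whether Spark could track this lineage distinction itself instead of requiring the user to alias.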