[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results
[ https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15531265#comment-15531265 ]

Russell Spitzer commented on SPARK-17673:
-----------------------------------------

Looks good on my end:

{code}
scala> min2.union(min1).show()
+---+---+
| id|min|
+---+---+
|  A|  7|
|  A|  1|
+---+---+

scala> min1.union(min2).show()
+---+---+
| id|min|
+---+---+
|  A|  1|
|  A|  7|
+---+---+
{code}

> Reused Exchange Aggregations Produce Incorrect Results
> ------------------------------------------------------
>
>                 Key: SPARK-17673
>                 URL: https://issues.apache.org/jira/browse/SPARK-17673
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0, 2.0.1
>            Reporter: Russell Spitzer
>            Assignee: Eric Liang
>            Priority: Blocker
>              Labels: correctness
>             Fix For: 2.0.1, 2.1.0
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> It was brought to my attention that the following code produces incorrect results:
> {code}
> val data = List(TestData("A", 1, 7))
> val frame =
>   session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
> min1.union(min2).show()
> /* prints:
> +---+---+
> | id|min|
> +---+---+
> |  A|  1|
> |  A|  1|
> +---+---+
> Should be
> |  A|  1|
> |  A|  7|
> */
> {code}
> I looked into the explain output and saw:
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> :     +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :        +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>    +- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> This was different from what I saw using a parallelized collection as the DataFrame backing, so I tested the same code with a Parquet-backed DataFrame and saw the same incorrect results:
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
> +---+---+
> | id|min|
> +---+---+
> |  A|  1|
> |  A|  1|
> +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
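The failure mode described in the issue can be modeled without Spark: the exchange-reuse optimization keeps a cache keyed on plan equality, so if two scans that prune different columns compare as equal, the second aggregation silently picks up the shuffle output built for the first. A minimal sketch in plain Scala (all class and method names below are hypothetical illustrations, not Spark internals):

```scala
import scala.collection.mutable

// Hypothetical model of exchange reuse -- NOT Spark's actual classes.
// BuggyScan's equality ignores its output columns, mimicking a scan whose
// canonicalized form drops the pruned-column list (the pre-fix behavior).
case class BuggyScan(relation: String, output: Seq[String]) {
  override def equals(other: Any): Boolean = other match {
    case s: BuggyScan => s.relation == relation
    case _            => false
  }
  override def hashCode: Int = relation.hashCode
}

object ReuseDemo {
  private val cache = mutable.Map[BuggyScan, String]()

  // Return the cached "shuffle" if an equal scan was already planned,
  // the way an exchange-reuse rule would consult canonicalized plans.
  def exchangeFor(scan: BuggyScan): String =
    cache.getOrElseUpdate(scan, s"shuffle of ${scan.output.mkString(",")}")
}

// The col2 branch silently reuses the shuffle built for col1:
// ReuseDemo.exchangeFor(BuggyScan("table", Seq("id", "col1")))  // "shuffle of id,col1"
// ReuseDemo.exchangeFor(BuggyScan("table", Seq("id", "col2")))  // "shuffle of id,col1" -- wrong data
```

This mirrors the physical plan above, where the `min(col2)` aggregate sits on a `ReusedExchange` that was produced by the `partial_min(col1)` branch.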
[ https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15530797#comment-15530797 ]

Apache Spark commented on SPARK-17673:
--------------------------------------

User 'ericl' has created a pull request for this issue:
https://github.com/apache/spark/pull/15282
[ https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15530769#comment-15530769 ]

Reynold Xin commented on SPARK-17673:
-------------------------------------

I merged this in master. There was a conflict with branch-2.0. [~ekhliang], can you submit a PR for 2.0?
[ https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15528145#comment-15528145 ]

Eric Liang commented on SPARK-17673:
------------------------------------

Russell, could you try applying this patch (WIP) to see if it resolves the issue? https://github.com/apache/spark/pull/15273/files

It fixes the equality comparison for row data source scans to take into account the output schema of the scan.
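The fix Eric describes can be sketched as a toy model in plain Scala (hypothetical names, not the actual patch): once the value used as the reuse-cache key compares the output schema as well as the underlying relation, differently-pruned scans no longer collide, and each aggregation gets its own exchange.

```scala
import scala.collection.mutable

// Hypothetical sketch of the fix direction -- NOT Spark's actual code.
// FixedScan keeps the compiler-generated case-class equality, which
// compares BOTH fields: analogous to making the scan's canonicalized
// form retain its output schema.
case class FixedScan(relation: String, output: Seq[String])

object FixedReuseDemo {
  private val cache = mutable.Map[FixedScan, String]()

  // Scans over the same relation but with different pruned columns now
  // hash and compare differently, so each gets a fresh shuffle.
  def exchangeFor(scan: FixedScan): String =
    cache.getOrElseUpdate(scan, s"shuffle of ${scan.output.mkString(",")}")
}

// Each pruning now produces its own exchange:
// FixedReuseDemo.exchangeFor(FixedScan("table", Seq("id", "col1")))  // "shuffle of id,col1"
// FixedReuseDemo.exchangeFor(FixedScan("table", Seq("id", "col2")))  // "shuffle of id,col2"
```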
[ https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15528142#comment-15528142 ]

Apache Spark commented on SPARK-17673:
--------------------------------------

User 'ericl' has created a pull request for this issue:
https://github.com/apache/spark/pull/15273
[ https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15527327#comment-15527327 ]

Reynold Xin commented on SPARK-17673:
-------------------------------------

I'm upgrading this to a blocker-level issue.
[ https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15527339#comment-15527339 ]

Eric Liang commented on SPARK-17673:
------------------------------------

I'm looking at this now.
[ https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525101#comment-15525101 ]

Russell Spitzer commented on SPARK-17673:
-----------------------------------------

{code}
== Parsed Logical Plan ==
Union
:- Aggregate [id#101], [id#101, min(col1#102) AS min#128]
:  +- Project [id#101, col1#102, col2#103]
:     +- Relation[id#101,col1#102,col2#103] org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118
+- Aggregate [id#101], [id#101, min(col2#103) AS min#137]
   +- Project [id#101, col1#102, col2#103]
      +- Relation[id#101,col1#102,col2#103] org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118

== Analyzed Logical Plan ==
id: string, min: int
Union
:- Aggregate [id#101], [id#101, min(col1#102) AS min#128]
:  +- Project [id#101, col1#102, col2#103]
:     +- Relation[id#101,col1#102,col2#103] org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118
+- Aggregate [id#101], [id#101, min(col2#103) AS min#137]
   +- Project [id#101, col1#102, col2#103]
      +- Relation[id#101,col1#102,col2#103] org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118

== Optimized Logical Plan ==
Union
:- Aggregate [id#101], [id#101, min(col1#102) AS min#128]
:  +- Project [id#101, col1#102]
:     +- Relation[id#101,col1#102,col2#103] org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118
+- Aggregate [id#101], [id#101, min(col2#103) AS min#137]
   +- Project [id#101, col2#103]
      +- Relation[id#101,col1#102,col2#103] org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118

== Physical Plan ==
Union
:- *HashAggregate(keys=[id#101], functions=[min(col1#102)], output=[id#101, min#128])
:  +- Exchange hashpartitioning(id#101, 200)
:     +- *HashAggregate(keys=[id#101], functions=[partial_min(col1#102)], output=[id#101, min#182])
:        +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118 [id#101,col1#102]
+- *HashAggregate(keys=[id#101], functions=[min(col2#103)], output=[id#101, min#137])
   +- ReusedExchange [id#101, min#190], Exchange hashpartitioning(id#101, 200)
{code}
[ https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525096#comment-15525096 ]

Russell Spitzer commented on SPARK-17673:
-----------------------------------------

Ah, yeah, there would definitely be different pruning in both "source"s. Getting the optimized plan now.
[ https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525036#comment-15525036 ]

Herman van Hovell commented on SPARK-17673:
-------------------------------------------

Could you also share the optimized plan ({{df.explain(true)}})? I am wondering if the attributes are the same.
[ https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525011#comment-15525011 ]

Reynold Xin commented on SPARK-17673:
-------------------------------------

The only thing differentiating the two sides of the plan is column pruning, right? It is possible the issue I mentioned earlier is the culprit.

> Reused Exchange Aggregations Produce Incorrect Results
> ------------------------------------------------------
>
>                 Key: SPARK-17673
>                 URL: https://issues.apache.org/jira/browse/SPARK-17673
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0, 2.0.1
>            Reporter: Russell Spitzer
>            Priority: Critical
>              Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> Was brought to my attention where the following code produces incorrect results:
> {code}
> val data = List(TestData("A", 1, 7))
> val frame = session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
> min1.union(min2).show()
> /* prints:
> +---+---+
> | id|min|
> +---+---+
> |  A|  1|
> |  A|  1|
> +---+---+
> Should be
> |  A|  1|
> |  A|  7|
> */
> {code}
> I looked into the explain output and saw:
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> :     +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :        +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>    +- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> This was different than using a parallelized collection as the DF backing, so I tested the same code with a Parquet-backed DF and saw the same results:
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
> +---+---+
> | id|min|
> +---+---+
> |  A|  1|
> |  A|  1|
> +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
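As background on the ReusedExchange node appearing in the plan above: Spark's exchange-reuse optimization remembers each Exchange it has planned and replaces any later Exchange that reports the "same result" with a reference to the earlier one, so the shuffle runs only once. The sketch below is a hypothetical, simplified model of that bookkeeping (the names `Exchange`, `resultKey`, and `reuseExchanges` are illustrative only; the real rule is keyed by output schema and calls `SparkPlan.sameResult`):

```scala
// Simplified sketch of an exchange-reuse pass (assumed structure, not the
// actual Spark rule). Each exchange is identified here by an opaque key
// standing in for sameResult() equivalence.
case class Exchange(resultKey: String, description: String)

def reuseExchanges(plan: Seq[Exchange]): Seq[String] = {
  val seen = scala.collection.mutable.Map.empty[String, Exchange]
  plan.map { e =>
    seen.get(e.resultKey) match {
      case Some(first) => s"ReusedExchange -> ${first.description}"
      case None =>
        seen(e.resultKey) = e
        e.description
    }
  }
}

// If the keys wrongly collide (the essence of this bug), the second branch
// is silently replaced by the first branch's shuffle output.
val branches = Seq(
  Exchange("scan(ks.table)", "Exchange over partial_min(col1)"),
  Exchange("scan(ks.table)", "Exchange over partial_min(col2)"))
// reuseExchanges(branches) would reuse the col1 exchange for the col2 branch.
```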
[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results
[ https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525007#comment-15525007 ]

Russell Spitzer commented on SPARK-17673:
-----------------------------------------

Looking at this plan
{code}
Union
:- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
:  +- Exchange hashpartitioning(id#93, 200)
:     +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
:        +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 [id#93,col1#94]
+- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
   +- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
{code}
I see it reuses the exchange built from the partial min on col1, even though the hash aggregate that consumes it computes min on col2. Am I reading that right? The column names don't even match, so I'm confused how that gets through.
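One possible answer to the "column names don't even match" question: `sameResult` compares *canonicalized* plans, in which per-query expression IDs such as `col1#94` are rewritten to position-based placeholders, so differing names and IDs alone do not prevent a match. The following is a hypothetical illustration of that idea (the `Attr` and `canonicalize` names are invented for this sketch and are not Spark's actual classes):

```scala
// Hedged illustration: canonicalization strips names and expression IDs,
// replacing them with positional ordinals. Two outputs that differ only in
// those details become indistinguishable after canonicalization.
case class Attr(name: String, exprId: Int)

def canonicalize(output: Seq[Attr]): Seq[Attr] =
  output.zipWithIndex.map { case (_, i) => Attr("none", i) }

val left  = Seq(Attr("id", 93), Attr("col1", 94))
val right = Seq(Attr("id", 93), Attr("col2", 95))

// After canonicalization both outputs look identical, so a comparison that
// stops at the canonical output cannot tell the two scans apart.
assert(canonicalize(left) == canonicalize(right))
```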
[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results
[ https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525000#comment-15525000 ]

Reynold Xin commented on SPARK-17673:
-------------------------------------

RowDataSourceScanExec.sameResult is probably the problem:
{code}
// Ignore rdd when checking results
override def sameResult(plan: SparkPlan): Boolean = plan match {
  case other: RowDataSourceScanExec =>
    relation == other.relation && metadata == other.metadata
  case _ => false
}
{code}
First glance -- it looks like it ignores predicate pushdown entirely.
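Reynold's point can be made concrete with a toy model (hypothetical code, not the Spark source): when `sameResult` compares only the relation (and metadata) and ignores the pushed-down projection, two scans that read *different* columns from the same relation compare equal, so the planner will substitute one exchange subtree for the other. The names `Relation`, `ScanExec`, and `requiredColumns` here are assumed stand-ins for the real classes:

```scala
// Toy model of the flaw: equality ignores the projected columns,
// so differently-pruned scans over the same relation collide.
case class Relation(table: String)

case class ScanExec(relation: Relation, requiredColumns: Seq[String]) {
  // Mirrors the shape of the quoted override: relation only, no columns.
  def sameResult(other: ScanExec): Boolean = relation == other.relation
}

val scanCol1 = ScanExec(Relation("ks.table"), Seq("id", "col1"))
val scanCol2 = ScanExec(Relation("ks.table"), Seq("id", "col2"))

// Both report the "same result" even though they read different columns,
// so an exchange built over scanCol1 can wrongly replace one over scanCol2.
assert(scanCol1.sameResult(scanCol2))
assert(scanCol1.requiredColumns != scanCol2.requiredColumns)
```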
[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results
[ https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524999#comment-15524999 ]

Russell Spitzer commented on SPARK-17673:
-----------------------------------------

We shouldn't be... The only things we cache are the underlying database connections and queries, which shouldn't factor into this, I would think :/
[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results
[ https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524993#comment-15524993 ]

Herman van Hovell commented on SPARK-17673:
-------------------------------------------

[~rspitzer] We only reuse an exchange when two sub-plans produce the same result. You can find the implementation for row datasource scans (which is what you are probably using) here: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L124-L128

I have taken a look at the Cassandra datasource code, but that does not seem to implement equals at all and should not cause a problem. Do you cache instances of CassandraRelation at some point?
[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results
[ https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524848#comment-15524848 ]

Russell Spitzer commented on SPARK-17673:
-----------------------------------------

I couldn't get this to happen without C*; hopefully I can get some guidance tomorrow :/ It could be a hashCode/equals thing, but we don't override those in the base class. Also, I'm a little confused because this should be the same "grouping" operation on the RDD, just with a different aggregate. I don't know enough about the ReusedExchange to know when it's applied and why.
[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results
[ https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524734#comment-15524734 ]

Russell Spitzer commented on SPARK-17673:
-----------------------------------------

Well, in this case they are equal, correct?
[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results
[ https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524725#comment-15524725 ]

Reynold Xin commented on SPARK-17673:
-------------------------------------

It's possible if hashCode and equals are not defined properly in the Cassandra data source.
[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results
[ https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524722#comment-15524722 ]

Russell Spitzer commented on SPARK-17673:
-----------------------------------------

Ugh, I made a typo in my Parquet example; I don't see it reproducing there now. Let me investigate a little more as to why this would affect the C* source...
[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results
[ https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524687#comment-15524687 ]

Reynold Xin commented on SPARK-17673:
-------------------------------------

Can you help create a repro (without the need to connect to Cassandra)?
[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results
[ https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524641#comment-15524641 ]

Russell Spitzer commented on SPARK-17673:
-----------------------------------------

I only ran this on 2.0.0 and 2.0.1.
[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results
[ https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524581#comment-15524581 ]

Herman van Hovell commented on SPARK-17673:
-------------------------------------------

[~russell spitzer] Are you using Spark 2.0 or the latest master?