[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-28 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15531265#comment-15531265
 ] 

Russell Spitzer commented on SPARK-17673:
-

Looks good on my end:

{code}
scala> min2.union(min1).show()
+---+---+
| id|min|
+---+---+
|  A|  7|
|  A|  1|
+---+---+


scala> min1.union(min2).show()
+---+---+
| id|min|
+---+---+
|  A|  1|
|  A|  7|
+---+---+
{code}

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Assignee: Eric Liang
>Priority: Blocker
>  Labels: correctness
> Fix For: 2.0.1, 2.1.0
>
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> It was brought to my attention that the following code produces incorrect
> results:
> {code}
> val data = List(TestData("A", 1, 7))
> val frame = session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
> min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be:
>   |  A|  1|
>   |  A|  7|
>  */
> {code}
> I looked at the explain output and saw:
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> :     +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :        +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>    +- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> This differed from the plan produced when a parallelized collection backed the
> DataFrame, so I tested the same code with a Parquet-backed DataFrame and saw the same incorrect results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", "col1", "col2")
> val parquetmin1 = parquet.groupBy("id").agg(min("col1").as("min"))
> val parquetmin2 = parquet.groupBy("id").agg(min("col2").as("min"))
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> This leads me to believe that something is wrong with the reused exchange.
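The failure mode described in the report can be modeled without Spark: if an exchange-reuse cache is keyed on an equality test that ignores which column a scan was pruned to, the second aggregation is handed the first aggregation's shuffle output. This is a rough, hypothetical Python sketch of that mechanism, not Spark's actual implementation:

```python
# Illustrative model of exchange reuse gone wrong; names are hypothetical.
# Each "exchange" computes min over one column of the same table.
rows = [("A", 1, 7)]  # (id, col1, col2), mirroring TestData("A", 1, 7)

def run_exchange(column_index):
    # Shuffle + partial aggregation, reduced here to a single min().
    return {"A": min(r[column_index] for r in rows)}

cache = {}

def execute(relation, column_index):
    # Buggy reuse: the cache key identifies only the relation, not the
    # pruned column, so the second aggregation reuses the first's output.
    key = relation  # should be (relation, column_index)
    if key not in cache:
        cache[key] = run_exchange(column_index)
    return cache[key]

min1 = execute("cassandra_table", 1)  # min(col1)
min2 = execute("cassandra_table", 2)  # min(col2) -- wrongly reuses min1's exchange
print(min1, min2)  # both report {'A': 1}; min2 should be {'A': 7}
```

With a key of `(relation, column_index)` the second call would recompute and return `{'A': 7}`, matching the expected output above.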



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-28 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15530797#comment-15530797
 ] 

Apache Spark commented on SPARK-17673:
--

User 'ericl' has created a pull request for this issue:
https://github.com/apache/spark/pull/15282




[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-28 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15530769#comment-15530769
 ] 

Reynold Xin commented on SPARK-17673:
-

I merged this into master. There was a conflict with branch-2.0.

[~ekhliang], can you submit a PR for 2.0?





[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-27 Thread Eric Liang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15528145#comment-15528145
 ] 

Eric Liang commented on SPARK-17673:


Russell, could you try applying this patch (WIP) to see if it resolves the issue? https://github.com/apache/spark/pull/15273/files

It fixes the equality comparison for row datasource scans to take the output schema of the scan into account.
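The essence of that fix can be sketched abstractly: include the pruned output schema in the scan's equality comparison, so two scans of the same relation that prune different columns no longer compare equal and cannot share an exchange. A minimal Python model (not Spark's actual code; the class and function names are hypothetical):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Scan:
    # Hypothetical stand-in for a row datasource scan node.
    relation: str        # identity of the underlying relation
    output: tuple = ()   # pruned output columns

# Buggy equality: only the relation is compared, so scans that prune
# different columns are (wrongly) considered interchangeable.
def same_result_buggy(a: Scan, b: Scan) -> bool:
    return a.relation == b.relation

# Fixed equality: the output schema participates in the comparison.
def same_result_fixed(a: Scan, b: Scan) -> bool:
    return a.relation == b.relation and a.output == b.output

scan1 = Scan("cassandra@7ec20844", ("id", "col1"))
scan2 = Scan("cassandra@7ec20844", ("id", "col2"))

print(same_result_buggy(scan1, scan2))  # True  -> exchange wrongly reused
print(same_result_fixed(scan1, scan2))  # False -> each side keeps its own exchange
```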




[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-27 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15528142#comment-15528142
 ] 

Apache Spark commented on SPARK-17673:
--

User 'ericl' has created a pull request for this issue:
https://github.com/apache/spark/pull/15273




[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-27 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15527327#comment-15527327
 ] 

Reynold Xin commented on SPARK-17673:
-

I'm upgrading this to a blocker-level issue.





[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-27 Thread Eric Liang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15527339#comment-15527339
 ] 

Eric Liang commented on SPARK-17673:


I'm looking at this now.




[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525101#comment-15525101
 ] 

Russell Spitzer commented on SPARK-17673:
-

{code}
== Parsed Logical Plan ==
Union
:- Aggregate [id#101], [id#101, min(col1#102) AS min#128]
:  +- Project [id#101, col1#102, col2#103]
:     +- Relation[id#101,col1#102,col2#103] org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118
+- Aggregate [id#101], [id#101, min(col2#103) AS min#137]
   +- Project [id#101, col1#102, col2#103]
      +- Relation[id#101,col1#102,col2#103] org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118

== Analyzed Logical Plan ==
id: string, min: int
Union
:- Aggregate [id#101], [id#101, min(col1#102) AS min#128]
:  +- Project [id#101, col1#102, col2#103]
:     +- Relation[id#101,col1#102,col2#103] org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118
+- Aggregate [id#101], [id#101, min(col2#103) AS min#137]
   +- Project [id#101, col1#102, col2#103]
      +- Relation[id#101,col1#102,col2#103] org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118

== Optimized Logical Plan ==
Union
:- Aggregate [id#101], [id#101, min(col1#102) AS min#128]
:  +- Project [id#101, col1#102]
:     +- Relation[id#101,col1#102,col2#103] org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118
+- Aggregate [id#101], [id#101, min(col2#103) AS min#137]
   +- Project [id#101, col2#103]
      +- Relation[id#101,col1#102,col2#103] org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118

== Physical Plan ==
Union
:- *HashAggregate(keys=[id#101], functions=[min(col1#102)], output=[id#101, min#128])
:  +- Exchange hashpartitioning(id#101, 200)
:     +- *HashAggregate(keys=[id#101], functions=[partial_min(col1#102)], output=[id#101, min#182])
:        +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118 [id#101,col1#102]
+- *HashAggregate(keys=[id#101], functions=[min(col2#103)], output=[id#101, min#137])
   +- ReusedExchange [id#101, min#190], Exchange hashpartitioning(id#101, 200)
{code}


[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525096#comment-15525096
 ] 

Russell Spitzer commented on SPARK-17673:
-

Ah yeah, there would definitely be different pruning in both "source"s. Getting the optimized plan now.




[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Herman van Hovell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525036#comment-15525036
 ] 

Herman van Hovell commented on SPARK-17673:
---

Could you also share the optimized plan ({{df.explain(true)}})? I am wondering if the attributes are the same.




[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525011#comment-15525011
 ] 

Reynold Xin commented on SPARK-17673:
-

The only thing differentiating the two sides of the plan is column pruning, 
right? It is possible the issue I mentioned earlier is the culprit.





[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525007#comment-15525007
 ] 

Russell Spitzer commented on SPARK-17673:
-

Looking at this plan 

{code}
Union
:- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
:  +- Exchange hashpartitioning(id#93, 200)
:     +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
:        +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 [id#93,col1#94]
+- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
   +- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
{code}

I see it reuses the exchange built from the partial min on col1 even though the 
hash aggregate that consumes it computes the min of col2. Am I reading that right? The column 
names don't even match, so I'm confused about how that gets through.
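For context, exchange reuse can be modeled as a cache lookup keyed by a "same result" comparison. The sketch below is plain Scala with hypothetical names (it is not Spark source): it shows how a comparison that ignores the pruned columns lets the col1 exchange silently stand in for the col2 one, which matches the symptom above.

```scala
// Hypothetical model of exchange reuse (not actual Spark code): a rule walks
// the plan and substitutes any exchange that "sameResult"-matches one it has
// already seen. If the comparison ignores column pruning, two scans of the
// same relation that read different columns are treated as interchangeable.
final case class Scan(relationId: String, columns: Seq[String])

object ReuseSketch {
  // Buggy check, analogous to the report: only the relation is compared.
  def sameResultBuggy(a: Scan, b: Scan): Boolean = a.relationId == b.relationId
  // Fixed check: the selected columns must match too.
  def sameResultFixed(a: Scan, b: Scan): Boolean =
    a.relationId == b.relationId && a.columns == b.columns

  // Reuse the first previously seen scan that compares equal.
  def planWithReuse(scans: Seq[Scan], same: (Scan, Scan) => Boolean): Seq[Scan] = {
    val seen = scala.collection.mutable.ArrayBuffer.empty[Scan]
    scans.map { s =>
      seen.find(c => same(c, s)).getOrElse { seen += s; s }
    }
  }

  def main(args: Array[String]): Unit = {
    val col1Scan = Scan("CassandraSourceRelation@7ec20844", Seq("id", "col1"))
    val col2Scan = Scan("CassandraSourceRelation@7ec20844", Seq("id", "col2"))
    // With the buggy check, the second branch silently reads col1 again:
    println(planWithReuse(Seq(col1Scan, col2Scan), sameResultBuggy).map(_.columns))
    // With the fixed check, each branch keeps its own columns:
    println(planWithReuse(Seq(col1Scan, col2Scan), sameResultFixed).map(_.columns))
  }
}
```

Under the buggy comparison both branches end up reading `[id, col1]`, which would reproduce the duplicated `min` value reported above.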




[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525000#comment-15525000
 ] 

Reynold Xin commented on SPARK-17673:
-

RowDataSourceScanExec.sameResult is probably the problem:

{code}
  // Ignore rdd when checking results
  override def sameResult(plan: SparkPlan): Boolean = plan match {
    case other: RowDataSourceScanExec =>
      relation == other.relation && metadata == other.metadata
    case _ => false
  }
{code}

At first glance, it looks like it ignores predicate pushdown entirely.
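To make that failure mode concrete, here is a small sketch using simplified stand-ins for the Spark classes (assumed shapes, not the real implementation): when both branches of the union scan the same relation instance with identical metadata, a check that never looks at the output columns declares the two scans equivalent.

```scala
// Simplified stand-in for a scan node whose sameResult compares only the
// relation and metadata (mirroring the snippet quoted above); the pruned
// output columns are never consulted.
final class Relation  // compared by reference, like a shared relation object

final case class ScanExec(relation: Relation,
                          metadata: Map[String, String],
                          output: Seq[String]) {
  def sameResult(other: ScanExec): Boolean =
    (relation eq other.relation) && metadata == other.metadata // `output` ignored
}

object SameResultDemo {
  def main(args: Array[String]): Unit = {
    val shared = new Relation  // both union branches scan the same relation
    val scanCol1 = ScanExec(shared, Map.empty, Seq("id", "col1"))
    val scanCol2 = ScanExec(shared, Map.empty, Seq("id", "col2"))
    // true, despite the scans reading different columns:
    println(scanCol1.sameResult(scanCol2))
  }
}
```

This would also explain why the missing equals in the Cassandra relation class is not protective here: reference equality already holds because the repro reuses one `loaded` DataFrame for both aggregations.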





[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524999#comment-15524999
 ] 

Russell Spitzer commented on SPARK-17673:
-

We shouldn't be ... The only things we cache are underlying database connections 
and queries, which shouldn't factor into this, I would think :/




[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Herman van Hovell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524993#comment-15524993
 ] 

Herman van Hovell commented on SPARK-17673:
---

[~rspitzer] we only reuse an exchange when both exchanges produce the same result. You 
can find the implementation for row data source scans (which is what you are probably 
using) here: 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L124-L128

I have taken a look at the Cassandra datasource code, but it does not seem to 
implement equals at all, so it should not cause a problem. Do you cache instances 
of CassandraRelation at some point?




[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524848#comment-15524848
 ] 

Russell Spitzer commented on SPARK-17673:
-

I couldn't get this to happen without C*; hopefully tomorrow I can get some 
guidance :/ It could be a hashCode/equals thing, but we don't 
override those in the base class. Also, I'm a little confused because this 
should be the same "grouping" operation on the RDD, just with a different 
aggregate. I don't know enough about the ReusedExchange to know when it's 
applied and why.




[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524734#comment-15524734
 ] 

Russell Spitzer commented on SPARK-17673:
-

Well, in this case they are equal, correct?




[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524725#comment-15524725
 ] 

Reynold Xin commented on SPARK-17673:
-

It's possible if hashCode and equals are not defined properly in the Cassandra 
data source.
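A quick illustration of the two directions that point can cut (these are hypothetical relation classes, not the connector's actual ones): without an equals override, logically identical relations compare unequal by reference, while structural equality makes relations equal only when every field, including any pushed filters, matches.

```scala
// Hypothetical relation classes illustrating the equals/hashCode discussion
// (not the Cassandra connector's real classes).
class PlainRelation(val keyspace: String, val table: String)  // identity equality only

final case class StructuralRelation(keyspace: String, table: String,
                                    pushedFilters: Seq[String])  // field-by-field equality

object EqualityDemo {
  def main(args: Array[String]): Unit = {
    // No equals override: two instances of the "same" table are not equal,
    // so a relation-based reuse check would never fire across instances.
    println(new PlainRelation("ks", "t") == new PlainRelation("ks", "t"))

    // Structural equality: equal when every field matches...
    println(StructuralRelation("ks", "t", Nil) == StructuralRelation("ks", "t", Nil))
    // ...and correctly unequal once pushed filters differ.
    println(StructuralRelation("ks", "t", Nil) == StructuralRelation("ks", "t", Seq("x > 1")))
  }
}
```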





[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524722#comment-15524722
 ] 

Russell Spitzer commented on SPARK-17673:
-

Ugh, I made a typo in my Parquet example; I don't see it reproducing there now. Let 
me investigate a little more as to why this would affect the C* source...




[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524687#comment-15524687
 ] 

Reynold Xin commented on SPARK-17673:
-

Can you help create a repro (without the need to connect to Cassandra)?




[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524641#comment-15524641
 ] 

Russell Spitzer commented on SPARK-17673:
-

I only ran this on 2.0.0 and 2.0.1







[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Herman van Hovell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524581#comment-15524581
 ] 

Herman van Hovell commented on SPARK-17673:
---

[~russell spitzer] Are you using Spark 2.0 or the latest master?



