Russell Spitzer created SPARK-17673:
---------------------------------------
             Summary: Reused Exchange Aggregations Produce Incorrect Results
                 Key: SPARK-17673
                 URL: https://issues.apache.org/jira/browse/SPARK-17673
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.0.0, 2.0.1
            Reporter: Russell Spitzer


https://datastax-oss.atlassian.net/browse/SPARKC-429 was brought to my attention. In that ticket, the following code produces incorrect results:

{code}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.min
import com.datastax.spark.connector._ // adds createCassandraTable to DataFrames

// Assumed definition (not shown in the report); fields match the selected columns.
case class TestData(id: String, col1: Int, col2: Int)

// sparkSession, keySpaceName and table are assumed to be defined by the surrounding test.
val data = List(TestData("A", 1, 7))
val frame = sparkSession.sqlContext.createDataFrame(sparkSession.sparkContext.parallelize(data))

frame.createCassandraTable(
  keySpaceName,
  table,
  partitionKeyColumns = Some(Seq("id")))

frame
  .write
  .format("org.apache.spark.sql.cassandra")
  .mode(SaveMode.Append)
  .options(Map("table" -> table, "keyspace" -> keySpaceName))
  .save()

val loaded = sparkSession.sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> table, "keyspace" -> keySpaceName))
  .load()
  .select("id", "col1", "col2")

val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
min1.union(min2).show()
/* prints:
+---+---+
| id|min|
+---+---+
|  A|  1|
|  A|  1|
+---+---+
Should be
|  A|  1|
|  A|  7|
*/
{code}

I looked at the explain output and saw:

{code}
Union
:- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
:  +- Exchange hashpartitioning(id#93, 200)
:     +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
:        +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 [id#93,col1#94]
+- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
   +- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
{code}

The scan reads only [id#93,col1#94], and the min(col2#95) branch has no scan of its own: it reads from a ReusedExchange of the first branch's exchange, whose input is partial_min(col1#94). That matches the wrong second row, which holds min(col1) = 1 instead of min(col2) = 7.

This plan differs from the one produced when the DataFrame is backed by a parallelized collection, so I tested the same code with a Parquet-backed DataFrame and saw the same incorrect results.
{code}
frame.write.parquet("garbagetest")
val parquet = sparkSession.read.parquet("garbagetest").select("id", "col1", "col2")
// Same aggregations as min1/min2 above, over the Parquet-backed DataFrame
val parquetmin1 = parquet.groupBy("id").agg(min("col1").as("min"))
val parquetmin2 = parquet.groupBy("id").agg(min("col2").as("min"))
println("PDF")
parquetmin1.union(parquetmin2).explain()
parquetmin1.union(parquetmin2).show()
/* prints:
+---+---+
| id|min|
+---+---+
|  A|  1|
|  A|  1|
+---+---+
*/
{code}

This leads me to believe there is something wrong with the reused exchange.
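A minimal sketch for checking the reused-exchange hypothesis without Cassandra, assuming Spark 2.x, a local `SparkSession`, and the internal `spark.sql.exchange.reuse` SQL flag (default `true`), which turns the planner's exchange-reuse rule on and off. It rebuilds the Parquet case in a temporary directory, reports whether the physical plan contains a node that explain() prints as `ReusedExchange`, and collects the union's rows. The object and method names (`ExchangeReuseCheck`, `usesReusedExchange`, `unionOfMins`) are hypothetical, and the row is built from a tuple rather than `TestData`. Adaptive execution is switched off because, under it, the final plan is only settled at run time.

{code:scala}
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.min

// Hypothetical helper object; not part of Spark or of the original report.
object ExchangeReuseCheck {
  // True if the physical plan has a node printed as "ReusedExchange" by explain().
  // Assumes adaptive execution is off, so executedPlan is the full static plan.
  def usesReusedExchange(df: DataFrame): Boolean =
    df.queryExecution.executedPlan
      .collect { case p if p.nodeName == "ReusedExchange" => p }
      .nonEmpty

  // Rebuilds the report's Parquet case in a fresh temp directory and returns
  // whether the union reused an exchange, plus its sorted (id, min) rows.
  def unionOfMins(spark: SparkSession, reuse: Boolean): (Boolean, Seq[(String, Int)]) = {
    import spark.implicits._
    // Internal flag read by the exchange-reuse planner rule (assumed available, default "true").
    spark.conf.set("spark.sql.exchange.reuse", reuse.toString)
    val path = Files.createTempDirectory("garbagetest").resolve("data").toString
    Seq(("A", 1, 7)).toDF("id", "col1", "col2").write.parquet(path)
    val parquet = spark.read.parquet(path).select("id", "col1", "col2")
    val min1 = parquet.groupBy("id").agg(min("col1").as("min"))
    val min2 = parquet.groupBy("id").agg(min("col2").as("min"))
    val unioned = min1.union(min2)
    val rows = unioned.collect().map(r => (r.getString(0), r.getInt(1))).toSeq.sorted
    (usesReusedExchange(unioned), rows)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("SPARK-17673-check")
      .config("spark.sql.adaptive.enabled", "false")
      .getOrCreate()
    println(s"reuse on:  ${unionOfMins(spark, reuse = true)}")
    println(s"reuse off: ${unionOfMins(spark, reuse = false)}")
    spark.stop()
  }
}
{code}

If reuse is the culprit, an affected build would be expected to report `true` and two `(A,1)` rows with reuse on, and `false` with `(A,1)` and `(A,7)` with reuse off. Setting the flag to `false` may also serve as a stop-gap, at the cost of recomputing exchanges that could otherwise be shared.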