spark git commit: [SPARK-17673][SQL] Incorrect exchange reuse with RowDataSourceScan (backport)

2016-09-28 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 4c694e452 -> d358298f1


[SPARK-17673][SQL] Incorrect exchange reuse with RowDataSourceScan (backport)

This backports https://github.com/apache/spark/pull/15273 to branch-2.0

Also verified the test passes after the patch was applied. rxin

Author: Eric Liang 

Closes #15282 from ericl/spark-17673-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d358298f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d358298f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d358298f

Branch: refs/heads/branch-2.0
Commit: d358298f1082edd31489a1b08f428c8e60278d69
Parents: 4c694e4
Author: Eric Liang 
Authored: Wed Sep 28 16:19:06 2016 -0700
Committer: Reynold Xin 
Committed: Wed Sep 28 16:19:06 2016 -0700

--
 .../spark/sql/execution/datasources/DataSourceStrategy.scala | 5 ++++-
 .../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 8 ++++++++
 2 files changed, 12 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d358298f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 6b4b3b8..2779694 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -347,13 +347,16 @@ object DataSourceStrategy extends Strategy with Logging {
     // `Filter`s or cannot be handled by `relation`.
     val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
 
+    // These metadata values make scan plans uniquely identifiable for equality checking.
+    // TODO(SPARK-17701) using strings for equality checking is brittle
     val metadata: Map[String, String] = {
       val pairs = ArrayBuffer.empty[(String, String)]
 
       if (pushedFilters.nonEmpty) {
         pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]"))
       }
-
+      pairs += ("ReadSchema" ->
+        StructType.fromAttributes(projects.map(_.toAttribute)).catalogString)
       pairs.toMap
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d358298f/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index ec419e4..1a6dba8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -770,4 +770,12 @@ class JDBCSuite extends SparkFunSuite
     val schema = JdbcUtils.schemaString(df, "jdbc:mysql://localhost:3306/temp")
     assert(schema.contains("`order` TEXT"))
   }
+
+  test("SPARK-17673: Exchange reuse respects differences in output schema") {
+    val df = sql("SELECT * FROM inttypes WHERE a IS NOT NULL")
+    val df1 = df.groupBy("a").agg("c" -> "min")
+    val df2 = df.groupBy("a").agg("d" -> "min")
+    val res = df1.union(df2)
+    assert(res.distinct().count() == 2)  // would be 1 if the exchange was incorrectly reused
+  }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-17673][SQL] Incorrect exchange reuse with RowDataSourceScan

2016-09-28 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 46d1203bf -> a6cfa3f38


[SPARK-17673][SQL] Incorrect exchange reuse with RowDataSourceScan

## What changes were proposed in this pull request?

The equality check used for reuse of `RowDataSourceScanExec` nodes does not
take the output schema into account. As a result, self-joins or unions over the
same underlying data source can return incorrect results when the two sides
select different fields.
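
The failure mode can be illustrated outside Spark with a self-contained sketch. All names below are hypothetical stand-ins for illustration only (the real node is `RowDataSourceScanExec`, whose metadata is built in `DataSourceStrategy`): before the fix, the string metadata attached to a scan carried only the pushed filters, so two scans that project different columns from the same relation produced equal metadata and could be wrongly treated as interchangeable by exchange reuse.

```scala
// Sketch of the equality hazard: metadata built without the projected schema
// collides for scans that read different columns; adding a "ReadSchema" entry
// (as the fix does) makes the maps distinguishable.
object MetadataEqualitySketch {
  // Stand-in for the string metadata attached to a data source scan node.
  def scanMetadata(
      pushedFilters: Seq[String],
      readSchema: Option[String]): Map[String, String] = {
    val pairs = scala.collection.mutable.ArrayBuffer.empty[(String, String)]
    if (pushedFilters.nonEmpty) {
      pairs += ("PushedFilters" -> pushedFilters.mkString("[", ", ", "]"))
    }
    // The fix in this commit corresponds to always appending the read schema.
    readSchema.foreach(s => pairs += ("ReadSchema" -> s))
    pairs.toMap
  }

  def main(args: Array[String]): Unit = {
    val filters = Seq("IsNotNull(a)")
    // Before the fix: scans of (a, c) and (a, d) carry identical metadata.
    val before1 = scanMetadata(filters, None)
    val before2 = scanMetadata(filters, None)
    // After the fix: the projected schema keeps them distinct.
    val after1 = scanMetadata(filters, Some("struct<a:int,c:int>"))
    val after2 = scanMetadata(filters, Some("struct<a:int,d:int>"))
    println(before1 == before2) // true  -- the dangerous collision
    println(after1 == after2)   // false -- resolved by including the schema
  }
}
```

This is why the patch folds the projected schema's `catalogString` into the metadata map: plan equality (and thus exchange reuse) then distinguishes scans by what they actually read.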

## How was this patch tested?

New unit test passes after the fix.

Author: Eric Liang 

Closes #15273 from ericl/spark-17673.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6cfa3f3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6cfa3f3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6cfa3f3

Branch: refs/heads/master
Commit: a6cfa3f38bcf6ba154d5ed2a53748fbc90c8872a
Parents: 46d1203
Author: Eric Liang 
Authored: Wed Sep 28 13:22:45 2016 -0700
Committer: Reynold Xin 
Committed: Wed Sep 28 13:22:45 2016 -0700

--
 .../spark/sql/execution/datasources/DataSourceStrategy.scala | 4 ++++
 .../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 8 ++++++++
 2 files changed, 12 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a6cfa3f3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 63f01c5..693b4c4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -340,6 +340,8 @@ object DataSourceStrategy extends Strategy with Logging {
     // `Filter`s or cannot be handled by `relation`.
     val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
 
+    // These metadata values make scan plans uniquely identifiable for equality checking.
+    // TODO(SPARK-17701) using strings for equality checking is brittle
     val metadata: Map[String, String] = {
       val pairs = ArrayBuffer.empty[(String, String)]
 
@@ -350,6 +352,8 @@ object DataSourceStrategy extends Strategy with Logging {
         }
         pairs += ("PushedFilters" -> markedFilters.mkString("[", ", ", "]"))
       }
+      pairs += ("ReadSchema" ->
+        StructType.fromAttributes(projects.map(_.toAttribute)).catalogString)
       pairs.toMap
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/a6cfa3f3/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 10f15ca..c94cb3b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -791,4 +791,12 @@ class JDBCSuite extends SparkFunSuite
     val schema = JdbcUtils.schemaString(df, "jdbc:mysql://localhost:3306/temp")
     assert(schema.contains("`order` TEXT"))
   }
+
+  test("SPARK-17673: Exchange reuse respects differences in output schema") {
+    val df = sql("SELECT * FROM inttypes WHERE a IS NOT NULL")
+    val df1 = df.groupBy("a").agg("c" -> "min")
+    val df2 = df.groupBy("a").agg("d" -> "min")
+    val res = df1.union(df2)
+    assert(res.distinct().count() == 2)  // would be 1 if the exchange was incorrectly reused
+  }
 }

