spark git commit: [SPARK-20718][SQL] FileSourceScanExec with different filter orders should be the same after canonicalization

2017-05-11 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 2cac317a8 -> a8d981dc5


[SPARK-20718][SQL] FileSourceScanExec with different filter orders should be 
the same after canonicalization

## What changes were proposed in this pull request?

Since `constraints` in `QueryPlan` is a set, the order of filters can differ. 
Usually this is fine because of canonicalization. However, in 
`FileSourceScanExec` the data filters and partition filters are sequences, and 
their orders are not canonicalized, so `def sameResult` returns different 
results for different orderings of the data/partition filters. This leads to, 
e.g., different decisions for `ReuseExchange`, and thus results in unstable 
performance.
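To illustrate the idea behind the fix, here is a minimal, standalone sketch using toy expression classes (not Spark's actual Catalyst API): combine the filters into one conjunction, split it back into its component predicates, and impose a deterministic order, so that two filter sequences that differ only in ordering canonicalize to the same result. Real Spark additionally normalizes expression ids and relies on Catalyst's canonicalization of commutative operators; the sketch below only fixes the ordering, sorting by `hashCode`.

```scala
// Toy expression tree standing in for Catalyst expressions (assumption:
// these names are illustrative only, not Spark's real classes).
sealed trait Expr
case class Attr(name: String) extends Expr
case class Lit(value: Int) extends Expr
case class GreaterThan(left: Expr, right: Expr) extends Expr
case class And(left: Expr, right: Expr) extends Expr

// Mirrors Catalyst's splitConjunctivePredicates: flatten nested Ands
// back into the sequence of conjuncts.
def splitConjunctivePredicates(e: Expr): Seq[Expr] = e match {
  case And(l, r) => splitConjunctivePredicates(l) ++ splitConjunctivePredicates(r)
  case other     => Seq(other)
}

// Order-insensitive canonical form for a sequence of filters: reduce with
// And (as the patch does), split back, and sort the conjuncts by hashCode,
// which is deterministic for structural case classes.
def canonicalizeFilters(filters: Seq[Expr]): Seq[Expr] =
  if (filters.isEmpty) Nil
  else splitConjunctivePredicates(filters.reduce(And)).sortBy(_.hashCode)
```

With this, `canonicalizeFilters(Seq(f1, f2)) == canonicalizeFilters(Seq(f2, f1))` holds for any permutation of the filters, which is exactly the property `sameResult` needs.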

## How was this patch tested?

Added a new test for `FileSourceScanExec.sameResult`.

Author: wangzhenhua 

Closes #17959 from wzhfy/canonicalizeFileSourceScanExec.

(cherry picked from commit c8da5356000c8e4ff9141e4a2892ebe0b9641d63)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a8d981dc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a8d981dc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a8d981dc

Branch: refs/heads/branch-2.2
Commit: a8d981dc5d11d65a4bd3a68aa57455b34a2649f9
Parents: 2cac317
Author: wangzhenhua 
Authored: Fri May 12 13:42:48 2017 +0800
Committer: Wenchen Fan 
Committed: Fri May 12 13:43:04 2017 +0800

--
 .../sql/execution/DataSourceScanExec.scala  | 16 +--
 .../spark/sql/execution/SameResultSuite.scala   | 49 
 2 files changed, 62 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a8d981dc/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 866fa98..251098c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
-trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
+trait DataSourceScanExec extends LeafExecNode with CodegenSupport with PredicateHelper {
   val relation: BaseRelation
   val metastoreTableIdentifier: Option[TableIdentifier]
 
@@ -519,8 +519,18 @@ case class FileSourceScanExec(
       relation,
       output.map(QueryPlan.normalizeExprId(_, output)),
       requiredSchema,
-      partitionFilters.map(QueryPlan.normalizeExprId(_, output)),
-      dataFilters.map(QueryPlan.normalizeExprId(_, output)),
+      canonicalizeFilters(partitionFilters, output),
+      canonicalizeFilters(dataFilters, output),
       None)
   }
+
+  private def canonicalizeFilters(filters: Seq[Expression], output: Seq[Attribute])
+    : Seq[Expression] = {
+    if (filters.nonEmpty) {
+      val normalizedFilters = QueryPlan.normalizeExprId(filters.reduce(And), output)
+      splitConjunctivePredicates(normalizedFilters)
+    } else {
+      Nil
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a8d981dc/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala
new file mode 100644
index 0000000..25e4ca0
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.{DataFrame, QueryTest}
+import org.apache.spark.sql.test.SharedSQLContext

spark git commit: [SPARK-20718][SQL] FileSourceScanExec with different filter orders should be the same after canonicalization

2017-05-11 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 2b36eb696 -> c8da53560


[SPARK-20718][SQL] FileSourceScanExec with different filter orders should be 
the same after canonicalization

## What changes were proposed in this pull request?

Since `constraints` in `QueryPlan` is a set, the order of filters can differ. 
Usually this is fine because of canonicalization. However, in 
`FileSourceScanExec` the data filters and partition filters are sequences, and 
their orders are not canonicalized, so `def sameResult` returns different 
results for different orderings of the data/partition filters. This leads to, 
e.g., different decisions for `ReuseExchange`, and thus results in unstable 
performance.

## How was this patch tested?

Added a new test for `FileSourceScanExec.sameResult`.

Author: wangzhenhua 

Closes #17959 from wzhfy/canonicalizeFileSourceScanExec.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c8da5356
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c8da5356
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c8da5356

Branch: refs/heads/master
Commit: c8da5356000c8e4ff9141e4a2892ebe0b9641d63
Parents: 2b36eb6
Author: wangzhenhua 
Authored: Fri May 12 13:42:48 2017 +0800
Committer: Wenchen Fan 
Committed: Fri May 12 13:42:48 2017 +0800

--
 .../sql/execution/DataSourceScanExec.scala  | 16 +--
 .../spark/sql/execution/SameResultSuite.scala   | 49 
 2 files changed, 62 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c8da5356/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 866fa98..251098c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
-trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
+trait DataSourceScanExec extends LeafExecNode with CodegenSupport with PredicateHelper {
   val relation: BaseRelation
   val metastoreTableIdentifier: Option[TableIdentifier]
 
@@ -519,8 +519,18 @@ case class FileSourceScanExec(
       relation,
       output.map(QueryPlan.normalizeExprId(_, output)),
       requiredSchema,
-      partitionFilters.map(QueryPlan.normalizeExprId(_, output)),
-      dataFilters.map(QueryPlan.normalizeExprId(_, output)),
+      canonicalizeFilters(partitionFilters, output),
+      canonicalizeFilters(dataFilters, output),
       None)
   }
+
+  private def canonicalizeFilters(filters: Seq[Expression], output: Seq[Attribute])
+    : Seq[Expression] = {
+    if (filters.nonEmpty) {
+      val normalizedFilters = QueryPlan.normalizeExprId(filters.reduce(And), output)
+      splitConjunctivePredicates(normalizedFilters)
+    } else {
+      Nil
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c8da5356/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala
new file mode 100644
index 0000000..25e4ca0
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.{DataFrame, QueryTest}
+import org.apache.spark.sql.test.SharedSQLContext
+
+/**
+ * Tests for the sameResult function for [[SparkPlan]]s.
+ */
+class SameResultSuite extends QueryTest with SharedSQLContext {