[spark] branch branch-2.3 updated: [SPARK-26366][SQL][BACKPORT-2.3] ReplaceExceptWithFilter should consider NULL as False

2018-12-22 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
 new acf20d2  [SPARK-26366][SQL][BACKPORT-2.3] ReplaceExceptWithFilter should consider NULL as False
acf20d2 is described below

commit acf20d235b6ffe2ca5d8ba433f97f83367fcc160
Author: Marco Gaido 
AuthorDate: Sat Dec 22 16:09:14 2018 -0800

[SPARK-26366][SQL][BACKPORT-2.3] ReplaceExceptWithFilter should consider NULL as False

## What changes were proposed in this pull request?

In `ReplaceExceptWithFilter` we do not properly handle the case in which the
condition evaluates to NULL. Since negating NULL still returns NULL, the
assumption that negating the condition returns all the rows which did not
satisfy it no longer holds: rows for which the condition is NULL may be
missing from the result. This happens when the constraints inferred by
`InferFiltersFromConstraints` are not enough, as is the case with `OR`
conditions.

The rule also had problems with non-deterministic conditions: in such a
scenario, applying it would change the probability of the output.

The PR fixes these problems by:
 - returning False when the condition is NULL (so that all the rows which did
not satisfy it are returned; a minimal repro sketch follows below);
 - avoiding any transformation when the condition is non-deterministic.
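
A minimal repro sketch of the NULL case (hypothetical data and column names,
assuming a `SparkSession` in scope as `spark`):

    // The right side filters with an OR condition that evaluates to NULL for
    // the (null, null) row, so InferFiltersFromConstraints cannot infer
    // IsNotNull for either column.
    import spark.implicits._
    val df = Seq((Some(1), Some(2)), (None, None)).toDF("a", "b")
    val right = df.filter("a = 1 or b = 2")
    // Except should return the (null, null) row, because it is not in `right`.
    // Rewriting Except as Filter(Not(cond)) loses it, since Not(NULL) is NULL;
    // Filter(Not(Coalesce(cond, false))) keeps it.
    df.except(right).show()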

## How was this patch tested?

added UTs

Closes #23372 from mgaido91/SPARK-26366_2.3_2.

Authored-by: Marco Gaido 
Signed-off-by: Dongjoon Hyun 
---
 .../optimizer/ReplaceExceptWithFilter.scala| 32 ++---
 .../catalyst/optimizer/ReplaceOperatorSuite.scala  | 42 --
 .../scala/org/apache/spark/sql/DatasetSuite.scala  | 11 ++
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 38 
 4 files changed, 99 insertions(+), 24 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
index 45edf26..08cf160 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
@@ -36,7 +36,8 @@ import org.apache.spark.sql.catalyst.rules.Rule
  * Note:
  * Before flipping the filter condition of the right node, we should:
  * 1. Combine all it's [[Filter]].
- * 2. Apply InferFiltersFromConstraints rule (to take into account of NULL values in the condition).
+ * 2. Update the attribute references to the left node;
+ * 3. Add a Coalesce(condition, False) (to take into account of NULL values in the condition).
  */
 object ReplaceExceptWithFilter extends Rule[LogicalPlan] {
 
@@ -47,23 +48,28 @@ object ReplaceExceptWithFilter extends Rule[LogicalPlan] {
 
     plan.transform {
       case e @ Except(left, right) if isEligible(left, right) =>
-        val newCondition = transformCondition(left, skipProject(right))
-        newCondition.map { c =>
-          Distinct(Filter(Not(c), left))
-        }.getOrElse {
+        val filterCondition = combineFilters(skipProject(right)).asInstanceOf[Filter].condition
+        if (filterCondition.deterministic) {
+          transformCondition(left, filterCondition).map { c =>
+            Distinct(Filter(Not(c), left))
+          }.getOrElse {
+            e
+          }
+        } else {
           e
         }
     }
   }
 
-  private def transformCondition(left: LogicalPlan, right: LogicalPlan): Option[Expression] = {
-    val filterCondition =
-      InferFiltersFromConstraints(combineFilters(right)).asInstanceOf[Filter].condition
-
-    val attributeNameMap: Map[String, Attribute] = left.output.map(x => (x.name, x)).toMap
-
-    if (filterCondition.references.forall(r => attributeNameMap.contains(r.name))) {
-      Some(filterCondition.transform { case a: AttributeReference => attributeNameMap(a.name) })
+  private def transformCondition(plan: LogicalPlan, condition: Expression): Option[Expression] = {
+    val attributeNameMap: Map[String, Attribute] = plan.output.map(x => (x.name, x)).toMap
+    if (condition.references.forall(r => attributeNameMap.contains(r.name))) {
+      val rewrittenCondition = condition.transform {
+        case a: AttributeReference => attributeNameMap(a.name)
+      }
+      // We need to consider as False when the condition is NULL, otherwise we do not return those
+      // rows containing NULL which are instead filtered in the Except right plan
+      Some(Coalesce(Seq(rewrittenCondition, Literal.FalseLiteral)))
     } else {
       None
     }
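
The Coalesce(rewrittenCondition, Literal.FalseLiteral) above is what makes the
negation safe under NULL. A rough sketch of the three-valued logic involved,
runnable in spark-shell (illustrative only, assuming a `SparkSession` named
`spark`):

    // NOT(NULL) is NULL, so a plain Filter(Not(cond)) drops rows where cond is
    // NULL; NOT(coalesce(NULL, false)) is true, so those rows are kept, which
    // matches Except semantics.
    spark.sql(
      "SELECT NOT (CAST(NULL AS BOOLEAN)) AS plain_not, " +
      "NOT (coalesce(CAST(NULL AS BOOLEAN), false)) AS coalesced_not").show()
    // plain_not is NULL, coalesced_not is true
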
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala

[spark] branch branch-2.3 updated: [SPARK-26366][SQL][BACKPORT-2.3] ReplaceExceptWithFilter should consider NULL as False

2018-12-21 Thread lixiao
This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
 new a7d50ae  [SPARK-26366][SQL][BACKPORT-2.3] ReplaceExceptWithFilter should consider NULL as False
a7d50ae is described below

commit a7d50ae24a5f92e8d9b6622436f0bb4c2e06cbe1
Author: Marco Gaido 
AuthorDate: Fri Dec 21 14:52:29 2018 -0800

[SPARK-26366][SQL][BACKPORT-2.3] ReplaceExceptWithFilter should consider NULL as False

## What changes were proposed in this pull request?

In `ReplaceExceptWithFilter` we do not properly handle the case in which the
condition evaluates to NULL. Since negating NULL still returns NULL, the
assumption that negating the condition returns all the rows which did not
satisfy it no longer holds: rows for which the condition is NULL may be
missing from the result. This happens when the constraints inferred by
`InferFiltersFromConstraints` are not enough, as is the case with `OR`
conditions.

The rule also had problems with non-deterministic conditions: in such a
scenario, applying it would change the probability of the output.

The PR fixes these problems by:
 - returning False when the condition is NULL (so that all the rows which did
not satisfy it are returned);
 - avoiding any transformation when the condition is non-deterministic (a
sketch of this case follows below).
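
A minimal sketch of why the rewrite is unsafe for non-deterministic
conditions (hypothetical example, assuming a `SparkSession` in scope as
`spark`):

    import org.apache.spark.sql.functions.rand
    import spark.implicits._
    val df = Seq(1, 2, 3, 4).toDF("a")
    val right = df.filter(rand() < 0.5)
    // Without the rewrite, Except removes the rows that actually passed the
    // random filter on the right; rewriting it as Filter(Not(rand() < 0.5))
    // on the left draws new random values, so the probability of each row
    // appearing in the output changes.
    df.except(right)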

## How was this patch tested?

added UTs

Closes #23350 from mgaido91/SPARK-26366_2.3.

Authored-by: Marco Gaido 
Signed-off-by: gatorsmile 
---
 .../optimizer/ReplaceExceptWithFilter.scala| 32 +---
 .../catalyst/optimizer/ReplaceOperatorSuite.scala  | 44 --
 .../scala/org/apache/spark/sql/DatasetSuite.scala  | 11 ++
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 38 +++
 4 files changed, 101 insertions(+), 24 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
index 45edf26..08cf160 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
@@ -36,7 +36,8 @@ import org.apache.spark.sql.catalyst.rules.Rule
  * Note:
  * Before flipping the filter condition of the right node, we should:
  * 1. Combine all it's [[Filter]].
- * 2. Apply InferFiltersFromConstraints rule (to take into account of NULL values in the condition).
+ * 2. Update the attribute references to the left node;
+ * 3. Add a Coalesce(condition, False) (to take into account of NULL values in the condition).
  */
 object ReplaceExceptWithFilter extends Rule[LogicalPlan] {
 
@@ -47,23 +48,28 @@ object ReplaceExceptWithFilter extends Rule[LogicalPlan] {
 
     plan.transform {
       case e @ Except(left, right) if isEligible(left, right) =>
-        val newCondition = transformCondition(left, skipProject(right))
-        newCondition.map { c =>
-          Distinct(Filter(Not(c), left))
-        }.getOrElse {
+        val filterCondition = combineFilters(skipProject(right)).asInstanceOf[Filter].condition
+        if (filterCondition.deterministic) {
+          transformCondition(left, filterCondition).map { c =>
+            Distinct(Filter(Not(c), left))
+          }.getOrElse {
+            e
+          }
+        } else {
           e
         }
     }
   }
 
-  private def transformCondition(left: LogicalPlan, right: LogicalPlan): Option[Expression] = {
-    val filterCondition =
-      InferFiltersFromConstraints(combineFilters(right)).asInstanceOf[Filter].condition
-
-    val attributeNameMap: Map[String, Attribute] = left.output.map(x => (x.name, x)).toMap
-
-    if (filterCondition.references.forall(r => attributeNameMap.contains(r.name))) {
-      Some(filterCondition.transform { case a: AttributeReference => attributeNameMap(a.name) })
+  private def transformCondition(plan: LogicalPlan, condition: Expression): Option[Expression] = {
+    val attributeNameMap: Map[String, Attribute] = plan.output.map(x => (x.name, x)).toMap
+    if (condition.references.forall(r => attributeNameMap.contains(r.name))) {
+      val rewrittenCondition = condition.transform {
+        case a: AttributeReference => attributeNameMap(a.name)
+      }
+      // We need to consider as False when the condition is NULL, otherwise we do not return those
+      // rows containing NULL which are instead filtered in the Except right plan
+      Some(Coalesce(Seq(rewrittenCondition, Literal.FalseLiteral)))
     } else {
       None
     }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala