[jira] [Issue Comment Deleted] (SPARK-26366) Except with transform regression

2019-01-04 Thread Reynold Xin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Reynold Xin updated SPARK-26366:

Comment: was deleted

(was: mgaido91 opened a new pull request #23372: 
[SPARK-26366][SQL][BACKPORT-2.3] ReplaceExceptWithFilter should consider NULL 
as False
URL: https://github.com/apache/spark/pull/23372
 
 
   ## What changes were proposed in this pull request?
   
   In `ReplaceExceptWithFilter` we do not properly handle the case in which 
the condition evaluates to NULL. Since negating NULL still returns NULL, the 
assumption that negating the condition returns all the rows which didn't 
satisfy it does not hold: rows for which the condition evaluates to NULL may 
be missing from the result. This happens when the constraints inferred by 
`InferFiltersFromConstraints` are not enough, as with `OR` conditions.
   
   The rule also had problems with non-deterministic conditions: in such a 
scenario, the rewrite would change the probability with which rows appear in 
the output, as the sketch below illustrates.
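   
   For intuition, a minimal sketch of the non-determinism problem (`df` is a 
hypothetical DataFrame):
   
   {code:java}
import org.apache.spark.sql.functions.rand

// rand() is re-evaluated on every pass over the data, so rewriting
// df.except(sampled) into a negated Filter re-samples the condition
// instead of excluding the rows that were originally selected.
val sampled = df.filter(rand() < 0.5)
df.except(sampled)
{code}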
   
   The PR fixes these problems by:
   - treating the condition as False when it evaluates to NULL, so that we do 
return all the rows which didn't satisfy it (see the sketch after this list);
   - avoiding any transformation when the condition is non-deterministic.
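   
   A minimal illustration of the NULL-as-False behaviour at the DataFrame 
level (`df` and the `email` column are hypothetical):
   
   {code:java}
import org.apache.spark.sql.functions.{coalesce, col, lit, not}

// Three-valued logic: when `email` is NULL the condition is NULL, and
// not(NULL) is still NULL, so the row passes neither the filter nor its
// negation and silently disappears from the Except result.
val cond = col("email") === "someone@example.org"
df.filter(not(cond))                        // drops rows where cond is NULL
df.filter(not(coalesce(cond, lit(false))))  // NULL treated as false: rows kept
{code}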
   
   ## How was this patch tested?
   
   added UTs
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
)

> Except with transform regression
> --------------------------------
>
> Key: SPARK-26366
> URL: https://issues.apache.org/jira/browse/SPARK-26366
> Project: Spark
> Issue Type: Bug
> Components: Spark Core, SQL
> Affects Versions: 2.3.2
> Reporter: Dan Osipov
> Assignee: Marco Gaido
> Priority: Major
> Labels: correctness
> Fix For: 2.3.3, 2.4.1, 3.0.0
>
>
> There appears to be a regression between Spark 2.2 and 2.3. Below is the code 
> to reproduce it:
>  
> {code:java}
> import org.apache.spark.sql.functions.col
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
> val inputDF = spark.sqlContext.createDataFrame(
>   spark.sparkContext.parallelize(Seq(
>     Row("0", "john", "smith", "j...@smith.com"),
>     Row("1", "jane", "doe", "j...@doe.com"),
>     Row("2", "apache", "spark", "sp...@apache.org"),
>     Row("3", "foo", "bar", null)
>   )),
>   StructType(List(
>     StructField("id", StringType, nullable = true),
>     StructField("first_name", StringType, nullable = true),
>     StructField("last_name", StringType, nullable = true),
>     StructField("email", StringType, nullable = true)
>   ))
> )
> val exceptDF = inputDF.transform(toProcessDF =>
>   toProcessDF.filter(
>     (
>       col("first_name").isin(Seq("john", "jane"): _*)
>         and col("last_name").isin(Seq("smith", "doe"): _*)
>     )
>       or col("email").isin(List(): _*)
>   )
> )
> inputDF.except(exceptDF).show()
> {code}
> Output with Spark 2.2:
> {noformat}
> +---+----------+---------+----------------+
> | id|first_name|last_name|           email|
> +---+----------+---------+----------------+
> |  2|    apache|    spark|sp...@apache.org|
> |  3|       foo|      bar|            null|
> +---+----------+---------+----------------+{noformat}
> Output with Spark 2.3:
> {noformat}
> +---+----------+---------+----------------+
> | id|first_name|last_name|           email|
> +---+----------+---------+----------------+
> |  2|    apache|    spark|sp...@apache.org|
> +---+----------+---------+----------------+{noformat}
> Note: changing the last line to
> {code:java}
> inputDF.except(exceptDF.cache()).show()
> {code}
> produces identical output for both Spark 2.2 and 2.3, presumably because 
> caching materializes exceptDF and so prevents the faulty Except-to-Filter 
> rewrite.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Issue Comment Deleted] (SPARK-26366) Except with transform regression

2019-01-04 Thread Reynold Xin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Reynold Xin updated SPARK-26366:

Comment: was deleted

(was: mgaido91 opened a new pull request #23350: 
[SPARK-26366][SQL][BACKPORT-2.3] ReplaceExceptWithFilter should consider NULL 
as False
URL: https://github.com/apache/spark/pull/23350
 
 
   ## What changes were proposed in this pull request?
   
   In `ReplaceExceptWithFilter` we do not properly handle the case in which 
the condition evaluates to NULL. Since negating NULL still returns NULL, the 
assumption that negating the condition returns all the rows which didn't 
satisfy it does not hold: rows for which the condition evaluates to NULL may 
be missing from the result. This happens when the constraints inferred by 
`InferFiltersFromConstraints` are not enough, as with `OR` conditions.
   
   The rule also had problems with non-deterministic conditions: in such a 
scenario, the rewrite would change the probability with which rows appear in 
the output.
   
   The PR fixes these problems by:
   - treating the condition as False when it evaluates to NULL, so that we do 
return all the rows which didn't satisfy it;
   - avoiding any transformation when the condition is non-deterministic.
   
   ## How was this patch tested?
   
   added UTs
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
)




[jira] [Issue Comment Deleted] (SPARK-26366) Except with transform regression

2019-01-04 Thread Reynold Xin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Reynold Xin updated SPARK-26366:

Comment: was deleted

(was: mgaido91 opened a new pull request #23315: [SPARK-26366][SQL] 
ReplaceExceptWithFilter should consider NULL as False
URL: https://github.com/apache/spark/pull/23315
 
 
   ## What changes were proposed in this pull request?
   
   In `ReplaceExceptWithFilter` we do not consider the case in which the 
condition evaluates to NULL. Since negating NULL still returns NULL, the 
assumption that negating the condition returns all the rows which didn't 
satisfy it does not hold: rows for which the condition evaluates to NULL are 
not returned.
   
   The PR fixes this problem by treating the condition as False when it 
evaluates to NULL. In this way we do return all the rows which didn't satisfy 
it; the sketch below shows the idea at the Catalyst level.
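   
   A minimal self-contained sketch of the fix, mirroring the merged diff (the 
`email` attribute and the compared value are hypothetical):
   
   {code:java}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Coalesce, EqualTo, Literal}
import org.apache.spark.sql.types.StringType

// A condition over a string attribute; it evaluates to NULL when the
// attribute is NULL.
val email = AttributeReference("email", StringType)()
val condition = EqualTo(email, Literal("someone@example.org"))

// NULL-safe wrapper: Coalesce falls back to false when the condition is
// NULL, so Not(safeCondition) holds for exactly the rows the original
// filter rejected.
val safeCondition = Coalesce(Seq(condition, Literal.FalseLiteral))
{code}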
   
   ## How was this patch tested?
   
   added UTs


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
)




[jira] [Issue Comment Deleted] (SPARK-26366) Except with transform regression

2019-01-04 Thread Reynold Xin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Reynold Xin updated SPARK-26366:

Comment: was deleted

(was: gatorsmile closed pull request #23350: [SPARK-26366][SQL][BACKPORT-2.3] 
ReplaceExceptWithFilter should consider NULL as False
URL: https://github.com/apache/spark/pull/23350
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
index 45edf266bbce4..08cf16038a654 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
@@ -36,7 +36,8 @@ import org.apache.spark.sql.catalyst.rules.Rule
  * Note:
  * Before flipping the filter condition of the right node, we should:
  * 1. Combine all it's [[Filter]].
- * 2. Apply InferFiltersFromConstraints rule (to take into account of NULL values in the condition).
+ * 2. Update the attribute references to the left node;
+ * 3. Add a Coalesce(condition, False) (to take into account of NULL values in the condition).
  */
 object ReplaceExceptWithFilter extends Rule[LogicalPlan] {
 
@@ -47,23 +48,28 @@ object ReplaceExceptWithFilter extends Rule[LogicalPlan] {
 
     plan.transform {
       case e @ Except(left, right) if isEligible(left, right) =>
-        val newCondition = transformCondition(left, skipProject(right))
-        newCondition.map { c =>
-          Distinct(Filter(Not(c), left))
-        }.getOrElse {
+        val filterCondition = combineFilters(skipProject(right)).asInstanceOf[Filter].condition
+        if (filterCondition.deterministic) {
+          transformCondition(left, filterCondition).map { c =>
+            Distinct(Filter(Not(c), left))
+          }.getOrElse {
+            e
+          }
+        } else {
           e
         }
     }
   }
 
-  private def transformCondition(left: LogicalPlan, right: LogicalPlan): Option[Expression] = {
-    val filterCondition =
-      InferFiltersFromConstraints(combineFilters(right)).asInstanceOf[Filter].condition
-
-    val attributeNameMap: Map[String, Attribute] = left.output.map(x => (x.name, x)).toMap
-
-    if (filterCondition.references.forall(r => attributeNameMap.contains(r.name))) {
-      Some(filterCondition.transform { case a: AttributeReference => attributeNameMap(a.name) })
+  private def transformCondition(plan: LogicalPlan, condition: Expression): Option[Expression] = {
+    val attributeNameMap: Map[String, Attribute] = plan.output.map(x => (x.name, x)).toMap
+    if (condition.references.forall(r => attributeNameMap.contains(r.name))) {
+      val rewrittenCondition = condition.transform {
+        case a: AttributeReference => attributeNameMap(a.name)
+      }
+      // We need to consider as False when the condition is NULL, otherwise we do not return those
+      // rows containing NULL which are instead filtered in the Except right plan
+      Some(Coalesce(Seq(rewrittenCondition, Literal.FalseLiteral)))
     } else {
       None
     }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
index 52dc2e9fb076c..78d3969906e99 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
@@ -20,11 +20,12 @@ package org.apache.spark.sql.catalyst.optimizer
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal, Not}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Coalesce, If, Literal, Not}
 import org.apache.spark.sql.catalyst.expressions.aggregate.First
 import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi, PlanTest}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.types.BooleanType
 
 class ReplaceOperatorSuite extends PlanTest {
 
@@ -65,8 +66,7 @@ class ReplaceOperatorSuite extends PlanTest {
 
     val correctAnswer =
       Aggregate(table1.output, table1.output,
-        Filter(Not((attributeA.isNotNull && attributeB.isNotNull) &&
-          (attributeA >= 2 && attributeB < 1)),
+

[jira] [Issue Comment Deleted] (SPARK-26366) Except with transform regression

2019-01-04 Thread Reynold Xin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Reynold Xin updated SPARK-26366:

Comment: was deleted

(was: asfgit closed pull request #23315: [SPARK-26366][SQL] 
ReplaceExceptWithFilter should consider NULL as False
URL: https://github.com/apache/spark/pull/23315
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
index efd3944eba7f5..4996d24dfd298 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
@@ -36,7 +36,8 @@ import org.apache.spark.sql.catalyst.rules.Rule
  * Note:
  * Before flipping the filter condition of the right node, we should:
  * 1. Combine all it's [[Filter]].
- * 2. Apply InferFiltersFromConstraints rule (to take into account of NULL values in the condition).
+ * 2. Update the attribute references to the left node;
+ * 3. Add a Coalesce(condition, False) (to take into account of NULL values in the condition).
  */
 object ReplaceExceptWithFilter extends Rule[LogicalPlan] {
 
@@ -47,23 +48,28 @@ object ReplaceExceptWithFilter extends Rule[LogicalPlan] {
 
     plan.transform {
       case e @ Except(left, right, false) if isEligible(left, right) =>
-        val newCondition = transformCondition(left, skipProject(right))
-        newCondition.map { c =>
-          Distinct(Filter(Not(c), left))
-        }.getOrElse {
+        val filterCondition = combineFilters(skipProject(right)).asInstanceOf[Filter].condition
+        if (filterCondition.deterministic) {
+          transformCondition(left, filterCondition).map { c =>
+            Distinct(Filter(Not(c), left))
+          }.getOrElse {
+            e
+          }
+        } else {
           e
         }
     }
   }
 
-  private def transformCondition(left: LogicalPlan, right: LogicalPlan): Option[Expression] = {
-    val filterCondition =
-      InferFiltersFromConstraints(combineFilters(right)).asInstanceOf[Filter].condition
-
-    val attributeNameMap: Map[String, Attribute] = left.output.map(x => (x.name, x)).toMap
-
-    if (filterCondition.references.forall(r => attributeNameMap.contains(r.name))) {
-      Some(filterCondition.transform { case a: AttributeReference => attributeNameMap(a.name) })
+  private def transformCondition(plan: LogicalPlan, condition: Expression): Option[Expression] = {
+    val attributeNameMap: Map[String, Attribute] = plan.output.map(x => (x.name, x)).toMap
+    if (condition.references.forall(r => attributeNameMap.contains(r.name))) {
+      val rewrittenCondition = condition.transform {
+        case a: AttributeReference => attributeNameMap(a.name)
+      }
+      // We need to consider as False when the condition is NULL, otherwise we do not return those
+      // rows containing NULL which are instead filtered in the Except right plan
+      Some(Coalesce(Seq(rewrittenCondition, Literal.FalseLiteral)))
     } else {
       None
     }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
index 3b1b2d588ef67..c8e15c7da763e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
@@ -20,11 +20,12 @@ package org.apache.spark.sql.catalyst.optimizer
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Alias, Literal, Not}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Coalesce, If, Literal, Not}
 import org.apache.spark.sql.catalyst.expressions.aggregate.First
 import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi, PlanTest}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.types.BooleanType
 
 class ReplaceOperatorSuite extends PlanTest {
 
@@ -65,8 +66,7 @@ class ReplaceOperatorSuite extends PlanTest {
 
     val correctAnswer =
       Aggregate(table1.output, table1.output,
-        Filter(Not((attributeA.isNotNull && attributeB.isNotNull) &&
-          (attributeA >= 2 && attributeB < 1)),
+
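
The archived diff is cut off at this point. Based on the change to 
transformCondition above, the remainder of this hunk presumably rewrites the 
expected optimized plan to negate the Coalesce-wrapped condition, along the 
lines of:

{code:java}
Filter(Not(Coalesce(Seq(attributeA >= 2 && attributeB < 1, Literal.FalseLiteral))),
  table1)
{code}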