Repository: spark
Updated Branches:
  refs/heads/branch-2.0 c864e8a80 -> 399597b04


[SPARK-17337][SPARK-16804][SQL][BRANCH-2.0] Backport subquery related PRs

## What changes were proposed in this pull request?
This PR backports two subquery-related PRs to branch-2.0:

- https://github.com/apache/spark/pull/14411
- https://github.com/apache/spark/pull/15761
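
Concretely, SPARK-16804 makes the analyzer reject a subquery that references an outer-query column below a LIMIT or TABLESAMPLE (such queries previously could return incorrect results), while SPARK-17337 stops the optimizer from pushing a filter through a projection when a predicate subquery in the filter reads from the same source as the plan below. A sketch of the affected query shapes (table and column names are illustrative; the shapes mirror the regression tests below):

    -- SPARK-16804: accepted, the LIMIT subquery is uncorrelated
    select c1 from t1
    where exists (select 1 from t2 limit 1)

    -- SPARK-16804: rejected, the outer reference t1.c1 sits below the LIMIT
    select c1 from t1
    where exists (select 1 from t2 where t1.c1 = t2.c1 limit 1)

    -- SPARK-17337: gave incorrect results before this patch, because the
    -- NOT IN subquery could be pushed into the join that also reads t2
    select *
    from   (select t2.c2 + 1 as c3
            from   t1 left join t2 on t1.c1 = t2.c2) t3
    where  c3 not in (select c2 from t2)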

## How was this patch tested?
Added tests to `SubquerySuite` and `AnalysisErrorSuite`.

Author: Nattavut Sutyanyong <nsy....@gmail.com>
Author: Herman van Hovell <hvanhov...@databricks.com>

Closes #15772 from hvanhovell/SPARK-17337-2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/399597b0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/399597b0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/399597b0

Branch: refs/heads/branch-2.0
Commit: 399597b04a83bbe3cc748c21446de0d808d08155
Parents: c864e8a
Author: Herman van Hovell <hvanhov...@databricks.com>
Authored: Fri Nov 4 15:54:58 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Fri Nov 4 15:54:58 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 13 ++++++
 .../sql/catalyst/optimizer/Optimizer.scala      | 16 ++++++-
 .../catalyst/analysis/AnalysisErrorSuite.scala  | 17 ++++++++
 .../org/apache/spark/sql/SubquerySuite.scala    | 44 ++++++++++++++++++++
 4 files changed, 89 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/399597b0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 617f3e0..6332f92 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1025,6 +1025,19 @@ class Analyzer(
         case e: Expand =>
           failOnOuterReferenceInSubTree(e, "an EXPAND")
           e
+        case l: LocalLimit =>
+          failOnOuterReferenceInSubTree(l, "a LIMIT")
+          l
+        // Since LIMIT <n> is represented as GlobalLimit(<n>, LocalLimit(<n>, child))
+        // and we are walking bottom up, we will fail on LocalLimit before
+        // reaching GlobalLimit.
+        // The code below is just a safety net.
+        case g: GlobalLimit =>
+          failOnOuterReferenceInSubTree(g, "a LIMIT")
+          g
+        case s: Sample =>
+          failOnOuterReferenceInSubTree(s, "a TABLESAMPLE")
+          s
         case p =>
           failOnOuterReference(p)
           p
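
The traversal is bottom-up, so a correlated predicate under a LIMIT is reported at the LocalLimit node first; as the comment notes, the GlobalLimit case is only a safety net. Note that only outer references below the limit are rejected; a correlation applied on top of a LIMIT subquery still analyzes fine, e.g. (a sketch mirroring the SubquerySuite "LIMIT - 2" test further down; names are illustrative):

    select c1 from t1
    where exists (select 1
                  from   (select c1 from t2 limit 1) s
                  where  t1.c1 = s.c1)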

http://git-wip-us.apache.org/repos/asf/spark/blob/399597b0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 4c06038..f0992b3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1020,7 +1020,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
     // state and all the input rows processed before. In other words, the order of input rows
     // matters for non-deterministic expressions, while pushing down predicates changes the order.
     case filter @ Filter(condition, project @ Project(fields, grandChild))
-      if fields.forall(_.deterministic) =>
+      if fields.forall(_.deterministic) && canPushThroughCondition(grandChild, condition) =>
 
       // Create a map of Aliases to their values from the child projection.
       // e.g., 'SELECT a + b AS c, d ...' produces Map(c -> a + b).
@@ -1161,6 +1161,20 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
       filter
     }
   }
+
+  /**
+   * Check if we can safely push a filter through a projection, by making sure that predicate
+   * subqueries in the condition do not contain the same attributes as the plan they are moved
+   * into. This can happen when the plan and predicate subquery have the same source.
+   */
+  private def canPushThroughCondition(plan: LogicalPlan, condition: Expression): Boolean = {
+    val attributes = plan.outputSet
+    val matched = condition.find {
+      case PredicateSubquery(p, _, _, _) => p.outputSet.intersect(attributes).nonEmpty
+      case _ => false
+    }
+    matched.isEmpty
+  }
 }
 
 /**
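
The guard applies when a predicate subquery in the filter condition reads from the same source as the plan the filter would be pushed into. Without it, the SPARK-17337 query (see the new SubquerySuite test below) would conceptually be rewritten to something like the following, where the subquery's t2 attributes collide with the t2 output of the join it was pushed into:

    -- conceptual shape after the (formerly incorrect) pushdown:
    select t2.c2 + 1 as c3
    from   t1 left join t2 on t1.c1 = t2.c2
    where  t2.c2 + 1 not in (select c2 from t2)

With t1 = {1, 2} and t2 = {1}, the correct answer is Row(2): the left join yields c3 values 2 and null, and "null not in (1)" evaluates to null, filtering that row out.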

http://git-wip-us.apache.org/repos/asf/spark/blob/399597b0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index ff112c5..6438065 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -533,5 +533,22 @@ class AnalysisErrorSuite extends AnalysisTest {
       Exists(Union(LocalRelation(b), Filter(EqualTo(OuterReference(a), c), LocalRelation(c)))),
       LocalRelation(a))
     assertAnalysisError(plan3, "Accessing outer query column is not allowed in" :: Nil)
+
+    val plan4 = Filter(
+      Exists(
+        Limit(1,
+          Filter(EqualTo(OuterReference(a), b), LocalRelation(b)))
+      ),
+      LocalRelation(a))
+    assertAnalysisError(plan4, "Accessing outer query column is not allowed in a LIMIT" :: Nil)
+
+    val plan5 = Filter(
+      Exists(
+        Sample(0.0, 0.5, false, 1L,
+          Filter(EqualTo(OuterReference(a), b), LocalRelation(b)))().select('b)
+      ),
+      LocalRelation(a))
+    assertAnalysisError(plan5,
+                        "Accessing outer query column is not allowed in a TABLESAMPLE" :: Nil)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/399597b0/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index afed342..184d1f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -571,4 +571,48 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
       Row(1.0, false) :: Row(1.0, false) :: Row(2.0, true) :: Row(2.0, true) ::
         Row(3.0, false) :: Row(5.0, true) :: Row(null, false) :: Row(null, true) :: Nil)
   }
+
+  test("SPARK-16804: Correlated subqueries containing LIMIT - 1") {
+    withTempView("onerow") {
+      Seq(1).toDF("c1").createOrReplaceTempView("onerow")
+
+      checkAnswer(
+        sql(
+          """
+            | select c1 from onerow t1
+            | where exists (select 1 from onerow t2 where t1.c1=t2.c1)
+            | and   exists (select 1 from onerow LIMIT 1)""".stripMargin),
+        Row(1) :: Nil)
+    }
+  }
+
+  test("SPARK-16804: Correlated subqueries containing LIMIT - 2") {
+    withTempView("onerow") {
+      Seq(1).toDF("c1").createOrReplaceTempView("onerow")
+
+      checkAnswer(
+        sql(
+          """
+            | select c1 from onerow t1
+            | where exists (select 1
+            |               from   (select 1 from onerow t2 LIMIT 1)
+            |               where  t1.c1=t2.c1)""".stripMargin),
+        Row(1) :: Nil)
+    }
+  }
+
+  test("SPARK-17337: Incorrect column resolution leads to incorrect results") {
+    withTempView("t1", "t2") {
+      Seq(1, 2).toDF("c1").createOrReplaceTempView("t1")
+      Seq(1).toDF("c2").createOrReplaceTempView("t2")
+      checkAnswer(
+        sql(
+          """
+            | select *
+            | from   (select t2.c2+1 as c3
+            |         from   t1 left join t2 on t1.c1=t2.c2) t3
+            | where  c3 not in (select c2 from t2)""".stripMargin),
+        Row(2) :: Nil)
+    }
+  }
 }

