[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/16954


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-13 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r105831346
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala ---
@@ -123,19 +123,36 @@ case class Not(child: Expression)
  */
 @ExpressionDescription(
   usage = "expr1 _FUNC_(expr2, expr3, ...) - Returns true if `expr` equals to any valN.")
-case class In(value: Expression, list: Seq[Expression]) extends Predicate
-    with ImplicitCastInputTypes {
+case class In(value: Expression, list: Seq[Expression]) extends Predicate {
 
   require(list != null, "list should not be null")
-
-  override def inputTypes: Seq[AbstractDataType] = value.dataType +: list.map(_.dataType)
-
   override def checkInputDataTypes(): TypeCheckResult = {
-    if (list.exists(l => l.dataType != value.dataType)) {
-      TypeCheckResult.TypeCheckFailure(
-        "Arguments must be same type")
-    } else {
-      TypeCheckResult.TypeCheckSuccess
+    list match {
+      case ListQuery(sub, _, _) :: Nil =>
+        val valExprs = value match {
+          case cns: CreateNamedStruct => cns.valExprs
+          case expr => Seq(expr)
+        }
+        val isTypeMismatched = valExprs.zip(sub.output).exists {
+          case (l, r) => l.dataType != r.dataType
+        }
+        if (isTypeMismatched) {
--- End diff --

@hvanhovell The new error message looks like the following. Does this look okay to you?

```
Error in query: cannot resolve '(named_struct('c1', at1.`c1`, 'c2', at1.`c2`) IN (listquery()))' due to data type mismatch:
The data type of one or more elements in the left hand side of an IN subquery
is not compatible with the data type of the output of the subquery
Mismatched columns:
[(at1.`c1`:decimal(10,0), at2.`c1`:timestamp), (at1.`c2`:timestamp, at2.`c2`:decimal(10,0))]
Left side:
[decimal(10,0), timestamp].
Right side:
[timestamp, decimal(10,0)].
```
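For readers without the PR checked out, the check behind this message can be modelled in plain Scala. This is a minimal sketch under stated assumptions: `Col` and `InSubqueryTypeCheck` are hypothetical names, each column carries its type name as a string instead of a Catalyst `DataType`, and only the layout of the message shown above is reproduced, not the PR's actual code.

```scala
// Hypothetical stand-in for a resolved column: a qualified name plus a type name.
// Catalyst's real code works on Expression/Attribute and DataType instead.
final case class Col(name: String, dataType: String)

object InSubqueryTypeCheck {
  private def bracket(xs: Seq[String]): String = xs.mkString("[", ", ", "]")

  /** Returns None when every LHS/RHS pair has the same type, otherwise an error message. */
  def check(lhs: Seq[Col], rhs: Seq[Col]): Option[String] = {
    // Pair columns positionally, like valExprs.zip(sub.output) in the quoted diff.
    val mismatched = lhs.zip(rhs).filter { case (l, r) => l.dataType != r.dataType }
    if (mismatched.isEmpty) None
    else {
      val pairs = bracket(mismatched.map { case (l, r) =>
        s"(${l.name}:${l.dataType}, ${r.name}:${r.dataType})"
      })
      Some(
        s"""The data type of one or more elements in the left hand side of an IN subquery
           |is not compatible with the data type of the output of the subquery
           |Mismatched columns:
           |$pairs
           |Left side:
           |${bracket(lhs.map(_.dataType))}.
           |Right side:
           |${bracket(rhs.map(_.dataType))}.""".stripMargin)
    }
  }
}
```

Listing the mismatched pairs first and the full type lists afterwards matches the order of the sample message.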





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-13 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r105830367
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala ---
@@ -365,17 +368,73 @@ object TypeCoercion {
   }
 
   /**
-   * Convert the value and in list expressions to the common operator type
-   * by looking at all the argument types and finding the closest one that
-   * all the arguments can be cast to. When no common operator type is found
-   * the original expression will be returned and an Analysis Exception will
-   * be raised at type checking phase.
+   * Handles type coercion for both IN expression with subquery and IN
+   * expressions without subquery.
+   * 1. In the first case, find the common type by comparing the left hand side (LHS)
+   *    expression types against corresponding right hand side (RHS) expression derived
+   *    from the subquery expression's plan output. Inject appropriate casts in the
+   *    LHS and RHS side of IN expression.
+   *
+   * 2. In the second case, convert the value and in list expressions to the
+   *    common operator type by looking at all the argument types and finding
+   *    the closest one that all the arguments can be cast to. When no common
+   *    operator type is found the original expression will be returned and an
+   *    Analysis Exception will be raised at the type checking phase.
    */
   object InConversion extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
       // Skip nodes who's children have not been resolved yet.
       case e if !e.childrenResolved => e
 
+      // Handle type casting required between value expression and subquery output
+      // in IN subquery.
+      case i @ In(a, Seq(ListQuery(sub, children, exprId))) if !i.resolved =>
+        // LHS is the value expression of IN subquery.
+        val lhs = a match {
+          // Multi columns in IN clause is represented as a CreateNamedStruct.
+          // flatten the named struct to get the list of expressions.
+          case cns: CreateNamedStruct => cns.valExprs
+          case expr => Seq(expr)
+        }
+
+        // RHS is the subquery output.
+        val rhs = sub.output
+        require(lhs.length == rhs.length)
+
+        val commonTypes = lhs.zip(rhs).flatMap { case (l, r) =>
+          findCommonTypeForBinaryComparison(l.dataType, r.dataType) match {
+            case d @ Some(_) => d
+            case _ => findTightestCommonType(l.dataType, r.dataType)
+          }
+        }
+
+        // The number of columns/expressions must match between LHS and RHS of an
+        // IN subquery expression.
+        if (commonTypes.length == lhs.length) {
+          val castedRhs = rhs.zip(commonTypes).map {
+            case (e, dt) if e.dataType != dt => Alias(Cast(e, dt), e.name)()
+            case (e, _) => e
+          }
+          val castedLhs = lhs.zip(commonTypes).map {
+            case (e, dt) if e.dataType != dt => Cast(e, dt)
+            case (e, _) => e
+          }
+
+          // Before constructing the In expression, wrap the multi values in LHS
+          // in a CreatedNamedStruct.
+          val newLhs = a match {
--- End diff --

@hvanhovell Thanks a lot. You are right, we don't care about the names. This looks much better.
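The per-column coercion in the quoted `InConversion` rule can be sketched outside Catalyst. This assumes a hypothetical type model (`DType`, `Ref`, `Cast` and `InCoercionSketch` are illustrative names): a single `commonType` that only widens numerics stands in for the rule's `findCommonTypeForBinaryComparison` with its `findTightestCommonType` fallback, and the `Alias` the rule wraps around each RHS cast is left out.

```scala
// Hypothetical, simplified type model. Catalyst's real coercion rules
// (findCommonTypeForBinaryComparison, findTightestCommonType) cover many more cases.
sealed trait DType
case object IntT extends DType
case object LongT extends DType
case object DoubleT extends DType
case object StringT extends DType

sealed trait Expr { def dataType: DType }
final case class Ref(name: String, dataType: DType) extends Expr
final case class Cast(child: Expr, dataType: DType) extends Expr

object InCoercionSketch {
  // Numeric types ordered from narrowest to widest.
  private val numericOrder: Seq[DType] = Seq(IntT, LongT, DoubleT)

  /** Same type, the wider of two numeric types, or None when there is no common type. */
  def commonType(l: DType, r: DType): Option[DType] =
    if (l == r) Some(l)
    else if (numericOrder.contains(l) && numericOrder.contains(r))
      Some(if (numericOrder.indexOf(l) >= numericOrder.indexOf(r)) l else r)
    else None

  /** Casts both sides to the per-column common type; None if any column pair has none. */
  def coerce(lhs: Seq[Expr], rhs: Seq[Expr]): Option[(Seq[Expr], Seq[Expr])] = {
    require(lhs.length == rhs.length)
    // flatMap drops pairs without a common type, so a shorter result signals failure.
    val common = lhs.zip(rhs).flatMap { case (l, r) => commonType(l.dataType, r.dataType) }
    if (common.length != lhs.length) None
    else {
      def castAll(es: Seq[Expr]): Seq[Expr] = es.zip(common).map {
        case (e, dt) if e.dataType != dt => Cast(e, dt)
        case (e, _) => e
      }
      Some((castAll(lhs), castAll(rhs)))
    }
  }
}
```

As in the quoted rule, nothing is cast unless every column pair has a common type; otherwise the expression is left alone for the type check to report.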





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r105780981
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala ---
@@ -40,19 +43,189 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {
 /**
  * A base interface for expressions that contain a [[LogicalPlan]].
  */
-abstract class SubqueryExpression extends PlanExpression[LogicalPlan] {
+abstract class SubqueryExpression(
+    plan: LogicalPlan,
+    children: Seq[Expression],
+    exprId: ExprId) extends PlanExpression[LogicalPlan] {
+
+  override lazy val resolved: Boolean = childrenResolved && plan.resolved
+  override lazy val references: AttributeSet =
+    if (plan.resolved) super.references -- plan.outputSet else super.references
   override def withNewPlan(plan: LogicalPlan): SubqueryExpression
+  override def semanticEquals(o: Expression): Boolean = o match {
+    case p: SubqueryExpression =>
+      this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) &&
+        children.length == p.children.length &&
+        children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
+    case _ => false
+  }
 }
 
 object SubqueryExpression {
+  /**
+   * Returns true when an expression contains an IN or EXISTS subquery and false otherwise.
+   */
+  def hasInOrExistsSubquery(e: Expression): Boolean = {
+    e.find {
+      case _: ListQuery | _: Exists => true
+      case _ => false
+    }.isDefined
+  }
+
+  /**
+   * Returns true when an expression contains a subquery that has outer reference(s). The outer
+   * reference attributes are kept as children of subquery expression by
+   * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]]
+   */
   def hasCorrelatedSubquery(e: Expression): Boolean = {
     e.find {
-      case e: SubqueryExpression if e.children.nonEmpty => true
+      case s: SubqueryExpression => s.children.nonEmpty
       case _ => false
     }.isDefined
   }
 }
 
+object SubExprUtils extends PredicateHelper {
+  /**
+   * Returns true when an expression contains correlated predicates i.e outer references and
+   * returns false otherwise.
+   */
+  def containsOuter(e: Expression): Boolean = {
+    e.find(_.isInstanceOf[OuterReference]).isDefined
+  }
+
+  /**
+   * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could
+   * turn the null-aware predicate into not-null-aware predicate.
+   */
+  def hasNullAwarePredicateWithinNot(condition: Expression): Boolean = {
+    splitConjunctivePredicates(condition).exists {
+      case _: Exists | Not(_: Exists) | In(_, Seq(_: ListQuery)) | Not(In(_, Seq(_: ListQuery))) =>
+        false
+      case e => e.find { x =>
+        x.isInstanceOf[Not] && e.find {
+          case In(_, Seq(_: ListQuery)) => true
+          case _ => false
+        }.isDefined
+      }.isDefined
+    }
+
+  }
+
+  /**
+   * Returns an expression after removing the OuterReference shell.
+   */
+  def stripOuterReference(e: Expression): Expression = e.transform { case OuterReference(r) => r }
+
+  /**
+   * Returns the list of expressions after removing the OuterReference shell from each of
+   * the expression.
+   */
+  def stripOuterReferences(e: Seq[Expression]): Seq[Expression] = e.map(stripOuterReference)
+
+  /**
+   * Returns the logical plan after removing the OuterReference shell from all the expressions
+   * of the input logical plan.
+   */
+  def stripOuterReferences(p: LogicalPlan): LogicalPlan = {
+    p.transformAllExpressions {
+      case OuterReference(a) => a
+    }
+  }
+
+  /**
+   * Given a logical plan, returns TRUE if it has an outer reference and false otherwise.
+   */
+  def hasOuterReferences(plan: LogicalPlan): Boolean = {
+    plan.find {
+      case f: Filter => containsOuter(f.condition)
+      case other => false
+    }.isDefined
+  }
+
+  /**
+   * Given a list of expressions, returns the expressions which have outer references. Aggregate
+   * expressions are treated in a special way. If the children of aggregate expression contains an
+   * outer reference, then the entire aggregate expression is marked as an outer reference.
+   * Example (SQL):
+   * {{{
+   *   SELECT a FROM l GROUP by 1 HAVING EXISTS (SELECT 1 FROM r WHERE d < min(b))
+   * }}}
+   * In the above case, we want to mark the entire min(b) as an outer reference
+   * OuterReference(min(b)) instead of min(OuterReference(b)).
 

[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r105780845
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala ---
@@ -40,19 +43,189 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {
 /**
  * A base interface for expressions that contain a [[LogicalPlan]].
  */
-abstract class SubqueryExpression extends PlanExpression[LogicalPlan] {
+abstract class SubqueryExpression(
+    plan: LogicalPlan,
+    children: Seq[Expression],
+    exprId: ExprId) extends PlanExpression[LogicalPlan] {
+
+  override lazy val resolved: Boolean = childrenResolved && plan.resolved
+  override lazy val references: AttributeSet =
+    if (plan.resolved) super.references -- plan.outputSet else super.references
   override def withNewPlan(plan: LogicalPlan): SubqueryExpression
+  override def semanticEquals(o: Expression): Boolean = o match {
+    case p: SubqueryExpression =>
+      this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) &&
+        children.length == p.children.length &&
+        children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
+    case _ => false
+  }
 }
 
 object SubqueryExpression {
+  /**
+   * Returns true when an expression contains an IN or EXISTS subquery and false otherwise.
+   */
+  def hasInOrExistsSubquery(e: Expression): Boolean = {
+    e.find {
+      case _: ListQuery | _: Exists => true
+      case _ => false
+    }.isDefined
+  }
+
+  /**
+   * Returns true when an expression contains a subquery that has outer reference(s). The outer
+   * reference attributes are kept as children of subquery expression by
+   * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]]
+   */
   def hasCorrelatedSubquery(e: Expression): Boolean = {
     e.find {
-      case e: SubqueryExpression if e.children.nonEmpty => true
+      case s: SubqueryExpression => s.children.nonEmpty
       case _ => false
     }.isDefined
   }
 }
 
+object SubExprUtils extends PredicateHelper {
--- End diff --

Ok, so the `SubExprUtils` should be moved to the `OuterReference` companion :)...

Let's leave it for now.
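hvanhovell's suggestion could look roughly like the following, with the helpers hosted on the `OuterReference` companion rather than in `SubExprUtils`. This is a hypothetical sketch on a toy tree (`Node`, `Attr`, `Eq` and the helper names are assumptions, not Spark classes); `strip` plays the role of `stripOuterReference`.

```scala
// Hypothetical toy expression tree standing in for Catalyst's Expression.
sealed trait Node {
  def children: Seq[Node]
  /** True if `p` holds for this node or any descendant, like Catalyst's find(...).isDefined. */
  def exists(p: Node => Boolean): Boolean = p(this) || children.exists(_.exists(p))
}
final case class Attr(name: String) extends Node { def children: Seq[Node] = Nil }
final case class Eq(left: Node, right: Node) extends Node {
  def children: Seq[Node] = Seq(left, right)
}
final case class OuterReference(e: Node) extends Node { def children: Seq[Node] = Seq(e) }

// The helpers live in OuterReference's companion instead of a separate utils object.
object OuterReference {
  def containsOuter(e: Node): Boolean = e.exists(_.isInstanceOf[OuterReference])

  /** Removes every OuterReference shell, keeping the wrapped expression. */
  def strip(e: Node): Node = e match {
    case OuterReference(inner) => strip(inner)
    case Eq(l, r) => Eq(strip(l), strip(r))
    case a: Attr => a
  }
}
```

Callers would then write `OuterReference.containsOuter(cond)`, which keeps the helpers next to the class they inspect.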





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-13 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r105779054
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala ---
@@ -40,19 +43,189 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {
 /**
  * A base interface for expressions that contain a [[LogicalPlan]].
  */
-abstract class SubqueryExpression extends PlanExpression[LogicalPlan] {
+abstract class SubqueryExpression(
+    plan: LogicalPlan,
+    children: Seq[Expression],
+    exprId: ExprId) extends PlanExpression[LogicalPlan] {
+
+  override lazy val resolved: Boolean = childrenResolved && plan.resolved
+  override lazy val references: AttributeSet =
+    if (plan.resolved) super.references -- plan.outputSet else super.references
   override def withNewPlan(plan: LogicalPlan): SubqueryExpression
+  override def semanticEquals(o: Expression): Boolean = o match {
+    case p: SubqueryExpression =>
+      this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) &&
+        children.length == p.children.length &&
+        children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
+    case _ => false
+  }
 }
 
 object SubqueryExpression {
+  /**
+   * Returns true when an expression contains an IN or EXISTS subquery and false otherwise.
+   */
+  def hasInOrExistsSubquery(e: Expression): Boolean = {
+    e.find {
+      case _: ListQuery | _: Exists => true
+      case _ => false
+    }.isDefined
+  }
+
+  /**
+   * Returns true when an expression contains a subquery that has outer reference(s). The outer
+   * reference attributes are kept as children of subquery expression by
+   * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]]
+   */
   def hasCorrelatedSubquery(e: Expression): Boolean = {
     e.find {
-      case e: SubqueryExpression if e.children.nonEmpty => true
+      case s: SubqueryExpression => s.children.nonEmpty
       case _ => false
     }.isDefined
   }
 }
 
+object SubExprUtils extends PredicateHelper {
+  /**
+   * Returns true when an expression contains correlated predicates i.e outer references and
+   * returns false otherwise.
+   */
+  def containsOuter(e: Expression): Boolean = {
+    e.find(_.isInstanceOf[OuterReference]).isDefined
+  }
+
+  /**
+   * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could
+   * turn the null-aware predicate into not-null-aware predicate.
+   */
+  def hasNullAwarePredicateWithinNot(condition: Expression): Boolean = {
+    splitConjunctivePredicates(condition).exists {
+      case _: Exists | Not(_: Exists) | In(_, Seq(_: ListQuery)) | Not(In(_, Seq(_: ListQuery))) =>
+        false
+      case e => e.find { x =>
+        x.isInstanceOf[Not] && e.find {
+          case In(_, Seq(_: ListQuery)) => true
+          case _ => false
+        }.isDefined
+      }.isDefined
+    }
+
+  }
+
+  /**
+   * Returns an expression after removing the OuterReference shell.
+   */
+  def stripOuterReference(e: Expression): Expression = e.transform { case OuterReference(r) => r }
+
+  /**
+   * Returns the list of expressions after removing the OuterReference shell from each of
+   * the expression.
+   */
+  def stripOuterReferences(e: Seq[Expression]): Seq[Expression] = e.map(stripOuterReference)
+
+  /**
+   * Returns the logical plan after removing the OuterReference shell from all the expressions
+   * of the input logical plan.
+   */
+  def stripOuterReferences(p: LogicalPlan): LogicalPlan = {
+    p.transformAllExpressions {
+      case OuterReference(a) => a
+    }
+  }
+
+  /**
+   * Given a logical plan, returns TRUE if it has an outer reference and false otherwise.
+   */
+  def hasOuterReferences(plan: LogicalPlan): Boolean = {
+    plan.find {
+      case f: Filter => containsOuter(f.condition)
+      case other => false
+    }.isDefined
+  }
+
+  /**
+   * Given a list of expressions, returns the expressions which have outer references. Aggregate
+   * expressions are treated in a special way. If the children of aggregate expression contains an
+   * outer reference, then the entire aggregate expression is marked as an outer reference.
+   * Example (SQL):
+   * {{{
+   *   SELECT a FROM l GROUP by 1 HAVING EXISTS (SELECT 1 FROM r WHERE d < min(b))
+   * }}}
+   * In the above case, we want to mark the entire min(b) as an outer reference
+   * OuterReference(min(b)) instead of min(OuterReference(b)).

[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-13 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r105778315
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala ---
@@ -40,19 +43,189 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {
 /**
  * A base interface for expressions that contain a [[LogicalPlan]].
  */
-abstract class SubqueryExpression extends PlanExpression[LogicalPlan] {
+abstract class SubqueryExpression(
+    plan: LogicalPlan,
+    children: Seq[Expression],
+    exprId: ExprId) extends PlanExpression[LogicalPlan] {
+
+  override lazy val resolved: Boolean = childrenResolved && plan.resolved
+  override lazy val references: AttributeSet =
+    if (plan.resolved) super.references -- plan.outputSet else super.references
   override def withNewPlan(plan: LogicalPlan): SubqueryExpression
+  override def semanticEquals(o: Expression): Boolean = o match {
+    case p: SubqueryExpression =>
+      this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) &&
+        children.length == p.children.length &&
+        children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
+    case _ => false
+  }
 }
 
 object SubqueryExpression {
+  /**
+   * Returns true when an expression contains an IN or EXISTS subquery and false otherwise.
+   */
+  def hasInOrExistsSubquery(e: Expression): Boolean = {
+    e.find {
+      case _: ListQuery | _: Exists => true
+      case _ => false
+    }.isDefined
+  }
+
+  /**
+   * Returns true when an expression contains a subquery that has outer reference(s). The outer
+   * reference attributes are kept as children of subquery expression by
+   * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]]
+   */
   def hasCorrelatedSubquery(e: Expression): Boolean = {
     e.find {
-      case e: SubqueryExpression if e.children.nonEmpty => true
+      case s: SubqueryExpression => s.children.nonEmpty
       case _ => false
     }.isDefined
   }
 }
 
+object SubExprUtils extends PredicateHelper {
--- End diff --

@hvanhovell Actually, @nsyca and I had discussed this a bit. The SubqueryExpression object has methods that operate strictly in the context of SubqueryExpression, whereas the utils have methods that mostly deal with OuterReferences. So they can operate on the subquery plans referred to from SubqueryExpression, or maybe, in the future, if we support queries of the form:
```
select * from t1 left outer join (select * from t2 where t2.c1 = t1.c1) on …
```
If you think we should merge these two, let me know and I will do it :-).
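The plan-level helper among the utils, `hasOuterReferences`, can be modelled on a toy plan tree to show that it only walks a plan and does not need a SubqueryExpression around it. All class names here (`Ex`, `Column`, `Outer`, `Plan`, `Filter`, `PlanOuterRefs`, and so on) are hypothetical stand-ins; like the quoted helper, the sketch inspects only `Filter` conditions.

```scala
// Hypothetical toy model: expressions and plans standing in for Catalyst's classes.
sealed trait Ex
final case class Column(name: String) extends Ex
final case class Outer(ref: Column) extends Ex // plays the role of OuterReference
final case class Equal(l: Ex, r: Ex) extends Ex

sealed trait Plan { def children: Seq[Plan] }
final case class Scan(table: String) extends Plan { val children: Seq[Plan] = Nil }
final case class Filter(cond: Ex, child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }
final case class Project(cols: Seq[Ex], child: Plan) extends Plan {
  val children: Seq[Plan] = Seq(child)
}

object PlanOuterRefs {
  def containsOuter(e: Ex): Boolean = e match {
    case _: Outer => true
    case Equal(l, r) => containsOuter(l) || containsOuter(r)
    case _: Column => false
  }

  /** Like the quoted hasOuterReferences: true if any Filter condition has an outer reference. */
  def hasOuterReferences(p: Plan): Boolean = p match {
    case Filter(cond, child) => containsOuter(cond) || hasOuterReferences(child)
    case other => other.children.exists(hasOuterReferences)
  }
}
```

Here the correlated part `select * from t2 where t2.c1 = t1.c1` becomes `Project(Seq(Column("*")), Filter(Equal(Column("t2.c1"), Outer(Column("t1.c1"))), Scan("t2")))`, and the helper reports it as correlated regardless of where that plan came from.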





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-13 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r105778426
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1204,80 +1250,44 @@ class Analyzer(
 
         // Category 1:
         // BroadcastHint, Distinct, LeafNode, Repartition, and SubqueryAlias
-        case p: BroadcastHint =>
-          p
-        case p: Distinct =>
-          p
-        case p: LeafNode =>
-          p
-        case p: Repartition =>
-          p
-        case p: SubqueryAlias =>
-          p
+        case _: BroadcastHint | _: Distinct | _: LeafNode | _: Repartition | _: SubqueryAlias =>
 
         // Category 2:
         // These operators can be anywhere in a correlated subquery.
         // so long as they do not host outer references in the operators.
-        case p: Sort =>
-          failOnOuterReference(p)
-          p
-        case p: RepartitionByExpression =>
-          failOnOuterReference(p)
-          p
+        case s: Sort =>
+          failOnOuterReference(s)
+        case r: RepartitionByExpression =>
+          failOnOuterReference(r)
 
         // Category 3:
         // Filter is one of the two operators allowed to host correlated expressions.
         // The other operator is Join. Filter can be anywhere in a correlated subquery.
-        case f @ Filter(cond, child) =>
+        case f: Filter =>
           // Find all predicates with an outer reference.
-          val (correlated, local) = splitConjunctivePredicates(cond).partition(containsOuter)
+          val (correlated, _) = splitConjunctivePredicates(f.condition).partition(containsOuter)
 
           // Find any non-equality correlated predicates
           foundNonEqualCorrelatedPred = foundNonEqualCorrelatedPred || correlated.exists {
             case _: EqualTo | _: EqualNullSafe => false
             case _ => true
           }
-
-          // Rewrite the filter without the correlated predicates if any.
-          correlated match {
-            case Nil => f
-            case xs if local.nonEmpty =>
-              val newFilter = Filter(local.reduce(And), child)
-              predicateMap += newFilter -> xs
-              newFilter
-            case xs =>
-              predicateMap += child -> xs
-              child
-          }
+          outerReferences ++= getOuterReferences(correlated)
--- End diff --

@hvanhovell Will do.
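In the quoted diff, the new `Filter` branch no longer rewrites the filter; it only records whether a correlated predicate is a non-equality and which outer references it uses. A minimal sketch of that bookkeeping, assuming a hypothetical predicate model (`P`, `A`, `OuterA` and `CorrelatedFilterSketch` are illustrative names, and `EqualNullSafe` is omitted for brevity):

```scala
// Hypothetical toy predicate model; Catalyst's real classes are richer.
sealed trait P
final case class A(name: String) extends P      // local attribute
final case class OuterA(name: String) extends P // stands in for OuterReference(attr)
final case class EqualTo(l: P, r: P) extends P
final case class LessThan(l: P, r: P) extends P
final case class And(l: P, r: P) extends P

object CorrelatedFilterSketch {
  def splitConjunctivePredicates(c: P): Seq[P] = c match {
    case And(l, r) => splitConjunctivePredicates(l) ++ splitConjunctivePredicates(r)
    case other => Seq(other)
  }

  /** Collects the outer references used inside a predicate. */
  def outerRefs(e: P): Seq[OuterA] = e match {
    case o: OuterA => Seq(o)
    case _: A => Nil
    case EqualTo(l, r) => outerRefs(l) ++ outerRefs(r)
    case LessThan(l, r) => outerRefs(l) ++ outerRefs(r)
    case And(l, r) => outerRefs(l) ++ outerRefs(r)
  }

  /** Returns (found a non-equality correlated predicate?, outer references used). */
  def analyzeFilter(cond: P): (Boolean, Seq[OuterA]) = {
    val (correlated, _) = splitConjunctivePredicates(cond).partition(p => outerRefs(p).nonEmpty)
    val nonEqual = correlated.exists {
      case _: EqualTo => false
      case _ => true
    }
    (nonEqual, correlated.flatMap(outerRefs))
  }
}
```

Local (uncorrelated) conjuncts are simply ignored, which matches the `val (correlated, _)` pattern in the new code.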





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r105766906
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala ---
@@ -365,17 +368,73 @@ object TypeCoercion {
   }
 
   /**
-   * Convert the value and in list expressions to the common operator type
-   * by looking at all the argument types and finding the closest one that
-   * all the arguments can be cast to. When no common operator type is found
-   * the original expression will be returned and an Analysis Exception will
-   * be raised at type checking phase.
+   * Handles type coercion for both IN expression with subquery and IN
+   * expressions without subquery.
+   * 1. In the first case, find the common type by comparing the left hand side (LHS)
+   *    expression types against corresponding right hand side (RHS) expression derived
+   *    from the subquery expression's plan output. Inject appropriate casts in the
+   *    LHS and RHS side of IN expression.
+   *
+   * 2. In the second case, convert the value and in list expressions to the
+   *    common operator type by looking at all the argument types and finding
+   *    the closest one that all the arguments can be cast to. When no common
+   *    operator type is found the original expression will be returned and an
+   *    Analysis Exception will be raised at the type checking phase.
    */
   object InConversion extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
       // Skip nodes who's children have not been resolved yet.
       case e if !e.childrenResolved => e
 
+      // Handle type casting required between value expression and subquery output
+      // in IN subquery.
+      case i @ In(a, Seq(ListQuery(sub, children, exprId))) if !i.resolved =>
+        // LHS is the value expression of IN subquery.
+        val lhs = a match {
+          // Multi columns in IN clause is represented as a CreateNamedStruct.
+          // flatten the named struct to get the list of expressions.
+          case cns: CreateNamedStruct => cns.valExprs
+          case expr => Seq(expr)
+        }
+
+        // RHS is the subquery output.
+        val rhs = sub.output
+        require(lhs.length == rhs.length)
+
+        val commonTypes = lhs.zip(rhs).flatMap { case (l, r) =>
+          findCommonTypeForBinaryComparison(l.dataType, r.dataType) match {
+            case d @ Some(_) => d
+            case _ => findTightestCommonType(l.dataType, r.dataType)
+          }
+        }
+
+        // The number of columns/expressions must match between LHS and RHS of an
+        // IN subquery expression.
+        if (commonTypes.length == lhs.length) {
+          val castedRhs = rhs.zip(commonTypes).map {
+            case (e, dt) if e.dataType != dt => Alias(Cast(e, dt), e.name)()
+            case (e, _) => e
+          }
+          val castedLhs = lhs.zip(commonTypes).map {
+            case (e, dt) if e.dataType != dt => Cast(e, dt)
+            case (e, _) => e
+          }
+
+          // Before constructing the In expression, wrap the multi values in LHS
+          // in a CreatedNamedStruct.
+          val newLhs = a match {
--- End diff --

My bad, I never compile these snippets. You have a point there. We could just use CreateStruct (since we really don't care about the name). So that would look something like this (again not compiled):
```scala
val newLhs = castedLhs match {
  case Seq(lhs) => lhs
  case _ => CreateStruct(castedLhs)
}
```
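The rebuild described here can be checked on a toy model: flatten the value side into its components, then wrap the casted expressions back into a struct only when there is more than one. `Leaf`, `Struct` and `LhsRebuild` are hypothetical names; `Struct` stands in for `CreateStruct` and carries no field names because, as noted, the names do not matter here.

```scala
// Hypothetical toy expressions; Struct stands in for Catalyst's CreateStruct.
sealed trait E
final case class Leaf(name: String) extends E
final case class Struct(children: Seq[E]) extends E

object LhsRebuild {
  /** Flattens the IN value side into its component expressions. */
  def flatten(value: E): Seq[E] = value match {
    case Struct(cs) => cs
    case other => Seq(other)
  }

  /** A single expression stays bare; several are wrapped in an unnamed struct. */
  def rebuild(casted: Seq[E]): E = casted match {
    case Seq(single) => single
    case _ => Struct(casted)
  }
}
```

For a list of `Leaf` values, `flatten(rebuild(xs))` gives back `xs`, so the single-column and multi-column cases take the same path through the rule.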





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-13 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r105734975
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala ---
@@ -40,19 +43,189 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {
 /**
  * A base interface for expressions that contain a [[LogicalPlan]].
  */
-abstract class SubqueryExpression extends PlanExpression[LogicalPlan] {
+abstract class SubqueryExpression(
+    plan: LogicalPlan,
+    children: Seq[Expression],
+    exprId: ExprId) extends PlanExpression[LogicalPlan] {
+
+  override lazy val resolved: Boolean = childrenResolved && plan.resolved
+  override lazy val references: AttributeSet =
+    if (plan.resolved) super.references -- plan.outputSet else super.references
   override def withNewPlan(plan: LogicalPlan): SubqueryExpression
+  override def semanticEquals(o: Expression): Boolean = o match {
+    case p: SubqueryExpression =>
+      this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) &&
+        children.length == p.children.length &&
+        children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
+    case _ => false
+  }
 }
 
 object SubqueryExpression {
+  /**
+   * Returns true when an expression contains an IN or EXISTS subquery and false otherwise.
+   */
+  def hasInOrExistsSubquery(e: Expression): Boolean = {
+    e.find {
+      case _: ListQuery | _: Exists => true
+      case _ => false
+    }.isDefined
+  }
+
+  /**
+   * Returns true when an expression contains a subquery that has outer reference(s). The outer
+   * reference attributes are kept as children of subquery expression by
+   * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]]
+   */
   def hasCorrelatedSubquery(e: Expression): Boolean = {
     e.find {
-      case e: SubqueryExpression if e.children.nonEmpty => true
+      case s: SubqueryExpression => s.children.nonEmpty
       case _ => false
     }.isDefined
   }
 }
 
+object SubExprUtils extends PredicateHelper {
+  /**
+   * Returns true when an expression contains correlated predicates i.e outer references and
+   * returns false otherwise.
+   */
+  def containsOuter(e: Expression): Boolean = {
+    e.find(_.isInstanceOf[OuterReference]).isDefined
+  }
+
+  /**
+   * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could
+   * turn the null-aware predicate into not-null-aware predicate.
+   */
+  def hasNullAwarePredicateWithinNot(condition: Expression): Boolean = {
+    splitConjunctivePredicates(condition).exists {
+      case _: Exists | Not(_: Exists) | In(_, Seq(_: ListQuery)) | Not(In(_, Seq(_: ListQuery))) =>
+        false
+      case e => e.find { x =>
+        x.isInstanceOf[Not] && e.find {
--- End diff --

@hvanhovell Sure, will do.
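For reference, the logic of the quoted `hasNullAwarePredicateWithinNot` can be modelled on a toy predicate tree. All names (`X`, `InSub`, `ExistsSub`, `Lit`, `NullAwareSketch`, and so on) are hypothetical; the sketch keeps the quoted structure, including the inner search over the whole conjunct `e` rather than the `Not` node `x`.

```scala
// Hypothetical toy predicate tree; InSub stands in for In(_, Seq(_: ListQuery))
// and ExistsSub for Exists. Not, And and Or mirror the Catalyst operators.
sealed trait X { def children: Seq[X] }
final case class InSub(value: String) extends X { def children: Seq[X] = Nil }
case object ExistsSub extends X { def children: Seq[X] = Nil }
final case class Lit(v: Boolean) extends X { def children: Seq[X] = Nil }
final case class Not(child: X) extends X { def children: Seq[X] = Seq(child) }
final case class And(l: X, r: X) extends X { def children: Seq[X] = Seq(l, r) }
final case class Or(l: X, r: X) extends X { def children: Seq[X] = Seq(l, r) }

object NullAwareSketch {
  /** True if `p` holds for `e` or any descendant, like Catalyst's find(...).isDefined. */
  def contains(e: X)(p: X => Boolean): Boolean = p(e) || e.children.exists(c => contains(c)(p))

  def splitConjunctivePredicates(c: X): Seq[X] = c match {
    case And(l, r) => splitConjunctivePredicates(l) ++ splitConjunctivePredicates(r)
    case other => Seq(other)
  }

  def hasNullAwarePredicateWithinNot(condition: X): Boolean =
    splitConjunctivePredicates(condition).exists {
      // Top-level (NOT) EXISTS and (NOT) IN subquery conjuncts do not count.
      case ExistsSub | Not(ExistsSub) | _: InSub | Not(_: InSub) => false
      // As quoted: the conjunct has some Not and, anywhere in it, an IN subquery.
      case e => contains(e)(x => x.isInstanceOf[Not] && contains(e)(_.isInstanceOf[InSub]))
    }
}
```

With this structure, `Or(Not(InSub("a")), Lit(true))` is reported, while a bare `Not(InSub("a"))` conjunct is not.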





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-13 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r105732875
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala ---
@@ -365,17 +368,73 @@ object TypeCoercion {
   }
 
   /**
-   * Convert the value and in list expressions to the common operator type
-   * by looking at all the argument types and finding the closest one that
-   * all the arguments can be cast to. When no common operator type is found
-   * the original expression will be returned and an Analysis Exception will
-   * be raised at type checking phase.
+   * Handles type coercion for both IN expression with subquery and IN
+   * expressions without subquery.
+   * 1. In the first case, find the common type by comparing the left hand side (LHS)
+   *    expression types against corresponding right hand side (RHS) expression derived
+   *    from the subquery expression's plan output. Inject appropriate casts in the
+   *    LHS and RHS side of IN expression.
+   *
+   * 2. In the second case, convert the value and in list expressions to the
+   *    common operator type by looking at all the argument types and finding
+   *    the closest one that all the arguments can be cast to. When no common
+   *    operator type is found the original expression will be returned and an
+   *    Analysis Exception will be raised at the type checking phase.
    */
   object InConversion extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
       // Skip nodes who's children have not been resolved yet.
       case e if !e.childrenResolved => e
 
+      // Handle type casting required between value expression and subquery output
+      // in IN subquery.
+      case i @ In(a, Seq(ListQuery(sub, children, exprId))) if !i.resolved =>
+        // LHS is the value expression of IN subquery.
+        val lhs = a match {
+          // Multi columns in IN clause is represented as a CreateNamedStruct.
+          // flatten the named struct to get the list of expressions.
+          case cns: CreateNamedStruct => cns.valExprs
+          case expr => Seq(expr)
+        }
+
+        // RHS is the subquery output.
+        val rhs = sub.output
+        require(lhs.length == rhs.length)
+
+        val commonTypes = lhs.zip(rhs).flatMap { case (l, r) =>
+          findCommonTypeForBinaryComparison(l.dataType, r.dataType) match {
+            case d @ Some(_) => d
+            case _ => findTightestCommonType(l.dataType, r.dataType)
+          }
+        }
+
+        // The number of columns/expressions must match between LHS and RHS of an
+        // IN subquery expression.
+        if (commonTypes.length == lhs.length) {
+          val castedRhs = rhs.zip(commonTypes).map {
+            case (e, dt) if e.dataType != dt => Alias(Cast(e, dt), e.name)()
+            case (e, _) => e
+          }
+          val castedLhs = lhs.zip(commonTypes).map {
+            case (e, dt) if e.dataType != dt => Cast(e, dt)
+            case (e, _) => e
+          }
+
+          // Before constructing the In expression, wrap the multi values in LHS
+          // in a CreatedNamedStruct.
+          val newLhs = a match {
--- End diff --

@hvanhovell Can you please double-check the above code? I am unable to
compile it because `cns` is not defined. In this block, I was looking at the
original left hand side expression to see if it is a named struct and, if so, I
construct a named struct back. Wouldn't `castedLhs` always match `Seq(lhs)`?
Please let me know if I am missing something here.
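One note on the question above: in Scala the pattern `case Seq(lhs)` matches only a one-element sequence, so a multi-column LHS falls through to the second case, where `cns` still has to be bound somewhere. A minimal sketch of one way to do that, using hypothetical toy stand-ins for the Catalyst classes (not Spark's real `Expression` hierarchy): `cns` is bound by matching on the original value expression `a`, and its name expressions are zipped with the already-cast values.

```scala
// Hypothetical, simplified stand-ins for Catalyst expressions (not Spark's classes).
object RebuildLhsSketch {
  sealed trait Expr
  final case class Lit(value: String) extends Expr
  final case class Col(name: String, dataType: String) extends Expr
  final case class Cast(child: Expr, dataType: String) extends Expr

  // Children alternate name, value, name, value, ... as in a named struct.
  final case class CreateNamedStruct(children: Seq[Expr]) extends Expr {
    def nameExprs: Seq[Expr] = children.grouped(2).map(_.head).toList
    def valExprs: Seq[Expr] = children.grouped(2).map(_(1)).toList
  }

  // Bind `cns` by matching on the ORIGINAL value expression `a`, then pair its
  // names with the values that have already been cast.
  def rebuildLhs(a: Expr, castedLhs: Seq[Expr]): Expr = a match {
    case cns: CreateNamedStruct =>
      CreateNamedStruct(cns.nameExprs.zip(castedLhs).flatMap {
        case (name, value) => Seq(name, value)
      })
    case _ =>
      // Single-column IN: exactly one cast expression is expected.
      castedLhs.head
  }
}
```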





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r105724836
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala ---
@@ -40,19 +43,189 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {
 /**
  * A base interface for expressions that contain a [[LogicalPlan]].
  */
-abstract class SubqueryExpression extends PlanExpression[LogicalPlan] {
+abstract class SubqueryExpression(
+    plan: LogicalPlan,
+    children: Seq[Expression],
+    exprId: ExprId) extends PlanExpression[LogicalPlan] {
+
+  override lazy val resolved: Boolean = childrenResolved && plan.resolved
+  override lazy val references: AttributeSet =
+    if (plan.resolved) super.references -- plan.outputSet else super.references
   override def withNewPlan(plan: LogicalPlan): SubqueryExpression
+  override def semanticEquals(o: Expression): Boolean = o match {
+    case p: SubqueryExpression =>
+      this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) &&
+        children.length == p.children.length &&
+        children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
+    case _ => false
+  }
 }
 
 object SubqueryExpression {
+  /**
+   * Returns true when an expression contains an IN or EXISTS subquery and false otherwise.
+   */
+  def hasInOrExistsSubquery(e: Expression): Boolean = {
+    e.find {
+      case _: ListQuery | _: Exists => true
+      case _ => false
+    }.isDefined
+  }
+
+  /**
+   * Returns true when an expression contains a subquery that has outer reference(s). The outer
+   * reference attributes are kept as children of subquery expression by
+   * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]]
+   */
   def hasCorrelatedSubquery(e: Expression): Boolean = {
     e.find {
-      case e: SubqueryExpression if e.children.nonEmpty => true
+      case s: SubqueryExpression => s.children.nonEmpty
       case _ => false
     }.isDefined
   }
 }
 
+object SubExprUtils extends PredicateHelper {
--- End diff --

Why create two objects, `SubqueryExpression` and `SubExprUtils`? Are they that
different?





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r105723410
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1204,80 +1250,44 @@ class Analyzer(
 
         // Category 1:
         // BroadcastHint, Distinct, LeafNode, Repartition, and SubqueryAlias
-        case p: BroadcastHint =>
-          p
-        case p: Distinct =>
-          p
-        case p: LeafNode =>
-          p
-        case p: Repartition =>
-          p
-        case p: SubqueryAlias =>
-          p
+        case _: BroadcastHint | _: Distinct | _: LeafNode | _: Repartition | _: SubqueryAlias =>
 
         // Category 2:
         // These operators can be anywhere in a correlated subquery.
         // so long as they do not host outer references in the operators.
-        case p: Sort =>
-          failOnOuterReference(p)
-          p
-        case p: RepartitionByExpression =>
-          failOnOuterReference(p)
-          p
+        case s: Sort =>
+          failOnOuterReference(s)
+        case r: RepartitionByExpression =>
+          failOnOuterReference(r)
 
         // Category 3:
         // Filter is one of the two operators allowed to host correlated expressions.
         // The other operator is Join. Filter can be anywhere in a correlated subquery.
-        case f @ Filter(cond, child) =>
+        case f: Filter =>
           // Find all predicates with an outer reference.
-          val (correlated, local) = splitConjunctivePredicates(cond).partition(containsOuter)
+          val (correlated, _) = splitConjunctivePredicates(f.condition).partition(containsOuter)
 
           // Find any non-equality correlated predicates
           foundNonEqualCorrelatedPred = foundNonEqualCorrelatedPred || correlated.exists {
             case _: EqualTo | _: EqualNullSafe => false
             case _ => true
           }
-
-          // Rewrite the filter without the correlated predicates if any.
-          correlated match {
-            case Nil => f
-            case xs if local.nonEmpty =>
-              val newFilter = Filter(local.reduce(And), child)
-              predicateMap += newFilter -> xs
-              newFilter
-            case xs =>
-              predicateMap += child -> xs
-              child
-          }
+          outerReferences ++= getOuterReferences(correlated)
--- End diff --

`getOuterReferences` is kind of magical in the sense that it also isolates
the aggregate expression. Could you add a one-line comment to highlight
this?
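To illustrate the aggregate special case mentioned above, here is a toy sketch built on hypothetical classes (not Catalyst's, and not the PR's actual `getOuterReferences`): when an aggregate's subtree contains an outer reference, the whole aggregate is collected with its inner `OuterRef` wrappers stripped, matching the `OuterReference(min(b))` rather than `min(OuterReference(b))` behavior described in the PR's scaladoc.

```scala
// Toy expression tree (hypothetical; not Spark's Catalyst classes).
object OuterRefSketch {
  sealed trait Expr { def children: Seq[Expr] }
  final case class Attr(name: String) extends Expr { def children: Seq[Expr] = Nil }
  final case class OuterRef(child: Expr) extends Expr { def children: Seq[Expr] = Seq(child) }
  // Stands in for an aggregate function such as min(...).
  final case class Min(child: Expr) extends Expr { def children: Seq[Expr] = Seq(child) }
  final case class LessThan(l: Expr, r: Expr) extends Expr { def children: Seq[Expr] = Seq(l, r) }

  def containsOuter(e: Expr): Boolean = e match {
    case _: OuterRef => true
    case other => other.children.exists(containsOuter)
  }

  // Removes every OuterRef wrapper in the tree.
  def strip(e: Expr): Expr = e match {
    case OuterRef(c) => strip(c)
    case Min(c) => Min(strip(c))
    case LessThan(l, r) => LessThan(strip(l), strip(r))
    case a: Attr => a
  }

  // Collects outer references; an aggregate that contains an outer reference is
  // reported as a whole instead of as its individual attributes.
  def getOuterReferences(conditions: Seq[Expr]): Seq[Expr] = conditions.flatMap(collect)

  private def collect(e: Expr): Seq[Expr] = e match {
    case agg: Min if containsOuter(agg) => Seq(strip(agg))
    case OuterRef(c) => Seq(strip(c))
    case other => other.children.flatMap(collect)
  }
}
```

For `d < min(outer(b))` the sketch returns `Min(Attr("b"))`, while for `d < outer(b)` it returns just `Attr("b")`.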





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r105719841
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala ---
@@ -40,19 +43,189 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {
 /**
  * A base interface for expressions that contain a [[LogicalPlan]].
  */
-abstract class SubqueryExpression extends PlanExpression[LogicalPlan] {
+abstract class SubqueryExpression(
+    plan: LogicalPlan,
+    children: Seq[Expression],
+    exprId: ExprId) extends PlanExpression[LogicalPlan] {
+
+  override lazy val resolved: Boolean = childrenResolved && plan.resolved
+  override lazy val references: AttributeSet =
+    if (plan.resolved) super.references -- plan.outputSet else super.references
   override def withNewPlan(plan: LogicalPlan): SubqueryExpression
+  override def semanticEquals(o: Expression): Boolean = o match {
+    case p: SubqueryExpression =>
+      this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) &&
+        children.length == p.children.length &&
+        children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
+    case _ => false
+  }
 }
 
 object SubqueryExpression {
+  /**
+   * Returns true when an expression contains an IN or EXISTS subquery and false otherwise.
+   */
+  def hasInOrExistsSubquery(e: Expression): Boolean = {
+    e.find {
+      case _: ListQuery | _: Exists => true
+      case _ => false
+    }.isDefined
+  }
+
+  /**
+   * Returns true when an expression contains a subquery that has outer reference(s). The outer
+   * reference attributes are kept as children of subquery expression by
+   * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]]
+   */
   def hasCorrelatedSubquery(e: Expression): Boolean = {
     e.find {
-      case e: SubqueryExpression if e.children.nonEmpty => true
+      case s: SubqueryExpression => s.children.nonEmpty
       case _ => false
     }.isDefined
   }
 }
 
+object SubExprUtils extends PredicateHelper {
+  /**
+   * Returns true when an expression contains correlated predicates i.e outer references and
+   * returns false otherwise.
+   */
+  def containsOuter(e: Expression): Boolean = {
+    e.find(_.isInstanceOf[OuterReference]).isDefined
+  }
+
+  /**
+   * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could
+   * turn the null-aware predicate into not-null-aware predicate.
+   */
+  def hasNullAwarePredicateWithinNot(condition: Expression): Boolean = {
+    splitConjunctivePredicates(condition).exists {
+      case _: Exists | Not(_: Exists) | In(_, Seq(_: ListQuery)) | Not(In(_, Seq(_: ListQuery))) =>
+        false
+      case e => e.find { x =>
+        x.isInstanceOf[Not] && e.find {
+          case In(_, Seq(_: ListQuery)) => true
+          case _ => false
+        }.isDefined
+      }.isDefined
+    }
+
+  }
+
+  /**
+   * Returns an expression after removing the OuterReference shell.
+   */
+  def stripOuterReference(e: Expression): Expression = e.transform { case OuterReference(r) => r }
+
+  /**
+   * Returns the list of expressions after removing the OuterReference shell from each of
+   * the expression.
+   */
+  def stripOuterReferences(e: Seq[Expression]): Seq[Expression] = e.map(stripOuterReference)
+
+  /**
+   * Returns the logical plan after removing the OuterReference shell from all the expressions
+   * of the input logical plan.
+   */
+  def stripOuterReferences(p: LogicalPlan): LogicalPlan = {
+    p.transformAllExpressions {
+      case OuterReference(a) => a
+    }
+  }
+
+  /**
+   * Given a logical plan, returns TRUE if it has an outer reference and false otherwise.
+   */
+  def hasOuterReferences(plan: LogicalPlan): Boolean = {
+    plan.find {
+      case f: Filter => containsOuter(f.condition)
+      case other => false
+    }.isDefined
+  }
+
+  /**
+   * Given a list of expressions, returns the expressions which have outer references. Aggregate
+   * expressions are treated in a special way. If the children of aggregate expression contains an
+   * outer reference, then the entire aggregate expression is marked as an outer reference.
+   * Example (SQL):
+   * {{{
+   *   SELECT a FROM l GROUP by 1 HAVING EXISTS (SELECT 1 FROM r WHERE d < min(b))
+   * }}}
+   * In the above case, we want to mark the entire min(b) as an outer reference
+   * OuterReference(min(b)) instead of min(OuterReference(b)).
 

[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r105718880
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala ---
@@ -40,19 +43,189 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {
 /**
  * A base interface for expressions that contain a [[LogicalPlan]].
  */
-abstract class SubqueryExpression extends PlanExpression[LogicalPlan] {
+abstract class SubqueryExpression(
+    plan: LogicalPlan,
+    children: Seq[Expression],
+    exprId: ExprId) extends PlanExpression[LogicalPlan] {
+
+  override lazy val resolved: Boolean = childrenResolved && plan.resolved
+  override lazy val references: AttributeSet =
+    if (plan.resolved) super.references -- plan.outputSet else super.references
   override def withNewPlan(plan: LogicalPlan): SubqueryExpression
+  override def semanticEquals(o: Expression): Boolean = o match {
+    case p: SubqueryExpression =>
+      this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) &&
+        children.length == p.children.length &&
+        children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
+    case _ => false
+  }
 }
 
 object SubqueryExpression {
+  /**
+   * Returns true when an expression contains an IN or EXISTS subquery and false otherwise.
+   */
+  def hasInOrExistsSubquery(e: Expression): Boolean = {
+    e.find {
+      case _: ListQuery | _: Exists => true
+      case _ => false
+    }.isDefined
+  }
+
+  /**
+   * Returns true when an expression contains a subquery that has outer reference(s). The outer
+   * reference attributes are kept as children of subquery expression by
+   * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]]
+   */
   def hasCorrelatedSubquery(e: Expression): Boolean = {
     e.find {
-      case e: SubqueryExpression if e.children.nonEmpty => true
+      case s: SubqueryExpression => s.children.nonEmpty
       case _ => false
     }.isDefined
   }
 }
 
+object SubExprUtils extends PredicateHelper {
+  /**
+   * Returns true when an expression contains correlated predicates i.e outer references and
+   * returns false otherwise.
+   */
+  def containsOuter(e: Expression): Boolean = {
+    e.find(_.isInstanceOf[OuterReference]).isDefined
+  }
+
+  /**
+   * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could
+   * turn the null-aware predicate into not-null-aware predicate.
+   */
+  def hasNullAwarePredicateWithinNot(condition: Expression): Boolean = {
+    splitConjunctivePredicates(condition).exists {
+      case _: Exists | Not(_: Exists) | In(_, Seq(_: ListQuery)) | Not(In(_, Seq(_: ListQuery))) =>
+        false
+      case e => e.find { x =>
+        x.isInstanceOf[Not] && e.find {
+          case In(_, Seq(_: ListQuery)) => true
+          case _ => false
+        }.isDefined
+      }.isDefined
+    }
+
+  }
+
+  /**
+   * Returns an expression after removing the OuterReference shell.
+   */
+  def stripOuterReference(e: Expression): Expression = e.transform { case OuterReference(r) => r }
+
+  /**
+   * Returns the list of expressions after removing the OuterReference shell from each of
+   * the expression.
+   */
+  def stripOuterReferences(e: Seq[Expression]): Seq[Expression] = e.map(stripOuterReference)
+
+  /**
+   * Returns the logical plan after removing the OuterReference shell from all the expressions
+   * of the input logical plan.
+   */
+  def stripOuterReferences(p: LogicalPlan): LogicalPlan = {
+    p.transformAllExpressions {
+      case OuterReference(a) => a
+    }
+  }
+
+  /**
+   * Given a logical plan, returns TRUE if it has an outer reference and false otherwise.
+   */
+  def hasOuterReferences(plan: LogicalPlan): Boolean = {
+    plan.find {
+      case f: Filter => containsOuter(f.condition)
+      case other => false
+    }.isDefined
+  }
+
+  /**
+   * Given a list of expressions, returns the expressions which have outer references. Aggregate
+   * expressions are treated in a special way. If the children of aggregate expression contains an
+   * outer reference, then the entire aggregate expression is marked as an outer reference.
+   * Example (SQL):
+   * {{{
+   *   SELECT a FROM l GROUP by 1 HAVING EXISTS (SELECT 1 FROM r WHERE d < min(b))
+   * }}}
+   * In the above case, we want to mark the entire min(b) as an outer reference
+   * OuterReference(min(b)) instead of min(OuterReference(b)).
 

[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r105717565
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala ---
@@ -40,19 +43,189 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {
 /**
  * A base interface for expressions that contain a [[LogicalPlan]].
  */
-abstract class SubqueryExpression extends PlanExpression[LogicalPlan] {
+abstract class SubqueryExpression(
+    plan: LogicalPlan,
+    children: Seq[Expression],
+    exprId: ExprId) extends PlanExpression[LogicalPlan] {
+
+  override lazy val resolved: Boolean = childrenResolved && plan.resolved
+  override lazy val references: AttributeSet =
+    if (plan.resolved) super.references -- plan.outputSet else super.references
   override def withNewPlan(plan: LogicalPlan): SubqueryExpression
+  override def semanticEquals(o: Expression): Boolean = o match {
+    case p: SubqueryExpression =>
+      this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) &&
+        children.length == p.children.length &&
+        children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
+    case _ => false
+  }
 }
 
 object SubqueryExpression {
+  /**
+   * Returns true when an expression contains an IN or EXISTS subquery and false otherwise.
+   */
+  def hasInOrExistsSubquery(e: Expression): Boolean = {
+    e.find {
+      case _: ListQuery | _: Exists => true
+      case _ => false
+    }.isDefined
+  }
+
+  /**
+   * Returns true when an expression contains a subquery that has outer reference(s). The outer
+   * reference attributes are kept as children of subquery expression by
+   * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]]
+   */
   def hasCorrelatedSubquery(e: Expression): Boolean = {
     e.find {
-      case e: SubqueryExpression if e.children.nonEmpty => true
+      case s: SubqueryExpression => s.children.nonEmpty
       case _ => false
     }.isDefined
   }
 }
 
+object SubExprUtils extends PredicateHelper {
+  /**
+   * Returns true when an expression contains correlated predicates i.e outer references and
+   * returns false otherwise.
+   */
+  def containsOuter(e: Expression): Boolean = {
+    e.find(_.isInstanceOf[OuterReference]).isDefined
+  }
+
+  /**
+   * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could
+   * turn the null-aware predicate into not-null-aware predicate.
+   */
+  def hasNullAwarePredicateWithinNot(condition: Expression): Boolean = {
+    splitConjunctivePredicates(condition).exists {
+      case _: Exists | Not(_: Exists) | In(_, Seq(_: ListQuery)) | Not(In(_, Seq(_: ListQuery))) =>
+        false
+      case e => e.find { x =>
+        x.isInstanceOf[Not] && e.find {
--- End diff --

This still looks fishy to me. It is also in the original code base, so we 
don't have to fix this now. Can you open a JIRA to track this?
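One possible reading of why this looks fishy: in the fallback `case e =>` branch, the inner `e.find` scans the entire conjunct `e`, not the subtree under the `Not` node `x` that the outer `find` just matched. Any conjunct with some `Not` anywhere and an IN subquery anywhere is therefore flagged, even when the IN subquery is not inside the `Not`. The toy sketch below models only that branch with hypothetical classes (not Catalyst's); `scopedCheck` is just one conceivable alternative, not something decided in this thread.

```scala
// Toy predicate tree (hypothetical; not Catalyst's classes).
object NullAwareSketch {
  sealed trait Expr { def children: Seq[Expr] }
  final case class Attr(name: String) extends Expr { def children: Seq[Expr] = Nil }
  final case class Not(child: Expr) extends Expr { def children: Seq[Expr] = Seq(child) }
  final case class Or(l: Expr, r: Expr) extends Expr { def children: Seq[Expr] = Seq(l, r) }
  final case class InSubquery(value: Expr) extends Expr { def children: Seq[Expr] = Seq(value) }

  // Pre-order search, similar in spirit to TreeNode.find.
  def find(e: Expr)(p: Expr => Boolean): Option[Expr] =
    if (p(e)) Some(e)
    else e.children.iterator.map(c => find(c)(p)).collectFirst { case Some(x) => x }

  private def isInSubquery(e: Expr): Boolean = e.isInstanceOf[InSubquery]

  // Same shape as the quoted code: the inner search scans the whole predicate `e`.
  def quotedCheck(e: Expr): Boolean =
    find(e)(x => x.isInstanceOf[Not] && find(e)(isInSubquery).isDefined).isDefined

  // Hypothetical variant: the inner search scans only below the Not node `x`.
  def scopedCheck(e: Expr): Boolean =
    find(e)(x => x.isInstanceOf[Not] && find(x)(isInSubquery).isDefined).isDefined
}
```

For `NOT a OR b IN (subquery)` the two checks disagree, while for `a OR NOT (b IN (subquery))` both report true.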





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r105717078
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala ---
@@ -123,19 +123,36 @@ case class Not(child: Expression)
  */
 @ExpressionDescription(
   usage = "expr1 _FUNC_(expr2, expr3, ...) - Returns true if `expr` equals to any valN.")
-case class In(value: Expression, list: Seq[Expression]) extends Predicate
-    with ImplicitCastInputTypes {
+case class In(value: Expression, list: Seq[Expression]) extends Predicate {
 
   require(list != null, "list should not be null")
-
-  override def inputTypes: Seq[AbstractDataType] = value.dataType +: list.map(_.dataType)
-
   override def checkInputDataTypes(): TypeCheckResult = {
-    if (list.exists(l => l.dataType != value.dataType)) {
-      TypeCheckResult.TypeCheckFailure(
-        "Arguments must be same type")
-    } else {
-      TypeCheckResult.TypeCheckSuccess
+    list match {
+      case ListQuery(sub, _, _) :: Nil =>
+        val valExprs = value match {
+          case cns: CreateNamedStruct => cns.valExprs
+          case expr => Seq(expr)
+        }
+        val isTypeMismatched = valExprs.zip(sub.output).exists {
+          case (l, r) => l.dataType != r.dataType
+        }
+        if (isTypeMismatched) {
+          TypeCheckResult.TypeCheckFailure(
+            s"""
+              |The data type of one or more elements in the LHS of an IN subquery
+              |[${valExprs.map(_.dataType).mkString(", ")}]
--- End diff --

Use catalog strings please.





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r105716983
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala ---
@@ -123,19 +123,36 @@ case class Not(child: Expression)
  */
 @ExpressionDescription(
   usage = "expr1 _FUNC_(expr2, expr3, ...) - Returns true if `expr` equals to any valN.")
-case class In(value: Expression, list: Seq[Expression]) extends Predicate
-    with ImplicitCastInputTypes {
+case class In(value: Expression, list: Seq[Expression]) extends Predicate {
 
   require(list != null, "list should not be null")
-
-  override def inputTypes: Seq[AbstractDataType] = value.dataType +: list.map(_.dataType)
-
   override def checkInputDataTypes(): TypeCheckResult = {
-    if (list.exists(l => l.dataType != value.dataType)) {
-      TypeCheckResult.TypeCheckFailure(
-        "Arguments must be same type")
-    } else {
-      TypeCheckResult.TypeCheckSuccess
+    list match {
+      case ListQuery(sub, _, _) :: Nil =>
+        val valExprs = value match {
+          case cns: CreateNamedStruct => cns.valExprs
+          case expr => Seq(expr)
+        }
+        val isTypeMismatched = valExprs.zip(sub.output).exists {
+          case (l, r) => l.dataType != r.dataType
+        }
+        if (isTypeMismatched) {
--- End diff --

It would be better to display the offending columns. Right now the user has to
find them on their own.
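A rough sketch of what reporting only the offending column pairs could look like. It uses a hypothetical `Col` class whose `catalogType` field stands in for a data type's catalog string (e.g. `int`, `bigint`), in line with the earlier "use catalog strings" suggestion; the message wording is illustrative only.

```scala
// Hypothetical sketch; Col and the message format are illustrative, not Spark's.
object MismatchMessageSketch {
  final case class Col(name: String, catalogType: String)

  // None when every LHS/RHS pair agrees; otherwise a message naming only the
  // pairs whose types differ.
  def mismatchMessage(lhs: Seq[Col], rhs: Seq[Col]): Option[String] = {
    val offending = lhs.zip(rhs).filter { case (l, r) => l.catalogType != r.catalogType }
    if (offending.isEmpty) None
    else Some(
      offending.map { case (l, r) =>
        s"(${l.name}:${l.catalogType}, ${r.name}:${r.catalogType})"
      }.mkString("Mismatched columns: [", ", ", "]"))
  }
}
```

For LHS `(a:int, b:string)` against subquery output `(c:int, d:bigint)`, only the `b`/`d` pair is reported.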





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r105715265
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala ---
@@ -365,17 +368,73 @@ object TypeCoercion {
   }
 
   /**
-   * Convert the value and in list expressions to the common operator type
-   * by looking at all the argument types and finding the closest one that
-   * all the arguments can be cast to. When no common operator type is found
-   * the original expression will be returned and an Analysis Exception will
-   * be raised at type checking phase.
+   * Handles type coercion for both IN expression with subquery and IN
+   * expressions without subquery.
+   * 1. In the first case, find the common type by comparing the left hand side (LHS)
+   *    expression types against corresponding right hand side (RHS) expression derived
+   *    from the subquery expression's plan output. Inject appropriate casts in the
+   *    LHS and RHS side of IN expression.
+   *
+   * 2. In the second case, convert the value and in list expressions to the
+   *    common operator type by looking at all the argument types and finding
+   *    the closest one that all the arguments can be cast to. When no common
+   *    operator type is found the original expression will be returned and an
+   *    Analysis Exception will be raised at the type checking phase.
    */
   object InConversion extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
       // Skip nodes who's children have not been resolved yet.
       case e if !e.childrenResolved => e
 
+      // Handle type casting required between value expression and subquery output
+      // in IN subquery.
+      case i @ In(a, Seq(ListQuery(sub, children, exprId))) if !i.resolved =>
+        // LHS is the value expression of IN subquery.
+        val lhs = a match {
+          // Multi columns in IN clause is represented as a CreateNamedStruct.
+          // flatten the named struct to get the list of expressions.
+          case cns: CreateNamedStruct => cns.valExprs
+          case expr => Seq(expr)
+        }
+
+        // RHS is the subquery output.
+        val rhs = sub.output
+        require(lhs.length == rhs.length)
+
+        val commonTypes = lhs.zip(rhs).flatMap { case (l, r) =>
+          findCommonTypeForBinaryComparison(l.dataType, r.dataType) match {
--- End diff --

Use `orElse`?
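For reference, the `match` on `Some`/`_` above can be collapsed with `Option.orElse`, whose argument is evaluated only when the first result is `None`. In this sketch, two hypothetical toy rules (`toyComparisonType`, `toyTightestType`, with made-up widening logic) stand in for `findCommonTypeForBinaryComparison` and `findTightestCommonType`.

```scala
// Hypothetical sketch; the two toy rules are made up and are not Spark's logic.
object OrElseSketch {
  // Toy rule: only int/bigint widen for comparison.
  def toyComparisonType(l: String, r: String): Option[String] =
    if (Set(l, r) == Set("int", "bigint")) Some("bigint") else None

  // Toy rule: identical types are trivially common.
  def toyTightestType(l: String, r: String): Option[String] =
    if (l == r) Some(l) else None

  // orElse replaces `match { case d @ Some(_) => d; case _ => ... }`.
  def commonTypes(pairs: Seq[(String, String)]): Seq[String] =
    pairs.flatMap { case (l, r) =>
      toyComparisonType(l, r).orElse(toyTightestType(l, r))
    }
}
```

Because `flatMap` drops pairs with no common type, a `commonTypes` shorter than `lhs` signals that some pair could not be reconciled, which is what the `commonTypes.length == lhs.length` check in the diff relies on.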





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r105714984
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala ---
@@ -365,17 +385,66 @@ object TypeCoercion {
   }
 
   /**
-   * Convert the value and in list expressions to the common operator type
-   * by looking at all the argument types and finding the closest one that
-   * all the arguments can be cast to. When no common operator type is found
-   * the original expression will be returned and an Analysis Exception will
-   * be raised at type checking phase.
+   * Handles type coercion for both IN expression with subquery and IN
+   * expressions without subquery.
+   * 1. In the first case, find the common type by comparing the left hand side
+   *    expression types against corresponding right hand side expression derived
+   *    from the subquery expression's plan output. Inject appropriate casts in the
+   *    LHS and RHS side of IN expression.
+   *
+   * 2. In the second case, convert the value and in list expressions to the
+   *    common operator type by looking at all the argument types and finding
+   *    the closest one that all the arguments can be cast to. When no common
+   *    operator type is found the original expression will be returned and an
+   *    Analysis Exception will be raised at the type checking phase.
    */
   object InConversion extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
       // Skip nodes who's children have not been resolved yet.
       case e if !e.childrenResolved => e
 
+      // Handle type casting required between value expression and subquery output
+      // in IN subquery.
+      case i @ In(a, Seq(ListQuery(sub, children, exprId))) if !i.resolved =>
--- End diff --

Maybe we should create a special InSubQuery expression. This looks like a 
lot of work for what we are doing here.
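A toy illustration of the idea, with hypothetical names that are not an API in this PR: if the IN-subquery form had its own node that keeps the LHS values as a flat sequence, coercion and type checking could zip that sequence with the subquery output directly, without unwrapping and rebuilding a `CreateNamedStruct`.

```scala
// Hypothetical sketch of a dedicated IN-subquery node; names are illustrative only.
object InSubquerySketch {
  final case class Col(name: String, dataType: String)

  // LHS values are kept as a flat list instead of being packed into a struct.
  final case class InSubquery(values: Seq[Col], subqueryOutput: Seq[Col]) {
    def arityMatches: Boolean = values.length == subqueryOutput.length

    // Pairs whose types differ; empty when the node is type-consistent.
    def mismatches: Seq[(Col, Col)] =
      values.zip(subqueryOutput).filter { case (l, r) => l.dataType != r.dataType }
  }
}
```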





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r105716052
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -365,17 +368,73 @@ object TypeCoercion {
   }
 
   /**
-   * Convert the value and in list expressions to the common operator type
-   * by looking at all the argument types and finding the closest one that
-   * all the arguments can be cast to. When no common operator type is 
found
-   * the original expression will be returned and an Analysis Exception 
will
-   * be raised at type checking phase.
+   * Handles type coercion for both IN expression with subquery and IN
+   * expressions without subquery.
+   * 1. In the first case, find the common type by comparing the left hand 
side (LHS)
+   *expression types against corresponding right hand side (RHS) 
expression derived
+   *from the subquery expression's plan output. Inject appropriate 
casts in the
+   *LHS and RHS side of IN expression.
+   *
+   * 2. In the second case, convert the value and in list expressions to 
the
+   *common operator type by looking at all the argument types and 
finding
+   *the closest one that all the arguments can be cast to. When no 
common
+   *operator type is found the original expression will be returned 
and an
+   *Analysis Exception will be raised at the type checking phase.
*/
   object InConversion extends Rule[LogicalPlan] {
 def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
   // Skip nodes who's children have not been resolved yet.
   case e if !e.childrenResolved => e
 
+  // Handle type casting required between value expression and subquery output
+  // in IN subquery.
+  case i @ In(a, Seq(ListQuery(sub, children, exprId))) if !i.resolved =>
+// LHS is the value expression of IN subquery.
+val lhs = a match {
+  // Multi columns in IN clause is represented as a CreateNamedStruct.
+  // flatten the named struct to get the list of expressions.
+  case cns: CreateNamedStruct => cns.valExprs
+  case expr => Seq(expr)
+}
+
+// RHS is the subquery output.
+val rhs = sub.output
+require(lhs.length == rhs.length)
+
+val commonTypes = lhs.zip(rhs).flatMap { case (l, r) =>
+  findCommonTypeForBinaryComparison(l.dataType, r.dataType) match {
+case d @ Some(_) => d
+case _ => findTightestCommonType(l.dataType, r.dataType)
+  }
+}
+
+// The number of columns/expressions must match between LHS and RHS of an
+// IN subquery expression.
+if (commonTypes.length == lhs.length) {
+  val castedRhs = rhs.zip(commonTypes).map {
+case (e, dt) if e.dataType != dt => Alias(Cast(e, dt), e.name)()
+case (e, _) => e
+  }
+  val castedLhs = lhs.zip(commonTypes).map {
+case (e, dt) if e.dataType != dt => Cast(e, dt)
+case (e, _) => e
+  }
+
+  // Before constructing the In expression, wrap the multi values in LHS
+  // in a CreatedNamedStruct.
+  val newLhs = a match {
--- End diff --

I think it is better to match on `castedLhs` here (paired with `a`, so that `cns` is in scope):
```scala
val newLhs = (a, castedLhs) match {
  case (_, Seq(lhs)) => lhs
  case (cns: CreateNamedStruct, _) =>
    CreateNamedStruct(cns.nameExprs.zip(castedLhs).flatMap {
      case (name, value) => Seq(name, value)
    })
}
```
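A minimal, self-contained sketch of the flatten, cast and re-wrap steps discussed above. It uses hypothetical toy types (`Expr`, `Col`, `Cast`, `Struct`) and helper names (`flatten`, `castAll`, `rewrap`), not Catalyst's real `CreateNamedStruct`/`Cast` classes, so treat it as an illustration of the idea only:

```scala
// Hypothetical toy expression types; these are NOT Catalyst classes.
sealed trait Expr
final case class Col(name: String, tpe: String) extends Expr
final case class Cast(child: Expr, tpe: String) extends Expr
final case class Struct(fields: Seq[(String, Expr)]) extends Expr

def typeOf(e: Expr): String = e match {
  case Col(_, t)  => t
  case Cast(_, t) => t
  case Struct(_)  => "struct"
}

// Flatten a multi-column IN value into its component expressions.
def flatten(value: Expr): Seq[Expr] = value match {
  case Struct(fields) => fields.map(_._2)
  case other          => Seq(other)
}

// Wrap each component in a Cast only when its type differs from the target.
def castAll(lhs: Seq[Expr], targets: Seq[String]): Seq[Expr] =
  lhs.zip(targets).map {
    case (e, t) if typeOf(e) != t => Cast(e, t)
    case (e, _)                   => e
  }

// Unwrap a single value; otherwise rebuild the struct with the original
// field names and the casted values.
def rewrap(original: Expr, casted: Seq[Expr]): Expr = (original, casted) match {
  case (_, Seq(single))    => single
  case (Struct(fields), _) => Struct(fields.map(_._1).zip(casted))
  case _ => throw new IllegalArgumentException("a multi-value LHS must be a struct")
}
```

Matching on the casted values first keeps the single-column case trivial; the struct branch only has to reuse the original field names.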





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r105714546
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -365,17 +368,73 @@ object TypeCoercion {
   }
 
   /**
-   * Convert the value and in list expressions to the common operator type
-   * by looking at all the argument types and finding the closest one that
-   * all the arguments can be cast to. When no common operator type is found
-   * the original expression will be returned and an Analysis Exception will
-   * be raised at type checking phase.
+   * Handles type coercion for both IN expression with subquery and IN
+   * expressions without subquery.
+   * 1. In the first case, find the common type by comparing the left hand side (LHS)
+   *expression types against corresponding right hand side (RHS) expression derived
+   *from the subquery expression's plan output. Inject appropriate casts in the
+   *LHS and RHS side of IN expression.
+   *
+   * 2. In the second case, convert the value and in list expressions to the
+   *common operator type by looking at all the argument types and finding
+   *the closest one that all the arguments can be cast to. When no common
+   *operator type is found the original expression will be returned and an
+   *Analysis Exception will be raised at the type checking phase.
*/
   object InConversion extends Rule[LogicalPlan] {
 def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
   // Skip nodes who's children have not been resolved yet.
   case e if !e.childrenResolved => e
 
+  // Handle type casting required between value expression and subquery output
+  // in IN subquery.
+  case i @ In(a, Seq(ListQuery(sub, children, exprId))) if !i.resolved =>
+// LHS is the value expression of IN subquery.
+val lhs = a match {
+  // Multi columns in IN clause is represented as a CreateNamedStruct.
+  // flatten the named struct to get the list of expressions.
+  case cns: CreateNamedStruct => cns.valExprs
+  case expr => Seq(expr)
+}
+
+// RHS is the subquery output.
+val rhs = sub.output
+require(lhs.length == rhs.length)
--- End diff --

Please check this in the guard of the rule; the `require` currently throws a very
hard-to-understand error.
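A toy sketch of the idea: with the arity check in the guard, a mismatched node falls through the rule unchanged and a later validation step can report a readable error. `InSub`, `inConversion` and `checkAnalysis` are hypothetical simplified stand-ins (not the real `In`/`ListQuery` nodes or `CheckAnalysis`), and the message text is illustrative only:

```scala
// Hypothetical stand-in for an IN-subquery node: only the column counts of
// the left-hand side and of the subquery output matter here.
final case class InSub(lhsArity: Int, rhsArity: Int, coerced: Boolean = false)

// The arity check lives in the guard, so a mismatched node is left untouched
// instead of failing inside the rule body with a `require`.
def inConversion(n: InSub): InSub = n match {
  case i if !i.coerced && i.lhsArity == i.rhsArity => i.copy(coerced = true)
  case other => other
}

// A later validation step turns the leftover mismatch into a readable message.
def checkAnalysis(n: InSub): Either[String, InSub] = n match {
  case InSub(l, r, _) if l != r =>
    Left(s"IN subquery arity mismatch: $l value(s) on the left, $r column(s) in the subquery")
  case ok => Right(ok)
}
```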





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r105712270
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -109,6 +109,26 @@ object TypeCoercion {
   }
 
   /**
+   * This function determines the target type of a comparison operator when one operand
+   * is a String and the other is not. It also handles when one op is a Date and the
+   * other is a Timestamp by making the target type to be String. Currently this is used
+   * to coerce types between LHS and RHS of the IN expression.
+   */
+  val findCommonTypeForBinaryComparison: (DataType, DataType) => Option[DataType] = {
+case (StringType, DateType) => Some(StringType)
+case (DateType, StringType) => Some(StringType)
+case (StringType, TimestampType) => Some(StringType)
+case (TimestampType, StringType) => Some(StringType)
+case (TimestampType, DateType) => Some(StringType)
--- End diff --

Oh wait, you have changed it.





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r105711886
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -109,6 +109,26 @@ object TypeCoercion {
   }
 
   /**
+   * This function determines the target type of a comparison operator when one operand
+   * is a String and the other is not. It also handles when one op is a Date and the
+   * other is a Timestamp by making the target type to be String. Currently this is used
+   * to coerce types between LHS and RHS of the IN expression.
+   */
+  val findCommonTypeForBinaryComparison: (DataType, DataType) => Option[DataType] = {
+case (StringType, DateType) => Some(StringType)
+case (DateType, StringType) => Some(StringType)
+case (StringType, TimestampType) => Some(StringType)
+case (TimestampType, StringType) => Some(StringType)
+case (TimestampType, DateType) => Some(StringType)
--- End diff --

OK, please do this in a follow-up.





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-04 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r104301109
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -109,6 +109,26 @@ object TypeCoercion {
   }
 
   /**
+   * This function determines the target type of a comparison operator when one operand
+   * is a String and the other is not. It also handles when one op is a Date and the
+   * other is a Timestamp by making the target type to be String. Currently this is used
+   * to coerce types between LHS and RHS of the IN expression.
+   */
+  val findCommonTypeForBinaryComparison: (DataType, DataType) => Option[DataType] = {
+case (StringType, DateType) => Some(StringType)
+case (DateType, StringType) => Some(StringType)
+case (StringType, TimestampType) => Some(StringType)
+case (TimestampType, StringType) => Some(StringType)
+case (TimestampType, DateType) => Some(StringType)
--- End diff --

@hvanhovell Thanks! I had tried to do this before, since it also came up
during the internal review. I have made another attempt. Please let me know what
you think.





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-04 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r104287344
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -109,6 +109,26 @@ object TypeCoercion {
   }
 
   /**
+   * This function determines the target type of a comparison operator when one operand
+   * is a String and the other is not. It also handles when one op is a Date and the
+   * other is a Timestamp by making the target type to be String. Currently this is used
+   * to coerce types between LHS and RHS of the IN expression.
+   */
+  val findCommonTypeForBinaryComparison: (DataType, DataType) => Option[DataType] = {
+case (StringType, DateType) => Some(StringType)
+case (DateType, StringType) => Some(StringType)
+case (StringType, TimestampType) => Some(StringType)
+case (TimestampType, StringType) => Some(StringType)
+case (TimestampType, DateType) => Some(StringType)
--- End diff --

Shouldn't we factor that code out then? Now we have the same logic in two 
places.
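One possible shape for the factored-out logic, as a self-contained sketch. It assumes a toy `DataType` hierarchy (hypothetical, not `org.apache.spark.sql.types`), a single shared helper mirroring the String/Date/Timestamp cases in the diff (the symmetric `(DateType, TimestampType)` case is an assumption), and a deliberately simplified `findTightestCommonType` used as the fallback via `Option.orElse`:

```scala
// Hypothetical toy type hierarchy; NOT Spark's DataType classes.
sealed trait DataType
case object StringType extends DataType
case object DateType extends DataType
case object TimestampType extends DataType
case object IntegerType extends DataType
case object LongType extends DataType

// Shared helper: the single place that encodes the String/Date/Timestamp rules.
val findCommonTypeForBinaryComparison: (DataType, DataType) => Option[DataType] = {
  case (StringType, DateType) | (DateType, StringType) => Some(StringType)
  case (StringType, TimestampType) | (TimestampType, StringType) => Some(StringType)
  case (TimestampType, DateType) | (DateType, TimestampType) => Some(StringType)
  case _ => None
}

// Deliberately simplified widening; Spark's real rule covers many more cases.
def findTightestCommonType(l: DataType, r: DataType): Option[DataType] = (l, r) match {
  case (a, b) if a == b => Some(a)
  case (IntegerType, LongType) | (LongType, IntegerType) => Some(LongType)
  case _ => None
}

// The fallback chain from the IN-subquery rule, written with orElse.
def commonType(l: DataType, r: DataType): Option[DataType] =
  findCommonTypeForBinaryComparison(l, r).orElse(findTightestCommonType(l, r))
```

Both call sites could then use the one helper instead of duplicating the case list.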





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-02 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r104063308
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -21,12 +21,13 @@ import scala.annotation.tailrec
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf, TableIdentifier}
+import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.OuterScopes
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.objects.NewInstance
+import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
--- End diff --

Ok





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-02 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103995282
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -21,12 +21,13 @@ import scala.annotation.tailrec
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf, TableIdentifier}
+import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.OuterScopes
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.objects.NewInstance
+import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
--- End diff --

@gatorsmile There is another place in the analyzer
(`updateOuterReferencesInSubquery`) that also uses `SubExprUtils`. That's
why I moved it to the main imports. What do you think?





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-02 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103985902
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
 ---
@@ -40,19 +43,194 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {
 /**
  * A base interface for expressions that contain a [[LogicalPlan]].
  */
-abstract class SubqueryExpression extends PlanExpression[LogicalPlan] {
+abstract class SubqueryExpression(
+plan: LogicalPlan,
+children: Seq[Expression],
+exprId: ExprId) extends PlanExpression[LogicalPlan] {
+
+  override lazy val resolved: Boolean = childrenResolved && plan.resolved
+  override lazy val references: AttributeSet =
+if (plan.resolved) super.references -- plan.outputSet else super.references
   override def withNewPlan(plan: LogicalPlan): SubqueryExpression
+  override def semanticEquals(o: Expression): Boolean = o match {
+case p: SubqueryExpression =>
+  this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) &&
+children.length == p.children.length &&
+children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
+case _ => false
+  }
 }
 
 object SubqueryExpression {
+  /**
+   * Returns true when an expression contains an IN or EXISTS subquery and false otherwise.
+   */
+  def hasInOrExistsSubquery(e: Expression): Boolean = {
+e.find {
+  case _: ListQuery | _: Exists => true
+  case _ => false
+}.isDefined
+  }
+
+  /**
+   * Returns true when an expression contains a subquery that has outer reference(s). The outer
+   * reference attributes are kept as children of subquery expression by
+   * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]]
+   */
   def hasCorrelatedSubquery(e: Expression): Boolean = {
 e.find {
-  case e: SubqueryExpression if e.children.nonEmpty => true
+  case s: SubqueryExpression => s.children.nonEmpty
   case _ => false
 }.isDefined
   }
 }
 
+object SubExprUtils extends PredicateHelper {
+  /**
+   * Returns true when an expression contains correlated predicates i.e outer references and
+   * returns false otherwise.
+   */
+  def containsOuter(e: Expression): Boolean = {
+e.find(_.isInstanceOf[OuterReference]).isDefined
+  }
+
+  /**
+   * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could
+   * turn the null-aware predicate into not-null-aware predicate.
+   */
+  def hasNullAwarePredicateWithinNot(condition: Expression): Boolean = {
+splitConjunctivePredicates(condition).exists {
+  case _: Exists | Not(_: Exists) | In(_, Seq(_: ListQuery)) | Not(In(_, Seq(_: ListQuery))) =>
+false
+  case e => e.find { x =>
+x.isInstanceOf[Not] && e.find {
+  case In(_, Seq(_: ListQuery)) => true
+  case _ => false
+}.isDefined
+  }.isDefined
+}
+
+  }
+
+  /**
+   * Returns an expression after removing the OuterReference shell.
+   */
+  def stripOuterReference(e: Expression): Expression = {
+e.transform {
+  case OuterReference(r) => r
+}
+  }
+
+  /**
+   * Returns the list of expressions after removing the OuterReference shell from each of
+   * the expression.
+   */
+  def stripOuterReferences(e: Seq[Expression]): Seq[Expression] = e.map(stripOuterReference)
+
+  /**
+   * Returns the logical plan after removing the OuterReference shell from all the expressions
+   * of the input logical plan.
+   */
+  def stripOuterReferences(p: LogicalPlan): LogicalPlan = {
+p.transformAllExpressions {
+  case OuterReference(a) => a
+}
+  }
+
+  /**
+   * Given a logical plan, returns TRUE if it has an outer reference and false otherwise.
+   */
+  def hasOuterReferences(plan: LogicalPlan): Boolean = {
+plan.find {
+  case f: Filter => containsOuter(f.condition)
+  case other => false
+}.isDefined
+  }
+
+  /**
+   * Given a list of expressions, returns the expressions which have outer references. Aggregate
+   * expressions are treated in a special way. If the children of aggregate expression contains an
+   * outer reference, then the entire aggregate expression is marked as an outer reference.
+   * Example (SQL):
+   * {{{
+   *   SELECT a FROM l GROUP by 1 HAVING EXISTS (SELECT 1 FROM r WHERE d < min(b))
+   * }}}
+   * In the above case, we want to mark the entire min(b) as an outer reference
+   * 

[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-02 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103984027
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
 ---
@@ -40,19 +43,194 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {
 /**
  * A base interface for expressions that contain a [[LogicalPlan]].
  */
-abstract class SubqueryExpression extends PlanExpression[LogicalPlan] {
+abstract class SubqueryExpression(
+plan: LogicalPlan,
+children: Seq[Expression],
+exprId: ExprId) extends PlanExpression[LogicalPlan] {
+
+  override lazy val resolved: Boolean = childrenResolved && plan.resolved
+  override lazy val references: AttributeSet =
+if (plan.resolved) super.references -- plan.outputSet else super.references
   override def withNewPlan(plan: LogicalPlan): SubqueryExpression
+  override def semanticEquals(o: Expression): Boolean = o match {
+case p: SubqueryExpression =>
+  this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) &&
+children.length == p.children.length &&
+children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
+case _ => false
+  }
 }
 
 object SubqueryExpression {
+  /**
+   * Returns true when an expression contains an IN or EXISTS subquery and false otherwise.
+   */
+  def hasInOrExistsSubquery(e: Expression): Boolean = {
+e.find {
+  case _: ListQuery | _: Exists => true
+  case _ => false
+}.isDefined
+  }
+
+  /**
+   * Returns true when an expression contains a subquery that has outer reference(s). The outer
+   * reference attributes are kept as children of subquery expression by
+   * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]]
+   */
   def hasCorrelatedSubquery(e: Expression): Boolean = {
 e.find {
-  case e: SubqueryExpression if e.children.nonEmpty => true
+  case s: SubqueryExpression => s.children.nonEmpty
   case _ => false
 }.isDefined
   }
 }
 
+object SubExprUtils extends PredicateHelper {
+  /**
+   * Returns true when an expression contains correlated predicates i.e outer references and
+   * returns false otherwise.
+   */
+  def containsOuter(e: Expression): Boolean = {
+e.find(_.isInstanceOf[OuterReference]).isDefined
+  }
+
+  /**
+   * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could
+   * turn the null-aware predicate into not-null-aware predicate.
+   */
+  def hasNullAwarePredicateWithinNot(condition: Expression): Boolean = {
+splitConjunctivePredicates(condition).exists {
+  case _: Exists | Not(_: Exists) | In(_, Seq(_: ListQuery)) | Not(In(_, Seq(_: ListQuery))) =>
+false
+  case e => e.find { x =>
+x.isInstanceOf[Not] && e.find {
+  case In(_, Seq(_: ListQuery)) => true
+  case _ => false
+}.isDefined
+  }.isDefined
+}
+
+  }
+
+  /**
+   * Returns an expression after removing the OuterReference shell.
+   */
+  def stripOuterReference(e: Expression): Expression = {
+e.transform {
+  case OuterReference(r) => r
+}
+  }
--- End diff --

```
def stripOuterReference(e: Expression): Expression = e.transform { case OuterReference(r) => r }
```
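For context, a toy sketch of what the one-liner does: `transform` applies a partial function across an expression tree, so `case OuterReference(r) => r` unwraps each outer reference it reaches. The tree types and the top-down `transform` below are simplified, hypothetical stand-ins for Catalyst's `TreeNode`, not its actual implementation:

```scala
// Hypothetical toy expression tree; a simplified stand-in for Catalyst's TreeNode.
sealed trait Expr {
  // Apply `rule` to this node first, then recurse into the children of the
  // result (top-down), similar in spirit to Catalyst's `transformDown`.
  def transform(rule: PartialFunction[Expr, Expr]): Expr =
    rule.applyOrElse(this, identity[Expr]) match {
      case a: Attr           => a
      case OuterReference(e) => OuterReference(e.transform(rule))
      case EqualTo(l, r)     => EqualTo(l.transform(rule), r.transform(rule))
    }
}
final case class Attr(name: String) extends Expr
final case class OuterReference(e: Expr) extends Expr
final case class EqualTo(left: Expr, right: Expr) extends Expr

// The suggested one-liner, applied to the toy tree.
def stripOuterReference(e: Expr): Expr = e.transform { case OuterReference(r) => r }
```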





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-02 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103983068
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 ---
@@ -200,14 +191,9 @@ trait CheckAnalysis extends PredicateHelper {
   s"filter expression '${f.condition.sql}' " +
 s"of type ${f.condition.dataType.simpleString} is not a boolean.")
 
-  case f @ Filter(condition, child) =>
-splitConjunctivePredicates(condition).foreach {
-  case _: PredicateSubquery | Not(_: PredicateSubquery) =>
-  case e if PredicateSubquery.hasNullAwarePredicateWithinNot(e) =>
-failAnalysis(s"Null-aware predicate sub-queries cannot be used in nested" +
-  s" conditions: $e")
-  case e =>
-}
+  case Filter(condition, _) if hasNullAwarePredicateWithinNot(condition) =>
+failAnalysis(s"Null-aware predicate sub-queries cannot be used in nested" +
+  " conditions: $condition")
--- End diff --

```
failAnalysis("Null-aware predicate sub-queries cannot be used in nested " +
  s"conditions: $condition")
```
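The point of the suggestion is that the `s` interpolator must sit on the literal that actually contains `$condition`. In the diff it is on the first literal only, so the second literal is emitted verbatim. A minimal illustration, where `condition` is a hypothetical stand-in string:

```scala
// Hypothetical stand-in for the Filter condition being reported.
val condition = "x > 1"

// As in the diff: `s` on the first literal only, so "$condition" stays literal text.
val broken = s"Null-aware predicate sub-queries cannot be used in nested" +
  " conditions: $condition"

// As suggested: `s` on the literal that references `condition`.
val fixed = "Null-aware predicate sub-queries cannot be used in nested " +
  s"conditions: $condition"
```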





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-02 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103981569
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1204,64 +1250,36 @@ class Analyzer(
 
 // Category 1:
 // BroadcastHint, Distinct, LeafNode, Repartition, and SubqueryAlias
-case p: BroadcastHint =>
-  p
-case p: Distinct =>
-  p
-case p: LeafNode =>
-  p
-case p: Repartition =>
-  p
-case p: SubqueryAlias =>
-  p
+case _: BroadcastHint | _: Distinct | _: LeafNode | _: Repartition | _: SubqueryAlias =>
 
 // Category 2:
 // These operators can be anywhere in a correlated subquery.
 // so long as they do not host outer references in the operators.
 case p: Sort =>
   failOnOuterReference(p)
-  p
 case p: RepartitionByExpression =>
   failOnOuterReference(p)
-  p
 
 // Category 3:
 // Filter is one of the two operators allowed to host correlated expressions.
 // The other operator is Join. Filter can be anywhere in a correlated subquery.
 case f @ Filter(cond, child) =>
   // Find all predicates with an outer reference.
-  val (correlated, local) = splitConjunctivePredicates(cond).partition(containsOuter)
+  val (correlated, local) =
+splitConjunctivePredicates(cond).partition(containsOuter)
 
   // Find any non-equality correlated predicates
   foundNonEqualCorrelatedPred = foundNonEqualCorrelatedPred || correlated.exists {
 case _: EqualTo | _: EqualNullSafe => false
 case _ => true
   }
-
-  // Rewrite the filter without the correlated predicates if any.
-  correlated match {
-case Nil => f
-case xs if local.nonEmpty =>
-  val newFilter = Filter(local.reduce(And), child)
-  predicateMap += newFilter -> xs
-  newFilter
-case xs =>
-  predicateMap += child -> xs
-  child
-  }
+  outerReferences ++= getOuterReferences(correlated)
 
 // Project cannot host any correlated expressions
 // but can be anywhere in a correlated subquery.
 case p @ Project(expressions, child) =>
--- End diff --

`case p: Project =>`. The same applies to the following `Aggregate`.





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-02 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103982372
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1317,62 +1327,24 @@ class Analyzer(
 // Generator with join=false is treated as Category 4.
 case p @ Generate(generator, true, _, _, _, _) =>
--- End diff --

`case g: Generate if g.join =>`
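These suggestions favor type patterns (with a guard where needed) over constructor patterns that bind fields the case body never uses. A toy comparison, using a hypothetical four-field `Generate` case class (Catalyst's real `Generate` has a different field list):

```scala
// Hypothetical four-field plan node; Catalyst's real Generate differs.
final case class Generate(generator: String, join: Boolean, outer: Boolean, qualifier: Option[String])

// Constructor pattern: every field must be listed positionally, even unused ones,
// so the pattern has to change whenever the case class gains or reorders fields.
def isJoinedByConstructor(p: Any): Boolean = p match {
  case Generate(_, true, _, _) => true
  case _                       => false
}

// Type pattern plus guard: names only the field it needs and ignores the arity.
def isJoinedByType(p: Any): Boolean = p match {
  case g: Generate if g.join => true
  case _                     => false
}
```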





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-02 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103981872
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1204,64 +1250,36 @@ class Analyzer(
 
 // Category 1:
 // BroadcastHint, Distinct, LeafNode, Repartition, and SubqueryAlias
-case p: BroadcastHint =>
-  p
-case p: Distinct =>
-  p
-case p: LeafNode =>
-  p
-case p: Repartition =>
-  p
-case p: SubqueryAlias =>
-  p
+case _: BroadcastHint | _: Distinct | _: LeafNode | _: Repartition | _: SubqueryAlias =>
 
 // Category 2:
 // These operators can be anywhere in a correlated subquery.
 // so long as they do not host outer references in the operators.
 case p: Sort =>
   failOnOuterReference(p)
-  p
 case p: RepartitionByExpression =>
   failOnOuterReference(p)
-  p
 
 // Category 3:
 // Filter is one of the two operators allowed to host correlated expressions.
 // The other operator is Join. Filter can be anywhere in a correlated subquery.
 case f @ Filter(cond, child) =>
--- End diff --

`case f: Filter =>` 





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-02 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103980778
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1395,42 +1367,45 @@ class Analyzer(
 }
   } while (!current.resolved && !current.fastEquals(previous))
 
-  // Step 2: Pull out the predicates if the plan is resolved.
+  // Step 2: pull the outer references and record them as children of SubqueryExpression
--- End diff --

`pull` -> `Pull`. Also, restore the original message `if the plan is
resolved.`





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-02 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103980267
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1204,64 +1250,36 @@ class Analyzer(
 
 // Category 1:
 // BroadcastHint, Distinct, LeafNode, Repartition, and SubqueryAlias
-case p: BroadcastHint =>
-  p
-case p: Distinct =>
-  p
-case p: LeafNode =>
-  p
-case p: Repartition =>
-  p
-case p: SubqueryAlias =>
-  p
+case _: BroadcastHint | _: Distinct | _: LeafNode | _: Repartition | _: SubqueryAlias =>
 
 // Category 2:
 // These operators can be anywhere in a correlated subquery.
 // so long as they do not host outer references in the operators.
 case p: Sort =>
   failOnOuterReference(p)
-  p
 case p: RepartitionByExpression =>
   failOnOuterReference(p)
-  p
 
 // Category 3:
 // Filter is one of the two operators allowed to host correlated expressions.
 // The other operator is Join. Filter can be anywhere in a correlated subquery.
 case f @ Filter(cond, child) =>
   // Find all predicates with an outer reference.
-  val (correlated, local) = splitConjunctivePredicates(cond).partition(containsOuter)
+  val (correlated, local) =
+splitConjunctivePredicates(cond).partition(containsOuter)
--- End diff --

`val (correlated, local) = splitConjunctivePredicates(cond).partition(containsOuter)`
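A minimal sketch of what that one-liner computes, on a toy predicate AST. `Col`, `OuterRef`, `Lit`, `EqualTo`, `GreaterThan` and `And` here are hypothetical stand-ins, not Catalyst's expression classes; only the shape of `splitConjunctivePredicates` followed by `partition(containsOuter)` is meant to mirror the diff.

```scala
// Toy predicate AST (hypothetical; not Catalyst). OuterRef marks a column of the outer query.
sealed trait Expr
final case class Col(name: String) extends Expr
final case class OuterRef(name: String) extends Expr
final case class Lit(value: Int) extends Expr
final case class EqualTo(left: Expr, right: Expr) extends Expr
final case class GreaterThan(left: Expr, right: Expr) extends Expr
final case class And(left: Expr, right: Expr) extends Expr

// Flatten nested ANDs into a flat list of conjuncts.
def splitConjunctivePredicates(e: Expr): Seq[Expr] = e match {
  case And(l, r) => splitConjunctivePredicates(l) ++ splitConjunctivePredicates(r)
  case other     => Seq(other)
}

// True if an outer reference appears anywhere inside the predicate.
def containsOuter(e: Expr): Boolean = e match {
  case _: OuterRef         => true
  case EqualTo(l, r)       => containsOuter(l) || containsOuter(r)
  case GreaterThan(l, r)   => containsOuter(l) || containsOuter(r)
  case And(l, r)           => containsOuter(l) || containsOuter(r)
  case _                   => false
}
```

For a condition `a = outer.x AND b > 1`, the first conjunct lands in `correlated` and the second in `local`.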



[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-02 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103979911
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -21,12 +21,13 @@ import scala.annotation.tailrec
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf, TableIdentifier}
+import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.OuterScopes
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.objects.NewInstance
+import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
--- End diff --

Move this import here?
https://github.com/dilipbiswal/spark/blob/e43ac08c71c6634b4c1e2cb98bd278a7e36846dc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L1148



[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-02 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103881081
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1172,20 +1226,11 @@ class Analyzer(
 }
   }
 
-  /** Determine which correlated predicate references are missing from this plan. */
-  def missingReferences(p: LogicalPlan): AttributeSet = {
-val localPredicateReferences = p.collect(predicateMap)
-  .flatten
-  .map(_.references)
-  .reduceOption(_ ++ _)
-  .getOrElse(AttributeSet.empty)
-localPredicateReferences -- p.outputSet
-  }
-
   var foundNonEqualCorrelatedPred : Boolean = false
 
-  // Simplify the predicates before pulling them out.
-  val transformed = BooleanSimplification(sub) transformUp {
+  // Simplify the predicates before validating any unsupported correlation patterns
+  // in the plan.
+  BooleanSimplification(sub).foreachUp {
--- End diff --

@gatorsmile The current code still relies on this. I need to investigate it further and would like to address it in a follow-up.
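For context, the hunk above swaps the rewriting `transformUp` for `foreachUp`, because the Analyzer now only validates correlation patterns instead of pulling predicates out. A toy sketch of the two traversals, assuming a hypothetical `Node` class (Catalyst's `TreeNode` offers both methods with considerably more machinery):

```scala
// Hypothetical toy plan tree; both traversals handle children before their parent (post-order).
final case class Node(name: String, children: List[Node] = Nil) {
  // Side-effecting walk: suited to validation, returns nothing and leaves the tree untouched.
  def foreachUp(f: Node => Unit): Unit = {
    children.foreach(_.foreachUp(f))
    f(this)
  }

  // Rewriting walk: children are rewritten first, then the rule is tried on the parent.
  def transformUp(rule: PartialFunction[Node, Node]): Node = {
    val withNewChildren = copy(children = children.map(_.transformUp(rule)))
    rule.applyOrElse(withNewChildren, identity[Node])
  }
}
```

A validation pass only needs `foreachUp`; `transformUp` is for passes that must return a new plan.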



[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103855766
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1172,20 +1226,11 @@ class Analyzer(
 }
   }
 
-  /** Determine which correlated predicate references are missing from this plan. */
-  def missingReferences(p: LogicalPlan): AttributeSet = {
-val localPredicateReferences = p.collect(predicateMap)
-  .flatten
-  .map(_.references)
-  .reduceOption(_ ++ _)
-  .getOrElse(AttributeSet.empty)
-localPredicateReferences -- p.outputSet
-  }
-
   var foundNonEqualCorrelatedPred : Boolean = false
 
-  // Simplify the predicates before pulling them out.
-  val transformed = BooleanSimplification(sub) transformUp {
+  // Simplify the predicates before validating any unsupported correlation patterns
+  // in the plan.
+  BooleanSimplification(sub).foreachUp {
--- End diff --

You should also remove the import of `BooleanSimplification`



[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103855573
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1208,63 +1253,39 @@ class Analyzer(
 // Category 1:
 // BroadcastHint, Distinct, LeafNode, Repartition, and SubqueryAlias
 case p: BroadcastHint =>
-  p
 case p: Distinct =>
-  p
 case p: LeafNode =>
-  p
 case p: Repartition =>
-  p
 case p: SubqueryAlias =>
-  p
 
 // Category 2:
 // These operators can be anywhere in a correlated subquery.
 // so long as they do not host outer references in the operators.
 case p: Sort =>
   failOnOuterReference(p)
-  p
 case p: RepartitionByExpression =>
   failOnOuterReference(p)
-  p
 
 // Category 3:
 // Filter is one of the two operators allowed to host correlated expressions.
 // The other operator is Join. Filter can be anywhere in a correlated subquery.
 case f @ Filter(cond, child) =>
   // Find all predicates with an outer reference.
-  val (correlated, local) = splitConjunctivePredicates(cond).partition(containsOuter)
+  val (correlated, local) =
+  splitConjunctivePredicates(cond).partition(SubExprUtils.containsOuter)
 
   // Find any non-equality correlated predicates
   foundNonEqualCorrelatedPred = foundNonEqualCorrelatedPred || correlated.exists {
 case _: EqualTo | _: EqualNullSafe => false
 case _ => true
   }
-
-  // Rewrite the filter without the correlated predicates if any.
-  correlated match {
-case Nil => f
-case xs if local.nonEmpty =>
-  val newFilter = Filter(local.reduce(And), child)
-  predicateMap += newFilter -> xs
-  newFilter
-case xs =>
-  predicateMap += child -> xs
-  child
-  }
+  outerReferences ++= SubExprUtils.getOuterReferences(correlated)
--- End diff --

After the import, this can also be simplified to `outerReferences ++= getOuterReferences(correlated)`.
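A rough sketch of the two steps in this `Filter` case, collecting outer references and flagging non-equality correlated predicates, on hypothetical toy classes. `getOuterReferences` here is an illustrative reimplementation, not `SubExprUtils` itself.

```scala
// Hypothetical toy expressions (not Catalyst).
sealed trait Expr
final case class Col(name: String) extends Expr
final case class OuterRef(name: String) extends Expr
final case class EqualTo(left: Expr, right: Expr) extends Expr
final case class EqualNullSafe(left: Expr, right: Expr) extends Expr
final case class LessThan(left: Expr, right: Expr) extends Expr

def children(e: Expr): Seq[Expr] = e match {
  case EqualTo(l, r)       => Seq(l, r)
  case EqualNullSafe(l, r) => Seq(l, r)
  case LessThan(l, r)      => Seq(l, r)
  case _                   => Nil
}

// Collect every outer reference in the given predicates, left to right.
def getOuterReferences(es: Seq[Expr]): Seq[OuterRef] = es.flatMap {
  case o: OuterRef => Seq(o)
  case e           => getOuterReferences(children(e))
}

// Same test as the diff: any correlated predicate other than = or <=> counts as non-equal.
def hasNonEqualPredicate(correlated: Seq[Expr]): Boolean = correlated.exists {
  case _: EqualTo | _: EqualNullSafe => false
  case _                             => true
}
```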



[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103855544
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1208,63 +1253,39 @@ class Analyzer(
 // Category 1:
 // BroadcastHint, Distinct, LeafNode, Repartition, and SubqueryAlias
 case p: BroadcastHint =>
-  p
 case p: Distinct =>
-  p
 case p: LeafNode =>
-  p
 case p: Repartition =>
-  p
 case p: SubqueryAlias =>
-  p
 
 // Category 2:
 // These operators can be anywhere in a correlated subquery.
 // so long as they do not host outer references in the operators.
 case p: Sort =>
   failOnOuterReference(p)
-  p
 case p: RepartitionByExpression =>
   failOnOuterReference(p)
-  p
 
 // Category 3:
 // Filter is one of the two operators allowed to host correlated expressions.
 // The other operator is Join. Filter can be anywhere in a correlated subquery.
 case f @ Filter(cond, child) =>
   // Find all predicates with an outer reference.
-  val (correlated, local) = splitConjunctivePredicates(cond).partition(containsOuter)
+  val (correlated, local) =
+  splitConjunctivePredicates(cond).partition(SubExprUtils.containsOuter)
--- End diff --

Let us add `import org.apache.spark.sql.catalyst.expressions.SubExprUtils._` at the beginning of the `ResolveSubquery` rule.

Then this line needs no change: `containsOuter` can be called unqualified after the import.
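The suggestion relies on Scala's scoped imports: importing an object's members once at the top of the rule makes them callable unqualified in every case inside it. A minimal sketch with hypothetical `SubExprUtilsSketch` and `ResolveSubquerySketch` objects; the expressions are plain strings purely for illustration.

```scala
// Hypothetical helper object standing in for SubExprUtils.
object SubExprUtilsSketch {
  def containsOuter(e: String): Boolean = e.contains("outer(")
  def getOuterReferences(es: Seq[String]): Seq[String] = es.filter(containsOuter)
}

// Hypothetical rule: one import at the top of the rule body serves every case below it.
object ResolveSubquerySketch {
  import SubExprUtilsSketch._

  def split(conjuncts: Seq[String]): (Seq[String], Seq[String]) =
    conjuncts.partition(containsOuter) // unqualified, thanks to the scoped import
}
```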



[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103855200
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1172,20 +1226,11 @@ class Analyzer(
 }
   }
 
-  /** Determine which correlated predicate references are missing from this plan. */
-  def missingReferences(p: LogicalPlan): AttributeSet = {
-val localPredicateReferences = p.collect(predicateMap)
-  .flatten
-  .map(_.references)
-  .reduceOption(_ ++ _)
-  .getOrElse(AttributeSet.empty)
-localPredicateReferences -- p.outputSet
-  }
-
   var foundNonEqualCorrelatedPred : Boolean = false
 
-  // Simplify the predicates before pulling them out.
-  val transformed = BooleanSimplification(sub) transformUp {
+  // Simplify the predicates before validating any unsupported correlation patterns
+  // in the plan.
+  BooleanSimplification(sub).foreachUp {
--- End diff --

`BooleanSimplification` is no longer needed after the changes in this PR.



[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103854902
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1208,63 +1253,39 @@ class Analyzer(
 // Category 1:
 // BroadcastHint, Distinct, LeafNode, Repartition, and SubqueryAlias
 case p: BroadcastHint =>
-  p
 case p: Distinct =>
-  p
 case p: LeafNode =>
-  p
 case p: Repartition =>
-  p
 case p: SubqueryAlias =>
--- End diff --

`case _: BroadcastHint | _: Distinct | _: LeafNode | _: Repartition | _: SubqueryAlias =>`
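Because the check only validates (it returns `Unit` rather than a rewritten plan), the category-1 arms need no body, and one `case` with alternative type patterns can replace the five separate ones. A sketch with hypothetical stand-in node classes, not Spark's:

```scala
// Hypothetical stand-ins for Catalyst plan nodes (not Spark's real classes).
sealed trait Plan
final case class BroadcastHint() extends Plan
final case class Distinct() extends Plan
final case class Sort(hasOuterReference: Boolean) extends Plan
final case class Window() extends Plan

// Validation-only pass: returns Unit, so the category-1 arm has an empty body.
def checkNode(p: Plan): Unit = p match {
  // Category 1: always allowed; one case with alternative type patterns.
  case _: BroadcastHint | _: Distinct =>
  // Category 2: allowed only if the node hosts no outer references.
  case Sort(hasOuterReference) =>
    require(!hasOuterReference, "outer reference in Sort")
  case other =>
    throw new UnsupportedOperationException(s"unsupported correlated operator: $other")
}
```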



[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103854544
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 ---
@@ -297,8 +284,11 @@ trait CheckAnalysis extends PredicateHelper {
 s"Correlated scalar sub-queries can only be used in a Filter/Aggregate/Project: $p")
 }
 
-  case p if p.expressions.exists(PredicateSubquery.hasPredicateSubquery) =>
-failAnalysis(s"Predicate sub-queries can only be used in a Filter: $p")
+  case p if p.expressions.exists(SubqueryExpression.hasInOrExistsSubquery) =>
+p match {
+  case _: Filter => // Ok
+  case other => failAnalysis(s"Predicate sub-queries can only be used in a Filter: $p")
--- End diff --

`case _ => failAnalysis(`
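The suggestion drops the unused `other` binding, since the error message already interpolates `p`. A small sketch of the placement check under simplifying assumptions: hypothetical `Plan` types, and a boolean flag standing in for `SubqueryExpression.hasInOrExistsSubquery`.

```scala
// Hypothetical simplified nodes; the flag stands in for
// p.expressions.exists(SubqueryExpression.hasInOrExistsSubquery).
sealed trait Plan { def hasPredicateSubquery: Boolean }
final case class Filter(hasPredicateSubquery: Boolean) extends Plan
final case class Project(hasPredicateSubquery: Boolean) extends Plan

final class AnalysisError(message: String) extends Exception(message)

def checkPredicateSubqueryPlacement(p: Plan): Unit = p match {
  case _ if !p.hasPredicateSubquery => // nothing to check
  case _: Filter                    => // Ok: IN/EXISTS subqueries are allowed in a Filter
  // The binding would be unused (the message interpolates `p`), so `_` is enough.
  case _ =>
    throw new AnalysisError(s"Predicate sub-queries can only be used in a Filter: $p")
}
```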



[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103853851
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
 ---
@@ -79,57 +251,10 @@ case class ScalarSubquery(
 object ScalarSubquery {
   def hasCorrelatedScalarSubquery(e: Expression): Boolean = {
 e.find {
-  case e: ScalarSubquery if e.children.nonEmpty => true
-  case _ => false
-}.isDefined
-  }
-}
-
-/**
- * A predicate subquery checks the existence of a value in a sub-query. We currently only allow
- * [[PredicateSubquery]] expressions within a Filter plan (i.e. WHERE or a HAVING clause). This will
- * be rewritten into a left semi/anti join during analysis.
- */
-case class PredicateSubquery(
-plan: LogicalPlan,
-children: Seq[Expression] = Seq.empty,
-nullAware: Boolean = false,
-exprId: ExprId = NamedExpression.newExprId)
-  extends SubqueryExpression with Predicate with Unevaluable {
-  override lazy val resolved = childrenResolved && plan.resolved
-  override lazy val references: AttributeSet = super.references -- plan.outputSet
-  override def nullable: Boolean = nullAware
-  override def withNewPlan(plan: LogicalPlan): PredicateSubquery = copy(plan = plan)
-  override def semanticEquals(o: Expression): Boolean = o match {
-case p: PredicateSubquery =>
-  plan.sameResult(p.plan) && nullAware == p.nullAware &&
-children.length == p.children.length &&
-children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
-case _ => false
-  }
-  override def toString: String = s"predicate-subquery#${exprId.id} $conditionString"
-}
-
-object PredicateSubquery {
-  def hasPredicateSubquery(e: Expression): Boolean = {
-e.find {
-  case _: PredicateSubquery | _: ListQuery | _: Exists => true
+  case s: ScalarSubquery if s.children.nonEmpty => true
--- End diff --

`case s: ScalarSubquery => s.children.nonEmpty`
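The two forms are equivalent: with the guard, a `ScalarSubquery` without children falls through to `case _ => false`, while the suggested form returns `s.children.nonEmpty`, which is `false` for that node anyway. A toy check, assuming a hypothetical expression tree whose `find` is a pre-order search:

```scala
// Hypothetical toy expression tree (not Catalyst); `find` is a pre-order search.
sealed trait Expr {
  def children: Seq[Expr]
  def find(f: Expr => Boolean): Option[Expr] =
    if (f(this)) Some(this)
    else children.iterator.map(_.find(f)).collectFirst { case Some(e) => e }
}
final case class Lit(value: Int) extends Expr { def children: Seq[Expr] = Nil }
final case class Add(left: Expr, right: Expr) extends Expr { def children: Seq[Expr] = Seq(left, right) }
// A correlated scalar subquery records its outer references as children.
final case class ScalarSubquery(outerRefs: Seq[Expr]) extends Expr { def children: Seq[Expr] = outerRefs }

// Form in the diff: guard, then an explicit `true`.
def hasCorrelatedGuard(e: Expr): Boolean = e.find {
  case s: ScalarSubquery if s.children.nonEmpty => true
  case _ => false
}.isDefined

// Suggested form: the match result is the predicate itself.
def hasCorrelatedDirect(e: Expr): Boolean = e.find {
  case s: ScalarSubquery => s.children.nonEmpty
  case _ => false
}.isDefined
```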



[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103853428
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
 ---
@@ -83,29 +95,150 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
   }
 
   /**
-   * Given a predicate expression and an input plan, it rewrites
-   * any embedded existential sub-query into an existential join.
-   * It returns the rewritten expression together with the updated plan.
-   * Currently, it does not support null-aware joins. Embedded NOT IN predicates
-   * are blocked in the Analyzer.
+   * Given a predicate expression and an input plan, it rewrites any embedded existential sub-query
+   * into an existential join. It returns the rewritten expression together with the updated plan.
+   * Currently, it does not support NOT IN nested inside a NOT expression. This case is blocked in
+   * the Analyzer.
*/
   private def rewriteExistentialExpr(
   exprs: Seq[Expression],
   plan: LogicalPlan): (Option[Expression], LogicalPlan) = {
 var newPlan = plan
 val newExprs = exprs.map { e =>
   e transformUp {
-case PredicateSubquery(sub, conditions, nullAware, _) =>
-  // TODO: support null-aware join
+case Exists(sub, conditions, exprId) =>
   val exists = AttributeReference("exists", BooleanType, nullable = false)()
-  newPlan = Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And))
+  newPlan = Join(newPlan, sub,
+ExistenceJoin(exists), conditions.reduceLeftOption(And))
   exists
-}
+case In(e, Seq(l@ ListQuery(sub, conditions, exprId))) =>
+  val exists = AttributeReference("exists", BooleanType, nullable = false)()
+  val inConditions = getValueExpression(e).zip(sub.output).map(EqualTo.tupled)
+  newPlan = Join(newPlan, sub,
+ExistenceJoin(exists), (inConditions ++ conditions).reduceLeftOption(And))
+  exists
+  }
 }
 (newExprs.reduceOption(And), newPlan)
   }
 }
 
+ /**
+  * Pull out all (outer) correlated predicates from a given subquery. This method removes the
+  * correlated predicates from subquery [[Filter]]s and adds the references of these predicates
+  * to all intermediate [[Project]] and [[Aggregate]] clauses (if they are missing) in order to
+  * be able to evaluate the predicates at the top level.
+  *
+  * TODO: Look to merge this rule with RewritePredicateSubquery.
+  */
+object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper {
+   /**
+* Returns the correlated predicates and a updated plan that removes the outer references.
+*/
+  private def pullOutCorrelatedPredicates(
+  sub: LogicalPlan,
+  outer: Seq[LogicalPlan]): (LogicalPlan, Seq[Expression]) = {
+val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]]
+
+/** Determine which correlated predicate references are missing from this plan. */
+def missingReferences(p: LogicalPlan): AttributeSet = {
+  val localPredicateReferences = p.collect(predicateMap)
+.flatten
+.map(_.references)
+.reduceOption(_ ++ _)
+.getOrElse(AttributeSet.empty)
+  localPredicateReferences -- p.outputSet
+}
+
+// Simplify the predicates before pulling them out.
+val transformed = BooleanSimplification(sub) transformUp {
+  case f @ Filter(cond, child) =>
+val (correlated, local) =
+  splitConjunctivePredicates(cond).partition(SubExprUtils.containsOuter)
+
+// Rewrite the filter without the correlated predicates if any.
+correlated match {
+  case Nil => f
+  case xs if local.nonEmpty =>
+val newFilter = Filter(local.reduce(And), child)
+predicateMap += newFilter -> xs
+newFilter
+  case xs =>
+predicateMap += child -> xs
+child
+}
+  case p @ Project(expressions, child) =>
+val referencesToAdd = missingReferences(p)
+if (referencesToAdd.nonEmpty) {
+  Project(expressions ++ referencesToAdd, child)
+} else {
+  p
+}
+  case a @ Aggregate(grouping, expressions, child) =>
+val referencesToAdd = missingReferences(a)
+if (referencesToAdd.nonEmpty) {
+  Aggregate(grouping ++ referencesToAdd, expressions ++ referencesToAdd, child)
+} else {
+  a
+  

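The `In` branch of `rewriteExistentialExpr` above builds the `ExistenceJoin` condition by zipping the value expressions with the subquery output and AND-ing the equalities with the pulled-up correlated conditions. A minimal sketch of just that condition-building step, on hypothetical toy classes; `getValueExpression` here is an illustrative stand-in that unpacks a struct value, not Spark's helper.

```scala
// Hypothetical toy expressions; names echo the diff but these are NOT Catalyst's classes.
sealed trait Expr
final case class Attr(name: String) extends Expr
final case class EqualTo(left: Expr, right: Expr) extends Expr
final case class And(left: Expr, right: Expr) extends Expr
// A multi-column value such as (a, b) IN (SELECT x, y ...).
final case class Struct(values: Seq[Expr]) extends Expr

// Illustrative stand-in: unpack a struct into its fields, otherwise a single value.
def getValueExpression(e: Expr): Seq[Expr] = e match {
  case Struct(values) => values
  case other          => Seq(other)
}

// One equality per subquery output column, then AND everything together.
// (Spark writes the mapping as `.map(EqualTo.tupled)`; a pattern lambda is used here
// so the sketch also compiles where case-class companions are not functions.)
def inJoinCondition(value: Expr, subOutput: Seq[Attr], conditions: Seq[Expr]): Option[Expr] = {
  val inConditions = getValueExpression(value).zip(subOutput).map { case (l, r) => EqualTo(l, r) }
  (inConditions ++ conditions).reduceLeftOption[Expr](And(_, _))
}
```
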
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103853389
  

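The `missingReferences` helper in `PullupCorrelatedPredicates` computes which columns the pulled-up predicates need but an intermediate `Project` or `Aggregate` does not output, so they can be added back. A simplified sketch for a `Project`, assuming a hypothetical string-based plan and passing the pulled-up predicates in explicitly instead of collecting them from a `predicateMap`:

```scala
// Hypothetical toy logical plan; attributes are plain strings (not Catalyst).
sealed trait Plan { def output: Set[String] }
final case class Scan(output: Set[String]) extends Plan
final case class Project(columns: Seq[String], child: Plan) extends Plan {
  def output: Set[String] = columns.toSet
}

// A correlated predicate pulled out of a Filter below the Project, e.g. inner.b = outer.x;
// `innerRefs` are the subquery columns it must still see above the Project.
final case class Correlated(innerRefs: Set[String])

// Columns the pulled-up predicates need that this node does not output.
def missingReferences(p: Plan, pulledUp: Seq[Correlated]): Set[String] =
  pulledUp.flatMap(_.innerRefs).toSet -- p.output

// Widen the Project only when something is missing; otherwise keep the same node.
def addMissing(p: Project, pulledUp: Seq[Correlated]): Project = {
  val toAdd = missingReferences(p, pulledUp)
  if (toAdd.nonEmpty) Project(p.columns ++ toAdd.toSeq.sorted, p.child) else p
}
```
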
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103853275
  

[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103853209
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
 ---
@@ -83,29 +95,150 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
   }
 
   /**
-   * Given a predicate expression and an input plan, it rewrites
-   * any embedded existential sub-query into an existential join.
-   * It returns the rewritten expression together with the updated plan.
-   * Currently, it does not support null-aware joins. Embedded NOT IN predicates
-   * are blocked in the Analyzer.
+   * Given a predicate expression and an input plan, it rewrites any embedded existential sub-query
+   * into an existential join. It returns the rewritten expression together with the updated plan.
+   * Currently, it does not support NOT IN nested inside a NOT expression. This case is blocked in
+   * the Analyzer.
*/
   private def rewriteExistentialExpr(
   exprs: Seq[Expression],
   plan: LogicalPlan): (Option[Expression], LogicalPlan) = {
 var newPlan = plan
 val newExprs = exprs.map { e =>
   e transformUp {
-case PredicateSubquery(sub, conditions, nullAware, _) =>
-  // TODO: support null-aware join
+case Exists(sub, conditions, exprId) =>
   val exists = AttributeReference("exists", BooleanType, nullable = false)()
-  newPlan = Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And))
+  newPlan = Join(newPlan, sub,
+ExistenceJoin(exists), conditions.reduceLeftOption(And))
   exists
-}
+case In(e, Seq(l@ ListQuery(sub, conditions, exprId))) =>
+  val exists = AttributeReference("exists", BooleanType, nullable = false)()
+  val inConditions = getValueExpression(e).zip(sub.output).map(EqualTo.tupled)
+  newPlan = Join(newPlan, sub,
+ExistenceJoin(exists), (inConditions ++ conditions).reduceLeftOption(And))
+  exists
+  }
 }
 (newExprs.reduceOption(And), newPlan)
   }
 }
 
+ /**
+  * Pull out all (outer) correlated predicates from a given subquery. This method removes the
+  * correlated predicates from subquery [[Filter]]s and adds the references of these predicates
+  * to all intermediate [[Project]] and [[Aggregate]] clauses (if they are missing) in order to
+  * be able to evaluate the predicates at the top level.
+  *
+  * TODO: Look to merge this rule with RewritePredicateSubquery.
+  */
+object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper {
+   /**
+* Returns the correlated predicates and a updated plan that removes the outer references.
+*/
+  private def pullOutCorrelatedPredicates(
+  sub: LogicalPlan,
+  outer: Seq[LogicalPlan]): (LogicalPlan, Seq[Expression]) = {
+val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]]
+
+/** Determine which correlated predicate references are missing from this plan. */
+def missingReferences(p: LogicalPlan): AttributeSet = {
+  val localPredicateReferences = p.collect(predicateMap)
+.flatten
+.map(_.references)
+.reduceOption(_ ++ _)
+.getOrElse(AttributeSet.empty)
+  localPredicateReferences -- p.outputSet
+}
+
+// Simplify the predicates before pulling them out.
+val transformed = BooleanSimplification(sub) transformUp {
+  case f @ Filter(cond, child) =>
+val (correlated, local) =
+  splitConjunctivePredicates(cond).partition(SubExprUtils.containsOuter)
+
+// Rewrite the filter without the correlated predicates if any.
+correlated match {
+  case Nil => f
+  case xs if local.nonEmpty =>
+val newFilter = Filter(local.reduce(And), child)
+predicateMap += newFilter -> xs
+newFilter
+  case xs =>
+predicateMap += child -> xs
+child
+}
+  case p @ Project(expressions, child) =>
+val referencesToAdd = missingReferences(p)
+if (referencesToAdd.nonEmpty) {
+  Project(expressions ++ referencesToAdd, child)
+} else {
+  p
+}
+  case a @ Aggregate(grouping, expressions, child) =>
+val referencesToAdd = missingReferences(a)
+if (referencesToAdd.nonEmpty) {
+  Aggregate(grouping ++ referencesToAdd, expressions ++ referencesToAdd, child)
+} else {
+  a
+  
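
The `PullupCorrelatedPredicates` rule quoted above splits each `Filter` condition into two groups of conjuncts. The correlated ones reference the outer query, and the local ones do not. Below is a minimal sketch of only that split. It uses a hypothetical toy expression tree (`Col`, `EqualTo`, `And` here are stand-ins, not Catalyst's classes):

```scala
// Hypothetical toy model of splitting a filter condition into correlated and
// local conjuncts; this is not Catalyst's implementation.
object PullUpSketch {
  sealed trait Expr
  // `outer = true` marks a reference to a column of the outer query.
  final case class Col(name: String, outer: Boolean) extends Expr
  final case class EqualTo(left: Expr, right: Expr) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr

  // Flatten nested Ands into a flat list of conjuncts.
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  // True if the expression references any outer-query column.
  def containsOuter(e: Expr): Boolean = e match {
    case Col(_, outer) => outer
    case EqualTo(l, r) => containsOuter(l) || containsOuter(r)
    case And(l, r)     => containsOuter(l) || containsOuter(r)
  }

  /** Returns (remaining local filter condition, pulled-out correlated conjuncts). */
  def pullOut(cond: Expr): (Option[Expr], Seq[Expr]) = {
    val (correlated, local) = splitConjuncts(cond).partition(containsOuter)
    (local.reduceLeftOption[Expr](And(_, _)), correlated)
  }
}
```

In the quoted rule, the local conjuncts stay in a rewritten `Filter` and the correlated ones are recorded in `predicateMap`. The sketch only returns both halves.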

[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103852913
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
 ---
@@ -83,29 +95,150 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
   }
 
   /**
-   * Given a predicate expression and an input plan, it rewrites
-   * any embedded existential sub-query into an existential join.
-   * It returns the rewritten expression together with the updated plan.
-   * Currently, it does not support null-aware joins. Embedded NOT IN predicates
-   * are blocked in the Analyzer.
+   * Given a predicate expression and an input plan, it rewrites any embedded existential sub-query
+   * into an existential join. It returns the rewritten expression together with the updated plan.
+   * Currently, it does not support NOT IN nested inside a NOT expression. This case is blocked in
+   * the Analyzer.
*/
   private def rewriteExistentialExpr(
   exprs: Seq[Expression],
   plan: LogicalPlan): (Option[Expression], LogicalPlan) = {
 var newPlan = plan
 val newExprs = exprs.map { e =>
   e transformUp {
-case PredicateSubquery(sub, conditions, nullAware, _) =>
-  // TODO: support null-aware join
+case Exists(sub, conditions, exprId) =>
   val exists = AttributeReference("exists", BooleanType, nullable = false)()
-  newPlan = Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And))
+  newPlan = Join(newPlan, sub,
+ExistenceJoin(exists), conditions.reduceLeftOption(And))
   exists
-}
+case In(e, Seq(l@ ListQuery(sub, conditions, exprId))) =>
+  val exists = AttributeReference("exists", BooleanType, nullable = false)()
+  val inConditions = getValueExpression(e).zip(sub.output).map(EqualTo.tupled)
+  newPlan = Join(newPlan, sub,
+ExistenceJoin(exists), (inConditions ++ conditions).reduceLeftOption(And))
+  exists
+  }
 }
 (newExprs.reduceOption(And), newPlan)
   }
 }
 
+ /**
+  * Pull out all (outer) correlated predicates from a given subquery. This method removes the
+  * correlated predicates from subquery [[Filter]]s and adds the references of these predicates
+  * to all intermediate [[Project]] and [[Aggregate]] clauses (if they are missing) in order to
+  * be able to evaluate the predicates at the top level.
+  *
+  * TODO: Look to merge this rule with RewritePredicateSubquery.
+  */
+object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper {
+   /**
+* Returns the correlated predicates and a updated plan that removes the outer references.
+*/
+  private def pullOutCorrelatedPredicates(
+  sub: LogicalPlan,
+  outer: Seq[LogicalPlan]): (LogicalPlan, Seq[Expression]) = {
+val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]]
+
+/** Determine which correlated predicate references are missing from this plan. */
+def missingReferences(p: LogicalPlan): AttributeSet = {
+  val localPredicateReferences = p.collect(predicateMap)
+.flatten
+.map(_.references)
+.reduceOption(_ ++ _)
+.getOrElse(AttributeSet.empty)
+  localPredicateReferences -- p.outputSet
+}
+
+// Simplify the predicates before pulling them out.
+val transformed = BooleanSimplification(sub) transformUp {
+  case f @ Filter(cond, child) =>
+val (correlated, local) =
+  splitConjunctivePredicates(cond).partition(SubExprUtils.containsOuter)
+
+// Rewrite the filter without the correlated predicates if any.
+correlated match {
+  case Nil => f
+  case xs if local.nonEmpty =>
+val newFilter = Filter(local.reduce(And), child)
+predicateMap += newFilter -> xs
+newFilter
+  case xs =>
+predicateMap += child -> xs
+child
+}
+  case p @ Project(expressions, child) =>
+val referencesToAdd = missingReferences(p)
+if (referencesToAdd.nonEmpty) {
+  Project(expressions ++ referencesToAdd, child)
+} else {
+  p
+}
+  case a @ Aggregate(grouping, expressions, child) =>
+val referencesToAdd = missingReferences(a)
+if (referencesToAdd.nonEmpty) {
+  Aggregate(grouping ++ referencesToAdd, expressions ++ referencesToAdd, child)
+} else {
+  a
+  

[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103852419
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
 ---
@@ -83,29 +95,150 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
   }
 
   /**
-   * Given a predicate expression and an input plan, it rewrites
-   * any embedded existential sub-query into an existential join.
-   * It returns the rewritten expression together with the updated plan.
-   * Currently, it does not support null-aware joins. Embedded NOT IN predicates
-   * are blocked in the Analyzer.
+   * Given a predicate expression and an input plan, it rewrites any embedded existential sub-query
+   * into an existential join. It returns the rewritten expression together with the updated plan.
+   * Currently, it does not support NOT IN nested inside a NOT expression. This case is blocked in
+   * the Analyzer.
*/
   private def rewriteExistentialExpr(
   exprs: Seq[Expression],
   plan: LogicalPlan): (Option[Expression], LogicalPlan) = {
 var newPlan = plan
 val newExprs = exprs.map { e =>
   e transformUp {
-case PredicateSubquery(sub, conditions, nullAware, _) =>
-  // TODO: support null-aware join
+case Exists(sub, conditions, exprId) =>
   val exists = AttributeReference("exists", BooleanType, nullable = false)()
-  newPlan = Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And))
+  newPlan = Join(newPlan, sub,
+ExistenceJoin(exists), conditions.reduceLeftOption(And))
   exists
-}
+case In(e, Seq(l@ ListQuery(sub, conditions, exprId))) =>
+  val exists = AttributeReference("exists", BooleanType, nullable = false)()
+  val inConditions = getValueExpression(e).zip(sub.output).map(EqualTo.tupled)
--- End diff --

`getValueExpression(e)` -> `getValueExpression(value)`
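
Some context on what `getValueExpression(value)` feeds into. The IN value is flattened, because a multi-column value arrives as a struct. Each element is then paired by position with the subquery's output columns, giving one equality per column. The toy sketch below shows that pairing with hypothetical `Attr`/`Struct`/`EqualTo` types, not Catalyst's. The quoted diff does the same thing with `.map(EqualTo.tupled)`.

```scala
// Hypothetical toy model of building IN-subquery equality conditions.
object InConditionsSketch {
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  // Stand-in for a multi-column IN value, e.g. (a, b) IN (SELECT x, y ...).
  final case class Struct(values: Seq[Expr]) extends Expr
  final case class EqualTo(left: Expr, right: Expr) extends Expr

  // Analogue of getValueExpression: a struct contributes its fields,
  // any other expression contributes itself.
  def valueExpressions(value: Expr): Seq[Expr] = value match {
    case Struct(values) => values
    case single         => Seq(single)
  }

  // One equality per position: i-th LHS value = i-th subquery output column.
  def inConditions(value: Expr, subOutput: Seq[Attr]): Seq[Expr] =
    valueExpressions(value).zip(subOutput).map { case (l, r) => EqualTo(l, r) }
}
```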





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103852353
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
 ---
@@ -83,29 +95,150 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
   }
 
   /**
-   * Given a predicate expression and an input plan, it rewrites
-   * any embedded existential sub-query into an existential join.
-   * It returns the rewritten expression together with the updated plan.
-   * Currently, it does not support null-aware joins. Embedded NOT IN predicates
-   * are blocked in the Analyzer.
+   * Given a predicate expression and an input plan, it rewrites any embedded existential sub-query
+   * into an existential join. It returns the rewritten expression together with the updated plan.
+   * Currently, it does not support NOT IN nested inside a NOT expression. This case is blocked in
+   * the Analyzer.
*/
   private def rewriteExistentialExpr(
   exprs: Seq[Expression],
   plan: LogicalPlan): (Option[Expression], LogicalPlan) = {
 var newPlan = plan
 val newExprs = exprs.map { e =>
   e transformUp {
-case PredicateSubquery(sub, conditions, nullAware, _) =>
-  // TODO: support null-aware join
+case Exists(sub, conditions, exprId) =>
   val exists = AttributeReference("exists", BooleanType, nullable = false)()
-  newPlan = Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And))
+  newPlan = Join(newPlan, sub,
+ExistenceJoin(exists), conditions.reduceLeftOption(And))
   exists
-}
+case In(e, Seq(l@ ListQuery(sub, conditions, exprId))) =>
--- End diff --

`case In(value, Seq(ListQuery(sub, conditions, _)))`
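
The suggested pattern drops the unused `l @` binder and matches the unused `exprId` with `_`. Below is a self-contained illustration. It uses hypothetical stand-in case classes rather than Catalyst's `In` and `ListQuery`.

```scala
// Hypothetical stand-ins for Catalyst's In / ListQuery, for illustration only.
object PatternSketch {
  final case class ListQuery(plan: String, conditions: Seq[String], exprId: Long)
  final case class In(value: String, list: Seq[ListQuery])

  // Parts of the pattern that are never used (a binder like `l @` and the
  // exprId field) are matched with `_` instead of being named.
  def describe(e: In): Option[String] = e match {
    case In(value, Seq(ListQuery(sub, conditions, _))) =>
      Some(s"$value IN $sub with ${conditions.size} correlated condition(s)")
    case _ => None
  }
}
```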





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103852210
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
 ---
@@ -83,29 +95,150 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
   }
 
   /**
-   * Given a predicate expression and an input plan, it rewrites
-   * any embedded existential sub-query into an existential join.
-   * It returns the rewritten expression together with the updated plan.
-   * Currently, it does not support null-aware joins. Embedded NOT IN predicates
-   * are blocked in the Analyzer.
+   * Given a predicate expression and an input plan, it rewrites any embedded existential sub-query
+   * into an existential join. It returns the rewritten expression together with the updated plan.
+   * Currently, it does not support NOT IN nested inside a NOT expression. This case is blocked in
+   * the Analyzer.
*/
   private def rewriteExistentialExpr(
   exprs: Seq[Expression],
   plan: LogicalPlan): (Option[Expression], LogicalPlan) = {
 var newPlan = plan
 val newExprs = exprs.map { e =>
   e transformUp {
-case PredicateSubquery(sub, conditions, nullAware, _) =>
-  // TODO: support null-aware join
+case Exists(sub, conditions, exprId) =>
   val exists = AttributeReference("exists", BooleanType, nullable = false)()
-  newPlan = Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And))
+  newPlan = Join(newPlan, sub,
+ExistenceJoin(exists), conditions.reduceLeftOption(And))
   exists
-}
+case In(e, Seq(l@ ListQuery(sub, conditions, exprId))) =>
+  val exists = AttributeReference("exists", BooleanType, nullable = false)()
+  val inConditions = getValueExpression(e).zip(sub.output).map(EqualTo.tupled)
+  newPlan = Join(newPlan, sub,
+ExistenceJoin(exists), (inConditions ++ conditions).reduceLeftOption(And))
--- End diff --

```
val newConditions = (inConditions ++ conditions).reduceLeftOption(And)
newPlan = Join(newPlan, sub, ExistenceJoin(exists), newConditions)
```
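
Pulling `newConditions` into its own value also makes the empty case visible. When there are neither IN conditions nor correlated conditions, `reduceLeftOption(And)` yields `None`; otherwise it yields a left-deep conjunction. Below is a toy sketch with hypothetical `Pred`/`And` types, not Catalyst's:

```scala
// Hypothetical toy model of combining join conditions.
object JoinConditionSketch {
  sealed trait Expr
  final case class Pred(sql: String) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr

  // Mirrors `(inConditions ++ conditions).reduceLeftOption(And)`:
  // None when there is nothing to join on, otherwise ((c1 AND c2) AND c3) ...
  def newConditions(inConditions: Seq[Expr], conditions: Seq[Expr]): Option[Expr] =
    (inConditions ++ conditions).reduceLeftOption[Expr](And(_, _))
}
```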





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103852067
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
 ---
@@ -83,29 +95,150 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
   }
 
   /**
-   * Given a predicate expression and an input plan, it rewrites
-   * any embedded existential sub-query into an existential join.
-   * It returns the rewritten expression together with the updated plan.
-   * Currently, it does not support null-aware joins. Embedded NOT IN predicates
-   * are blocked in the Analyzer.
+   * Given a predicate expression and an input plan, it rewrites any embedded existential sub-query
+   * into an existential join. It returns the rewritten expression together with the updated plan.
+   * Currently, it does not support NOT IN nested inside a NOT expression. This case is blocked in
+   * the Analyzer.
*/
   private def rewriteExistentialExpr(
   exprs: Seq[Expression],
   plan: LogicalPlan): (Option[Expression], LogicalPlan) = {
 var newPlan = plan
 val newExprs = exprs.map { e =>
   e transformUp {
-case PredicateSubquery(sub, conditions, nullAware, _) =>
-  // TODO: support null-aware join
+case Exists(sub, conditions, exprId) =>
   val exists = AttributeReference("exists", BooleanType, nullable = false)()
-  newPlan = Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And))
+  newPlan = Join(newPlan, sub,
--- End diff --

`newPlan = Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And))`
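
Some background on this `Join`. An existence join keeps every row of the outer plan and appends a boolean `exists` flag. The flag is true when at least one subquery row satisfies the join condition, which fits the attribute being declared with `nullable = false`. The sketch below is a row-level model over plain Scala collections. It is a hypothetical illustration, not Spark's physical operator.

```scala
// Hypothetical row-level model of an existence join.
object ExistenceJoinSketch {
  // Every left row is kept exactly once (unlike an inner join) and tagged with
  // a flag that is true iff some right row satisfies `cond`.
  def existenceJoin[L, R](left: Seq[L], right: Seq[R])(cond: (L, R) => Boolean): Seq[(L, Boolean)] =
    left.map(l => (l, right.exists(r => cond(l, r))))
}
```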





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103852025
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
 ---
@@ -83,29 +95,150 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
   }
 
   /**
-   * Given a predicate expression and an input plan, it rewrites
-   * any embedded existential sub-query into an existential join.
-   * It returns the rewritten expression together with the updated plan.
-   * Currently, it does not support null-aware joins. Embedded NOT IN predicates
-   * are blocked in the Analyzer.
+   * Given a predicate expression and an input plan, it rewrites any embedded existential sub-query
+   * into an existential join. It returns the rewritten expression together with the updated plan.
+   * Currently, it does not support NOT IN nested inside a NOT expression. This case is blocked in
+   * the Analyzer.
*/
   private def rewriteExistentialExpr(
   exprs: Seq[Expression],
   plan: LogicalPlan): (Option[Expression], LogicalPlan) = {
 var newPlan = plan
 val newExprs = exprs.map { e =>
   e transformUp {
-case PredicateSubquery(sub, conditions, nullAware, _) =>
-  // TODO: support null-aware join
+case Exists(sub, conditions, exprId) =>
--- End diff --

`case Exists(sub, conditions, _)`
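
In `RewritePredicateSubquery`, a top-level `Exists` conjunct becomes a `LeftSemi` join and `Not(Exists(...))` becomes a `LeftAnti` join (see the `@@ -54,20 +61,25 @@` hunk quoted elsewhere in this thread). The difference between the two join types is sketched below over plain collections. The helper names are hypothetical.

```scala
// Hypothetical collection-level model of left semi and left anti joins.
object SemiAntiJoinSketch {
  // LeftSemi: keep left rows with at least one match; rows are never duplicated.
  def leftSemi[L, R](left: Seq[L], right: Seq[R])(cond: (L, R) => Boolean): Seq[L] =
    left.filter(l => right.exists(r => cond(l, r)))

  // LeftAnti: keep only left rows that have no match at all.
  def leftAnti[L, R](left: Seq[L], right: Seq[R])(cond: (L, R) => Boolean): Seq[L] =
    left.filterNot(l => right.exists(r => cond(l, r)))
}
```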





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103851862
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
 ---
@@ -54,20 +61,25 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
 
   // Filter the plan by applying left semi and left anti joins.
   withSubquery.foldLeft(newFilter) {
-case (p, PredicateSubquery(sub, conditions, _, _)) =>
+case (p, Exists(sub, conditions, _)) =>
   val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p)
   Join(outerPlan, sub, LeftSemi, joinCond)
-case (p, Not(PredicateSubquery(sub, conditions, false, _))) =>
+case (p, Not(Exists(sub, conditions, _))) =>
   val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p)
   Join(outerPlan, sub, LeftAnti, joinCond)
-case (p, Not(PredicateSubquery(sub, conditions, true, _))) =>
+case (p, In(e, Seq(l @ ListQuery(sub, conditions, _)))) =>
+  val inConditions = getValueExpression(e).zip(sub.output).map(EqualTo.tupled)
+  val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions ++ conditions, p)
+  Join(outerPlan, sub, LeftSemi, joinCond)
+case (p, Not(In(e, Seq(l @ ListQuery(sub, conditions, _))))) =>
--- End diff --

`case (p, Not(In(e, Seq(ListQuery(sub, conditions, _))))) =>`
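
In general, `Not(In(...ListQuery...))` cannot reuse a plain `LeftAnti` join, because of SQL's three-valued logic. A NULL on either side can make `NOT IN` evaluate to NULL, and a filter treats NULL as false. The rewrite for this case is cut off in the quoted hunk. The sketch below only models the SQL semantics the join has to reproduce, with `None` standing for NULL. The object and method names are hypothetical.

```scala
// Hypothetical model of SQL `value NOT IN (list)` under three-valued logic.
object NotInSketch {
  // Some(true) / Some(false) are TRUE / FALSE; None is SQL NULL (unknown).
  def notIn(value: Option[Int], list: Seq[Option[Int]]): Option[Boolean] =
    if (list.isEmpty) Some(true)               // NOT IN over an empty set is TRUE
    else if (value.isEmpty) None               // NULL NOT IN (non-empty set) is NULL
    else if (list.contains(value)) Some(false) // an exact match makes it FALSE
    else if (list.exists(_.isEmpty)) None      // no match, but a NULL makes it unknown
    else Some(true)
}
```

A plain anti join would return `true` in the fourth branch, so it would wrongly keep rows whenever the subquery output contains a NULL.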





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103851814
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
 ---
@@ -54,20 +61,25 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
 
   // Filter the plan by applying left semi and left anti joins.
   withSubquery.foldLeft(newFilter) {
-case (p, PredicateSubquery(sub, conditions, _, _)) =>
+case (p, Exists(sub, conditions, _)) =>
   val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p)
   Join(outerPlan, sub, LeftSemi, joinCond)
-case (p, Not(PredicateSubquery(sub, conditions, false, _))) =>
+case (p, Not(Exists(sub, conditions, _))) =>
   val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p)
   Join(outerPlan, sub, LeftAnti, joinCond)
-case (p, Not(PredicateSubquery(sub, conditions, true, _))) =>
+case (p, In(e, Seq(l @ ListQuery(sub, conditions, _)))) =>
--- End diff --

`case (p, In(e, Seq(ListQuery(sub, conditions, _)))) =>`
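
The `In(...ListQuery...)` case above becomes a `LeftSemi` join. Its condition combines the IN equality with the subquery's pulled-up correlated conditions. The sketch below shows the resulting semantics for a hypothetical query over non-nullable integer columns; the case classes and query are assumptions for illustration.

```scala
// Hypothetical model of
//   SELECT * FROM outer WHERE a IN (SELECT x FROM inner WHERE inner.y = outer.b)
// evaluated as a left semi join on (a = x) AND (y = b).
object InSemiJoinSketch {
  final case class Outer(a: Int, b: Int)
  final case class Inner(x: Int, y: Int)

  def evaluate(outer: Seq[Outer], inner: Seq[Inner]): Seq[Outer] =
    outer.filter(o => inner.exists(i => o.a == i.x && i.y == o.b))
}
```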





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103850635
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -365,17 +385,66 @@ object TypeCoercion {
   }
 
   /**
-   * Convert the value and in list expressions to the common operator type
-   * by looking at all the argument types and finding the closest one that
-   * all the arguments can be cast to. When no common operator type is found
-   * the original expression will be returned and an Analysis Exception will
-   * be raised at type checking phase.
+   * Handles type coercion for both IN expression with subquery and IN
+   * expressions without subquery.
+   * 1. In the first case, find the common type by comparing the left hand side
+   *expression types against corresponding right hand side expression derived
+   *from the subquery expression's plan output. Inject appropriate casts in the
+   *LHS and RHS side of IN expression.
+   *
+   * 2. In the second case, convert the value and in list expressions to the
+   *common operator type by looking at all the argument types and finding
+   *the closest one that all the arguments can be cast to. When no common
+   *operator type is found the original expression will be returned and an
+   *Analysis Exception will be raised at the type checking phase.
*/
   object InConversion extends Rule[LogicalPlan] {
 def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
   // Skip nodes who's children have not been resolved yet.
   case e if !e.childrenResolved => e
 
+  // Handle type casting required between value expression and subquery output
+  // in IN subquery.
+  case i @ In(a, Seq(ListQuery(sub, children, exprId))) if !i.resolved =>
+// lhs is the value expression of IN subquery.
+val lhs = a match {
+  // Multi columns in IN clause is represented as a CreateNamedStruct.
+  // flatten the named struct to get the list of expressions.
+  case cns: CreateNamedStruct => cns.valExprs
+  case expr => Seq(expr)
+}
+
+// rhs is the subquery output.
+val rhs = sub.output
+require(lhs.length == rhs.length)
+
+val commonTypes = lhs.zip(rhs).flatMap { case (l, r) =>
+  findCommonTypeForBinaryComparison(l.dataType, r.dataType)
+}
+
+if (commonTypes.length == lhs.length) {
+  val castedRhs = rhs.zip(commonTypes).map {
+case (e, dt) if e.dataType != dt => Alias(Cast(e, dt), e.name)()
+case (e, _) => e
+  }
+  val castedLhs = lhs.zip(commonTypes).map {
+case (e, dt) if e.dataType != dt => Cast(e, dt)
+case (e, _) => e
+  }
+
+  // Before constructing the In expression, wrap the multi values in lhs
+  // in a CreatedNamedStruct.
+  val newLhs = a match {
+case cns: CreateNamedStruct =>
+  val nameValue = cns.nameExprs.zip(castedLhs).flatMap(pair => Seq(pair._1, pair._2))
--- End diff --

Please use `case (name, value) =>` instead of `pair`
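
The suggestion replaces the `pair._1`/`pair._2` accessors with a pattern-matching anonymous function. Below is a minimal standalone version of the interleaving step; the object and method names are hypothetical.

```scala
// Hypothetical standalone version of interleaving struct names and values.
object NamedStructSketch {
  // Produces name1, value1, name2, value2, ... using a `case (name, value)`
  // destructuring lambda instead of tuple accessors.
  def interleave[T](names: Seq[T], values: Seq[T]): Seq[T] =
    names.zip(values).flatMap { case (name, value) => Seq(name, value) }
}
```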





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103850482
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -365,17 +385,66 @@ object TypeCoercion {
   }
 
   /**
-   * Convert the value and in list expressions to the common operator type
-   * by looking at all the argument types and finding the closest one that
-   * all the arguments can be cast to. When no common operator type is found
-   * the original expression will be returned and an Analysis Exception will
-   * be raised at type checking phase.
+   * Handles type coercion for both IN expression with subquery and IN
+   * expressions without subquery.
+   * 1. In the first case, find the common type by comparing the left hand side
+   *expression types against corresponding right hand side expression derived
+   *from the subquery expression's plan output. Inject appropriate casts in the
+   *LHS and RHS side of IN expression.
+   *
+   * 2. In the second case, convert the value and in list expressions to the
+   *common operator type by looking at all the argument types and finding
+   *the closest one that all the arguments can be cast to. When no common
+   *operator type is found the original expression will be returned and an
+   *Analysis Exception will be raised at the type checking phase.
*/
   object InConversion extends Rule[LogicalPlan] {
 def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
   // Skip nodes who's children have not been resolved yet.
   case e if !e.childrenResolved => e
 
+  // Handle type casting required between value expression and subquery output
+  // in IN subquery.
+  case i @ In(a, Seq(ListQuery(sub, children, exprId))) if !i.resolved =>
+// lhs is the value expression of IN subquery.
+val lhs = a match {
+  // Multi columns in IN clause is represented as a CreateNamedStruct.
+  // flatten the named struct to get the list of expressions.
+  case cns: CreateNamedStruct => cns.valExprs
+  case expr => Seq(expr)
+}
+
+// rhs is the subquery output.
+val rhs = sub.output
+require(lhs.length == rhs.length)
+
+val commonTypes = lhs.zip(rhs).flatMap { case (l, r) =>
+  findCommonTypeForBinaryComparison(l.dataType, r.dataType)
+}
+
+if (commonTypes.length == lhs.length) {
--- End diff --

Please add a comment to explain what this condition means.
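
One way to read the condition: `flatMap` over the `Option` results of `findCommonTypeForBinaryComparison` silently drops any pair with no common type. So `commonTypes.length == lhs.length` holds only when every LHS/RHS pair found one. The toy sketch below demonstrates this with a hypothetical three-type system and a simplified widening rule, not Spark's actual type lattice.

```scala
// Hypothetical toy type system; the widening rule is a simplification.
object CommonTypeSketch {
  sealed trait DataType
  case object IntType extends DataType
  case object LongType extends DataType
  case object StringType extends DataType

  // Stand-in for findCommonTypeForBinaryComparison: identical types match,
  // Int/Long widen to Long, anything else has no common type.
  def findCommonType(l: DataType, r: DataType): Option[DataType] = (l, r) match {
    case (a, b) if a == b                          => Some(a)
    case (IntType, LongType) | (LongType, IntType) => Some(LongType)
    case _                                         => None
  }

  // flatMap drops the None results, so the lengths match only when every
  // pair has a common type; otherwise no casts should be injected.
  def commonTypes(lhs: Seq[DataType], rhs: Seq[DataType]): Option[Seq[DataType]] = {
    val found = lhs.zip(rhs).flatMap { case (l, r) => findCommonType(l, r) }
    if (found.length == lhs.length) Some(found) else None
  }
}
```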





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103850385
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -365,17 +385,66 @@ object TypeCoercion {
   }
 
   /**
-   * Convert the value and in list expressions to the common operator type
-   * by looking at all the argument types and finding the closest one that
-   * all the arguments can be cast to. When no common operator type is found
-   * the original expression will be returned and an Analysis Exception will
-   * be raised at type checking phase.
+   * Handles type coercion for both IN expression with subquery and IN
+   * expressions without subquery.
+   * 1. In the first case, find the common type by comparing the left hand side
+   *expression types against corresponding right hand side expression derived
+   *from the subquery expression's plan output. Inject appropriate casts in the
+   *LHS and RHS side of IN expression.
+   *
+   * 2. In the second case, convert the value and in list expressions to the
+   *common operator type by looking at all the argument types and finding
+   *the closest one that all the arguments can be cast to. When no common
+   *operator type is found the original expression will be returned and an
+   *Analysis Exception will be raised at the type checking phase.
*/
   object InConversion extends Rule[LogicalPlan] {
 def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
   // Skip nodes who's children have not been resolved yet.
   case e if !e.childrenResolved => e
 
+  // Handle type casting required between value expression and subquery output
+  // in IN subquery.
+  case i @ In(a, Seq(ListQuery(sub, children, exprId))) if !i.resolved =>
+// lhs is the value expression of IN subquery.
--- End diff --

`lhs` -> `LHS`. Please correct all the similar cases in comments.





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103850171
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -365,17 +385,66 @@ object TypeCoercion {
   }
 
   /**
-   * Convert the value and in list expressions to the common operator type
-   * by looking at all the argument types and finding the closest one that
-   * all the arguments can be cast to. When no common operator type is found
-   * the original expression will be returned and an Analysis Exception will
-   * be raised at type checking phase.
+   * Handles type coercion for both IN expression with subquery and IN
+   * expressions without subquery.
+   * 1. In the first case, find the common type by comparing the left hand side
+   *expression types against corresponding right hand side expression derived
--- End diff --

`left hand side` -> `left hand side (LHS)`
`right hand side` -> `right hand side (RHS)`
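
The "inject appropriate casts in the LHS and RHS" step described in this doc comment can be sketched with hypothetical toy types. A side is wrapped in a `Cast` only when its type differs from the common type. A cast subquery output column is also wrapped in an `Alias` so that it keeps its name. This mirrors `castedLhs`/`castedRhs` in the `InConversion` diff quoted elsewhere in this thread.

```scala
// Hypothetical toy expressions; types are plain strings for brevity.
object CastInjectionSketch {
  sealed trait Expr { def dataType: String }
  final case class Attr(name: String, dataType: String) extends Expr
  final case class Cast(child: Expr, dataType: String) extends Expr
  final case class Alias(child: Expr, name: String) extends Expr {
    def dataType: String = child.dataType
  }

  // LHS: wrap in a Cast only when the type actually changes.
  def castLhs(lhs: Seq[Expr], types: Seq[String]): Seq[Expr] =
    lhs.zip(types).map {
      case (e, dt) if e.dataType != dt => Cast(e, dt)
      case (e, _)                      => e
    }

  // RHS (subquery output): a cast column is aliased back to its original name.
  def castRhs(rhs: Seq[Attr], types: Seq[String]): Seq[Expr] =
    rhs.zip(types).map {
      case (a, dt) if a.dataType != dt => Alias(Cast(a, dt), a.name)
      case (a, _)                      => a
    }
}
```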






[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-03-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103849859
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 ---
@@ -202,14 +194,9 @@ trait CheckAnalysis extends PredicateHelper {
   s"filter expression '${f.condition.sql}' " +
 s"of type ${f.condition.dataType.simpleString} is not a boolean.")
 
-  case f @ Filter(condition, child) =>
-splitConjunctivePredicates(condition).foreach {
-  case _: PredicateSubquery | Not(_: PredicateSubquery) =>
-  case e if PredicateSubquery.hasNullAwarePredicateWithinNot(e) =>
-failAnalysis(s"Null-aware predicate sub-queries cannot be used in nested" +
-  s" conditions: $e")
-  case e =>
-}
+  case Filter(condition, _) if SubExprUtils.hasNullAwarePredicateWithinNot(condition) =>
+failAnalysis(s"Null-aware predicate sub-queries cannot be used in nested" +
+  s" conditions: $condition")
--- End diff --

Nit: `s"` -> `"` and move the space from the beginning of the second line 
to the end of the first line.
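To make the nit concrete, here is a small Scala sketch (the object and method names are hypothetical) showing that the suggested form builds the same message: the first literal has no placeholders, so it does not need the `s` interpolator, and the separating space moves to the end of the first piece.

```scala
object FailMessageDemo {
  // Current form: `s` on a literal without placeholders; the space starts the second piece.
  def current(condition: String): String =
    s"Null-aware predicate sub-queries cannot be used in nested" +
      s" conditions: $condition"

  // Suggested form: plain literal first, with the space moved to its end.
  def suggested(condition: String): String =
    "Null-aware predicate sub-queries cannot be used in nested " +
      s"conditions: $condition"
}
```

Both methods return the same string; the suggested form simply avoids an interpolator that does nothing.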





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-28 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103600773
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
 ---
@@ -40,19 +42,179 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {
 /**
  * A base interface for expressions that contain a [[LogicalPlan]].
  */
-abstract class SubqueryExpression extends PlanExpression[LogicalPlan] {
+abstract class SubqueryExpression(
+plan: LogicalPlan,
+children: Seq[Expression],
+exprId: ExprId) extends PlanExpression[LogicalPlan] {
+
+  override lazy val resolved: Boolean = childrenResolved && plan.resolved
+  override lazy val references: AttributeSet =
+if (plan.resolved) super.references -- plan.outputSet else super.references
   override def withNewPlan(plan: LogicalPlan): SubqueryExpression
+  override def semanticEquals(o: Expression): Boolean = o match {
+case p: SubqueryExpression =>
+  this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) &&
+children.length == p.children.length &&
+children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
+case _ => false
+  }
 }
 
 object SubqueryExpression {
+  /**
+   * Returns true when an expression contains an IN or EXISTS subquery and false otherwise.
+   */
+  def hasInOrExistsSubquery(e: Expression): Boolean = {
+e.find {
+  case _: ListQuery | _: Exists => true
+  case _ => false
+}.isDefined
+  }
+
+  /**
+   * Returns true when an expression contains a subquery that has outer reference(s). The outer
+   * reference attributes are kept as children of subquery expression by
+   * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]]
+   */
   def hasCorrelatedSubquery(e: Expression): Boolean = {
 e.find {
-  case e: SubqueryExpression if e.children.nonEmpty => true
+  case s: SubqueryExpression if s.children.nonEmpty => true
   case _ => false
 }.isDefined
   }
 }
 
+object SubExprUtils extends PredicateHelper {
+  /**
+   * Returns true when an expression contains correlated predicates i.e outer references and
+   * returns false otherwise.
+   */
+  def containsOuter(e: Expression): Boolean = {
+e.find(_.isInstanceOf[OuterReference]).isDefined
+  }
+
+  /**
+   * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could
+   * turn the null-aware predicate into not-null-aware predicate.
+   */
+  def hasNullAwarePredicateWithinNot(e: Expression): Boolean = {
+e.find{ x =>
+  x.isInstanceOf[Not] && e.find {
--- End diff --

@hvanhovell I have refactored the code a bit, and I hope this function is now
a bit clearer to follow. I have moved the code from its caller in checkAnalysis
into this function. Please let me know what you think.





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-28 Thread nsyca
Github user nsyca commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103461455
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -707,13 +709,85 @@ class Analyzer(
   } transformUp {
 case other => other transformExpressions {
   case a: Attribute =>
-attributeRewrites.get(a).getOrElse(a).withQualifier(a.qualifier)
+dedupAttr(a, attributeRewrites)
+  case s: SubqueryExpression =>
+s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites))
 }
   }
   newRight
   }
 }
 
+private def dedupAttr(attr: Attribute, attrMap: AttributeMap[Attribute]): Attribute = {
+  attrMap.get(attr).getOrElse(attr).withQualifier(attr.qualifier)
+}
+
+/**
+ * The outer plan may have been de-duplicated and the function below updates the
+ * outer references to refer to the de-duplicated attributes.
+ *
+ * For example (SQL):
+ * {{{
+ *   SELECT * FROM t1
+ *   WHERE EXISTS (SELECT 1
+ * FROM t2
+ * WHERE t1.c1 = t2.c1)
+ *   INTERSECT
+ *   SELECT * FROM t1
+ *   WHERE EXISTS (SELECT 1
+ * FROM t2
+ * WHERE t1.c1 = t2.c1)
+ * }}}
+  * Plan before resolveReference rule.
--- End diff --

@dilipbiswal You are probably already on the right track in that one of the
subquery expressions is not required. I guess either `T1 intersect T1 where exists (T2)`
or a self-join scenario like `T1, T1 Tx where exists (T2 where .col = T2.col)`
with `` being `T1` or `Tx` should also do.





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-28 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103426702
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1110,31 +1184,24 @@ class Analyzer(
 }
 
 /**
- * Pull out all (outer) correlated predicates from a given subquery. This method removes the
- * correlated predicates from subquery [[Filter]]s and adds the references of these predicates
- * to all intermediate [[Project]] and [[Aggregate]] clauses (if they are missing) in order to
- * be able to evaluate the predicates at the top level.
- *
- * This method returns the rewritten subquery and correlated predicates.
+ * Validates to make sure the outer references appearing inside the subquery
+ * are legal. This function also returns the list of expressions
+ * that contain outer references. These outer references would be kept as children
+ * of subquery expressions by the caller of this function.
  */
-private def pullOutCorrelatedPredicates(sub: LogicalPlan): (LogicalPlan, Seq[Expression]) = {
-  val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]]
+private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = {
+  val outerReferences = scala.collection.mutable.ArrayBuffer.empty[Seq[Expression]]
--- End diff --

@hvanhovell Thank you. I will change it to use `ArrayBuffer[Expression]`
instead.
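A minimal sketch of why the flat buffer is simpler, assuming outer references are gathered per operator as a `Seq`: with a buffer of `Seq`s every append nests another sequence that must be flattened later, while a flat buffer with `++=` accumulates the elements directly. Plain strings stand in for Catalyst `Expression`s, and the object and method names are hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer

object OuterRefBufferDemo {
  // Hypothetical stand-in for Catalyst's Expression.
  type Expr = String

  // Buffer of sequences: each append adds one nested Seq, so it must be flattened later.
  def collectNested(perOperator: Seq[Seq[Expr]]): Seq[Expr] = {
    val buf = ArrayBuffer.empty[Seq[Expr]]
    perOperator.foreach(refs => buf += refs)
    buf.flatten.toList
  }

  // Flat buffer: `++=` appends the elements themselves, so no flattening is needed.
  def collectFlat(perOperator: Seq[Seq[Expr]]): Seq[Expr] = {
    val buf = ArrayBuffer.empty[Expr]
    perOperator.foreach(refs => buf ++= refs)
    buf.toList
  }
}
```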





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-28 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103415750
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1398,42 +1399,46 @@ class Analyzer(
 }
   } while (!current.resolved && !current.fastEquals(previous))
 
-  // Step 2: Pull out the predicates if the plan is resolved.
+  // Step 2: pull the outer references and record them as children of SubqueryExpression
   if (current.resolved) {
 // Make sure the resolved query has the required number of output columns. This is only
 // needed for Scalar and IN subqueries.
 if (requiredColumns > 0 && requiredColumns != current.output.size) {
   failAnalysis(s"The number of columns in the subquery (${current.output.size}) " +
 s"does not match the required number of columns ($requiredColumns)")
 }
-// Pullout predicates and construct a new plan.
-f.tupled(rewriteSubQuery(current, plans))
+// Validate the outer reference and record the outer references as children of
+// subquery expression.
+f.tupled(current, checkAndGetOuterReferences(current))
--- End diff --

@hvanhovell Got it. Thank you!





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-28 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103412751
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1110,31 +1184,24 @@ class Analyzer(
 }
 
 /**
- * Pull out all (outer) correlated predicates from a given subquery. This method removes the
- * correlated predicates from subquery [[Filter]]s and adds the references of these predicates
- * to all intermediate [[Project]] and [[Aggregate]] clauses (if they are missing) in order to
- * be able to evaluate the predicates at the top level.
- *
- * This method returns the rewritten subquery and correlated predicates.
+ * Validates to make sure the outer references appearing inside the subquery
+ * are legal. This function also returns the list of expressions
+ * that contain outer references. These outer references would be kept as children
+ * of subquery expressions by the caller of this function.
  */
-private def pullOutCorrelatedPredicates(sub: LogicalPlan): (LogicalPlan, Seq[Expression]) = {
-  val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]]
+private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = {
--- End diff --

Ok, then leave as it is. I thought it was only doing validation.





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-28 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103411993
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
 ---
@@ -40,19 +42,179 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {
 /**
  * A base interface for expressions that contain a [[LogicalPlan]].
  */
-abstract class SubqueryExpression extends PlanExpression[LogicalPlan] {
+abstract class SubqueryExpression(
+plan: LogicalPlan,
+children: Seq[Expression],
+exprId: ExprId) extends PlanExpression[LogicalPlan] {
+
+  override lazy val resolved: Boolean = childrenResolved && plan.resolved
+  override lazy val references: AttributeSet =
+if (plan.resolved) super.references -- plan.outputSet else super.references
   override def withNewPlan(plan: LogicalPlan): SubqueryExpression
+  override def semanticEquals(o: Expression): Boolean = o match {
+case p: SubqueryExpression =>
+  this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) &&
+children.length == p.children.length &&
+children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
+case _ => false
+  }
 }
 
 object SubqueryExpression {
+  /**
+   * Returns true when an expression contains an IN or EXISTS subquery and false otherwise.
+   */
+  def hasInOrExistsSubquery(e: Expression): Boolean = {
+e.find {
+  case _: ListQuery | _: Exists => true
+  case _ => false
+}.isDefined
+  }
+
+  /**
+   * Returns true when an expression contains a subquery that has outer reference(s). The outer
+   * reference attributes are kept as children of subquery expression by
+   * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]]
+   */
   def hasCorrelatedSubquery(e: Expression): Boolean = {
 e.find {
-  case e: SubqueryExpression if e.children.nonEmpty => true
+  case s: SubqueryExpression if s.children.nonEmpty => true
   case _ => false
 }.isDefined
   }
 }
 
+object SubExprUtils extends PredicateHelper {
+  /**
+   * Returns true when an expression contains correlated predicates i.e outer references and
+   * returns false otherwise.
+   */
+  def containsOuter(e: Expression): Boolean = {
+e.find(_.isInstanceOf[OuterReference]).isDefined
+  }
+
+  /**
+   * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could
+   * turn the null-aware predicate into not-null-aware predicate.
+   */
+  def hasNullAwarePredicateWithinNot(e: Expression): Boolean = {
+e.find{ x =>
+  x.isInstanceOf[Not] && e.find {
--- End diff --

@hvanhovell I had actually debugged this before and completely forgotten
about it. The case you mention works OK because of the way we invoke this
function. Please see the caller:
[code](https://github.com/dilipbiswal/spark/blob/33844bb8c6e72c35e12a7ea9f124ad3c6b8bd43c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala#L197-L204)
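As an illustration of why the call site matters, here is a toy model (hypothetical classes, not Catalyst's) of the old caller quoted in the CheckAnalysis diff in this thread: the condition is split into conjuncts, top-level IN-subquery conjuncts and their negations are skipped, and only the remaining conjuncts are checked. Checking the whole condition at once flags `NOT a AND x IN (subquery)` even though no subquery sits under the `Not`; checking per conjunct does not. This is a sketch of one plausible scenario, not necessarily the exact case raised in review.

```scala
object NullAwareCallerDemo {
  // Hypothetical, simplified expression tree; not Catalyst's classes.
  sealed trait Expr {
    def children: Seq[Expr]
    // Pre-order search, in the spirit of TreeNode.find.
    def find(p: Expr => Boolean): Option[Expr] =
      if (p(this)) Some(this)
      else children.iterator.map(_.find(p)).collectFirst { case Some(e) => e }
  }
  final case class Attr(name: String) extends Expr { def children: Seq[Expr] = Nil }
  final case class InSubquery(value: Expr) extends Expr { def children: Seq[Expr] = Seq(value) }
  final case class Not(child: Expr) extends Expr { def children: Seq[Expr] = Seq(child) }
  final case class And(left: Expr, right: Expr) extends Expr {
    def children: Seq[Expr] = Seq(left, right)
  }

  // Same shape as the function under review: the inner search runs over `e`, not `x`.
  def hasNullAwarePredicateWithinNot(e: Expr): Boolean =
    e.find { x =>
      x.isInstanceOf[Not] && e.find(_.isInstanceOf[InSubquery]).isDefined
    }.isDefined

  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other => Seq(other)
  }

  // Mirrors the old caller: skip top-level subquery conjuncts, check the rest one by one.
  def callerRejects(condition: Expr): Boolean =
    splitConjuncts(condition).exists {
      case _: InSubquery | Not(_: InSubquery) => false
      case c => hasNullAwarePredicateWithinNot(c)
    }

  val notAndIn: Expr = And(Not(Attr("a")), InSubquery(Attr("x")))
  val nestedUnderNot: Expr = And(Attr("b"), Not(And(Attr("c"), InSubquery(Attr("x")))))
}
```

Called on the whole `notAndIn` condition the check returns `true`; through the per-conjunct caller it returns `false`, while a subquery genuinely nested under `Not` (`nestedUnderNot`) is still rejected.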





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-28 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103406735
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -2512,3 +2522,67 @@ object ResolveCreateNamedStruct extends Rule[LogicalPlan] {
   CreateNamedStruct(children.toList)
   }
 }
+
+/**
+ * The aggregate expressions from subquery referencing outer query block are pushed
+ * down to the outer query block for evaluation. This rule below updates such outer references
+ * as AttributeReference referring attributes from the parent/outer query block.
+ *
+ * For example (SQL):
+ * {{{
+ *   SELECT l.a FROM l GROUP BY 1 HAVING EXISTS (SELECT 1 FROM r WHERE r.d < min(l.b))
+ * }}}
+ * Plan before the rule.
+ *Project [a#226]
+ *+- Filter exists#245 [min(b#227)#249]
+ *   :  +- Project [1 AS 1#247]
+ *   : +- Filter (d#238 < min(outer(b#227)))   <-
+ *   :+- SubqueryAlias r
+ *   :   +- Project [_1#234 AS c#237, _2#235 AS d#238]
+ *   :  +- LocalRelation [_1#234, _2#235]
+ *   +- Aggregate [a#226], [a#226, min(b#227) AS min(b#227)#249]
+ *  +- SubqueryAlias l
+ * +- Project [_1#223 AS a#226, _2#224 AS b#227]
+ *+- LocalRelation [_1#223, _2#224]
+ * Plan after the rule.
+ *Project [a#226]
+ *+- Filter exists#245 [min(b#227)#249]
+ *   :  +- Project [1 AS 1#247]
+ *   : +- Filter (d#238 < outer(min(b#227)#249))   <-
+ *   :+- SubqueryAlias r
+ *   :   +- Project [_1#234 AS c#237, _2#235 AS d#238]
+ *   :  +- LocalRelation [_1#234, _2#235]
+ *   +- Aggregate [a#226], [a#226, min(b#227) AS min(b#227)#249]
+ *  +- SubqueryAlias l
+ * +- Project [_1#223 AS a#226, _2#224 AS b#227]
+ *+- LocalRelation [_1#223, _2#224]
+ */
+object UpdateOuterReferences extends Rule[LogicalPlan] {
+  private def stripAlias(expr: Expression): Expression = expr match { case a: Alias => a.child }
+
+  private def updateOuterReferenceInSubquery(
+  plan: LogicalPlan,
+  refExprs: Seq[Expression]): LogicalPlan = {
+plan transformAllExpressions { case e =>
+  val outerAlias =
+refExprs.find(stripAlias(_).semanticEquals(SubExprUtils.stripOuterReference(e)))
+  outerAlias match {
+case Some(a: Alias) => OuterReference(a.toAttribute)
+case _ => e
+  }
+}
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+plan transform {
+  case f @ Filter(_, a: Aggregate) if f.resolved =>
--- End diff --

Ok, awesome.
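The core of the rule in this diff is a lookup: strip any `outer(...)` wrappers from an expression inside the subquery, compare the result with the aliased aggregate expressions of the outer `Aggregate`, and on a match replace it with an outer reference to the alias. The toy sketch below models only that expression rewrite, using hypothetical case classes, plain equality in place of `semanticEquals`, and hand-written top-down recursion in place of `transformAllExpressions`.

```scala
object UpdateOuterRefsDemo {
  // Hypothetical toy expression tree; not Catalyst's classes.
  sealed trait E
  final case class Col(name: String) extends E
  final case class Outer(child: E) extends E        // like outer(...)
  final case class Min(child: E) extends E
  final case class Lt(left: E, right: E) extends E
  final case class AliasRef(name: String) extends E // attribute produced by an alias

  // Remove Outer wrappers everywhere, in the spirit of stripOuterReference.
  def stripOuter(e: E): E = e match {
    case Outer(c) => stripOuter(c)
    case Min(c) => Min(stripOuter(c))
    case Lt(l, r) => Lt(stripOuter(l), stripOuter(r))
    case other => other
  }

  // aliases: alias name -> aliased expression from the outer Aggregate.
  // Top-down: when a node matches an aliased expression, replace it and stop descending.
  def update(e: E, aliases: Map[String, E]): E = {
    val stripped = stripOuter(e)
    aliases.collectFirst { case (name, aliased) if aliased == stripped => name } match {
      case Some(name) => Outer(AliasRef(name))
      case None =>
        e match {
          case Outer(c) => Outer(update(c, aliases))
          case Min(c) => Min(update(c, aliases))
          case Lt(l, r) => Lt(update(l, aliases), update(r, aliases))
          case other => other
        }
    }
  }

  // Filter (d < min(outer(b))) with the outer Aggregate exposing min(b) AS min_b.
  val before: E = Lt(Col("d"), Min(Outer(Col("b"))))
  val after: E = update(before, Map("min_b" -> Min(Col("b"))))
}
```

Here `after` is `Lt(Col("d"), Outer(AliasRef("min_b")))`, which corresponds to the `min(outer(b#227))` to `outer(min(b#227)#249)` change shown in the plans above.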





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-28 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103406494
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1398,42 +1399,46 @@ class Analyzer(
 }
   } while (!current.resolved && !current.fastEquals(previous))
 
-  // Step 2: Pull out the predicates if the plan is resolved.
+  // Step 2: pull the outer references and record them as children of SubqueryExpression
   if (current.resolved) {
 // Make sure the resolved query has the required number of output columns. This is only
 // needed for Scalar and IN subqueries.
 if (requiredColumns > 0 && requiredColumns != current.output.size) {
   failAnalysis(s"The number of columns in the subquery (${current.output.size}) " +
 s"does not match the required number of columns ($requiredColumns)")
 }
-// Pullout predicates and construct a new plan.
-f.tupled(rewriteSubQuery(current, plans))
+// Validate the outer reference and record the outer references as children of
+// subquery expression.
+f.tupled(current, checkAndGetOuterReferences(current))
--- End diff --

I think we can. The reason I used `f.tupled` was because 
`rewriteSubQuery(..)` returned a tuple.
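For context on `f.tupled`: `tupled` turns a two-argument function into one that takes a single pair, which fits a helper that returns a tuple (as `rewriteSubQuery(..)` did). Once the plan and the outer references are available as separate values, a direct `f(current, ...)` call is simpler; in Scala 2, `f.tupled(a, b)` only compiles because the compiler adapts the argument list into a tuple. The sketch uses strings as stand-ins, and every name in it is hypothetical.

```scala
object TupledDemo {
  // Hypothetical stand-ins for a logical plan and its outer references.
  type Plan = String
  type OuterRefs = Seq[String]

  // A two-argument callback, playing the role of `f` in the diff.
  val f: (Plan, OuterRefs) => String =
    (plan, refs) => plan + " with " + refs.mkString(",")

  // A helper that returns a pair, like the old rewriteSubQuery(..).
  def rewrite(plan: Plan): (Plan, OuterRefs) = (plan + "'", Seq("a"))

  // A helper that returns only the references, like checkAndGetOuterReferences.
  def outerRefs(plan: Plan): OuterRefs = Seq("a")

  // `tupled` adapts f to accept the pair returned by `rewrite`.
  def viaTupled(plan: Plan): String = f.tupled(rewrite(plan))

  // With separate arguments, calling f directly is the clearer choice.
  def direct(plan: Plan): String = f(plan, outerRefs(plan))
}
```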





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-28 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103398818
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -707,13 +709,85 @@ class Analyzer(
   } transformUp {
 case other => other transformExpressions {
   case a: Attribute =>
-attributeRewrites.get(a).getOrElse(a).withQualifier(a.qualifier)
+dedupAttr(a, attributeRewrites)
+  case s: SubqueryExpression =>
+s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites))
 }
   }
   newRight
   }
 }
 
+private def dedupAttr(attr: Attribute, attrMap: AttributeMap[Attribute]): Attribute = {
+  attrMap.get(attr).getOrElse(attr).withQualifier(attr.qualifier)
+}
+
+/**
+ * The outer plan may have been de-duplicated and the function below updates the
+ * outer references to refer to the de-duplicated attributes.
+ *
+ * For example (SQL):
+ * {{{
+ *   SELECT * FROM t1
+ *   WHERE EXISTS (SELECT 1
+ * FROM t2
+ * WHERE t1.c1 = t2.c1)
+ *   INTERSECT
+ *   SELECT * FROM t1
+ *   WHERE EXISTS (SELECT 1
+ * FROM t2
+ * WHERE t1.c1 = t2.c1)
+ * }}}
+  * Plan before resolveReference rule.
--- End diff --

@hvanhovell I will work on a smaller example. I think we don't need the 
subquery expression in the left plan. That should cut down the plan size.





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103384302
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1398,42 +1399,46 @@ class Analyzer(
 }
   } while (!current.resolved && !current.fastEquals(previous))
 
-  // Step 2: Pull out the predicates if the plan is resolved.
+  // Step 2: pull the outer references and record them as children of SubqueryExpression
   if (current.resolved) {
 // Make sure the resolved query has the required number of output columns. This is only
 // needed for Scalar and IN subqueries.
 if (requiredColumns > 0 && requiredColumns != current.output.size) {
   failAnalysis(s"The number of columns in the subquery (${current.output.size}) " +
 s"does not match the required number of columns ($requiredColumns)")
 }
-// Pullout predicates and construct a new plan.
-f.tupled(rewriteSubQuery(current, plans))
+// Validate the outer reference and record the outer references as children of
+// subquery expression.
+f.tupled(current, checkAndGetOuterReferences(current))
--- End diff --

@hvanhovell Can we remove `tupled` and just call `f(current, ...)`?





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103363226
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -707,13 +709,85 @@ class Analyzer(
   } transformUp {
 case other => other transformExpressions {
   case a: Attribute =>
-attributeRewrites.get(a).getOrElse(a).withQualifier(a.qualifier)
+dedupAttr(a, attributeRewrites)
+  case s: SubqueryExpression =>
+s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites))
 }
   }
   newRight
   }
 }
 
+private def dedupAttr(attr: Attribute, attrMap: AttributeMap[Attribute]): Attribute = {
+  attrMap.get(attr).getOrElse(attr).withQualifier(attr.qualifier)
+}
+
+/**
+ * The outer plan may have been de-duplicated and the function below updates the
+ * outer references to refer to the de-duplicated attributes.
+ *
+ * For example (SQL):
+ * {{{
+ *   SELECT * FROM t1
+ *   WHERE EXISTS (SELECT 1
+ * FROM t2
+ * WHERE t1.c1 = t2.c1)
+ *   INTERSECT
+ *   SELECT * FROM t1
+ *   WHERE EXISTS (SELECT 1
+ * FROM t2
+ * WHERE t1.c1 = t2.c1)
+ * }}}
+  * Plan before resolveReference rule.
+ *'Intersect
+ *:- 'Project [*]
+ *:  +- Filter exists#271 [c1#250]
+ *: :  +- Project [1 AS 1#295]
+ *: : +- Filter (outer(c1#250) = c1#263)
+ *: :+- SubqueryAlias t2
+ *: :   +- Relation[c1#263,c2#264] parquet
+ *: +- SubqueryAlias t1
+ *:+- Relation[c1#250,c2#251] parquet
+ *+- 'Project [*]
+ *   +- Filter exists#272 [c1#250]
+ *  :  +- Project [1 AS 1#298]
+ *  : +- Filter (outer(c1#250) = c1#263)
+ *  :+- SubqueryAlias t2
+ *  :   +- Relation[c1#263,c2#264] parquet
+ *  +- SubqueryAlias t1
+ * +- Relation[c1#250,c2#251] parquet
+ * Plan after the resolveReference rule.
+ *Intersect
+ *:- Project [c1#250, c2#251]
+ *:  +- Filter exists#271 [c1#250]
+ *: :  +- Project [1 AS 1#295]
+ *: : +- Filter (outer(c1#250) = c1#263)
+ *: :+- SubqueryAlias t2
+ *: :   +- Relation[c1#263,c2#264] parquet
+ *: +- SubqueryAlias t1
+ *:+- Relation[c1#250,c2#251] parquet
+ *+- Project [c1#299, c2#300]
+ *+- Filter exists#272 [c1#299]
--- End diff --

@hvanhovell Thanks, will change.





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103363171
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
 ---
@@ -40,19 +42,179 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {
 /**
  * A base interface for expressions that contain a [[LogicalPlan]].
  */
-abstract class SubqueryExpression extends PlanExpression[LogicalPlan] {
+abstract class SubqueryExpression(
+plan: LogicalPlan,
+children: Seq[Expression],
+exprId: ExprId) extends PlanExpression[LogicalPlan] {
+
+  override lazy val resolved: Boolean = childrenResolved && plan.resolved
+  override lazy val references: AttributeSet =
+if (plan.resolved) super.references -- plan.outputSet else super.references
   override def withNewPlan(plan: LogicalPlan): SubqueryExpression
+  override def semanticEquals(o: Expression): Boolean = o match {
+case p: SubqueryExpression =>
+  this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) &&
+children.length == p.children.length &&
+children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
+case _ => false
+  }
 }
 
 object SubqueryExpression {
+  /**
+   * Returns true when an expression contains an IN or EXISTS subquery and false otherwise.
+   */
+  def hasInOrExistsSubquery(e: Expression): Boolean = {
+e.find {
+  case _: ListQuery | _: Exists => true
+  case _ => false
+}.isDefined
+  }
+
+  /**
+   * Returns true when an expression contains a subquery that has outer reference(s). The outer
+   * reference attributes are kept as children of subquery expression by
+   * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]]
+   */
   def hasCorrelatedSubquery(e: Expression): Boolean = {
 e.find {
-  case e: SubqueryExpression if e.children.nonEmpty => true
+  case s: SubqueryExpression if s.children.nonEmpty => true
   case _ => false
 }.isDefined
   }
 }
 
+object SubExprUtils extends PredicateHelper {
+  /**
+   * Returns true when an expression contains correlated predicates i.e outer references and
+   * returns false otherwise.
+   */
+  def containsOuter(e: Expression): Boolean = {
+e.find(_.isInstanceOf[OuterReference]).isDefined
+  }
+
+  /**
+   * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could
+   * turn the null-aware predicate into not-null-aware predicate.
+   */
+  def hasNullAwarePredicateWithinNot(e: Expression): Boolean = {
+e.find{ x =>
+  x.isInstanceOf[Not] && e.find {
--- End diff --

@hvanhovell Actually, I copied this function from the existing
`PredicateSubquery.hasNullAwarePredicateWithinNot`. I also found it a little odd
that we are doing an `e.find` there, but was afraid to change it.
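The oddity with `e.find` shows up in a toy model (hypothetical classes, not Catalyst's): because the inner search runs over the whole expression `e` rather than the `Not` node `x`, a disjunction such as `NOT a OR x IN (subquery)` is reported as having a subquery under `Not` even though it does not. A variant that scopes the inner search to `x` is shown for comparison only; whether that is the right change for Catalyst is not settled here.

```scala
object WithinNotDemo {
  // Hypothetical, simplified expression tree; not Catalyst's classes.
  sealed trait Expr {
    def children: Seq[Expr]
    def find(p: Expr => Boolean): Option[Expr] =
      if (p(this)) Some(this)
      else children.iterator.map(_.find(p)).collectFirst { case Some(e) => e }
  }
  final case class Attr(name: String) extends Expr { def children: Seq[Expr] = Nil }
  final case class InSubquery(value: Expr) extends Expr { def children: Seq[Expr] = Seq(value) }
  final case class Not(child: Expr) extends Expr { def children: Seq[Expr] = Seq(child) }
  final case class Or(left: Expr, right: Expr) extends Expr {
    def children: Seq[Expr] = Seq(left, right)
  }

  private def isInSubquery(e: Expr): Boolean = e.isInstanceOf[InSubquery]

  // As copied: the inner search looks at the whole expression `e`.
  def asCopied(e: Expr): Boolean =
    e.find(x => x.isInstanceOf[Not] && e.find(isInSubquery).isDefined).isDefined

  // Scoped: the inner search only looks below the Not node `x`.
  def scopedToNot(e: Expr): Boolean =
    e.find(x => x.isInstanceOf[Not] && x.find(isInSubquery).isDefined).isDefined

  val notOrIn: Expr = Or(Not(Attr("a")), InSubquery(Attr("x")))
  val inUnderNot: Expr = Not(InSubquery(Attr("x")))
}
```

For `notOrIn` the copied version returns `true` and the scoped one `false`; for `inUnderNot` both return `true`.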





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103362319
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
 ---
@@ -123,19 +123,36 @@ case class Not(child: Expression)
  */
 @ExpressionDescription(
   usage = "expr1 _FUNC_(expr2, expr3, ...) - Returns true if `expr` equals to any valN.")
-case class In(value: Expression, list: Seq[Expression]) extends Predicate
-with ImplicitCastInputTypes {
+case class In(value: Expression, list: Seq[Expression]) extends Predicate {
 
   require(list != null, "list should not be null")
-
-  override def inputTypes: Seq[AbstractDataType] = value.dataType +: list.map(_.dataType)
-
   override def checkInputDataTypes(): TypeCheckResult = {
-if (list.exists(l => l.dataType != value.dataType)) {
-  TypeCheckResult.TypeCheckFailure(
-"Arguments must be same type")
-} else {
-  TypeCheckResult.TypeCheckSuccess
+list match {
+  case ListQuery(sub, _, _) :: Nil =>
--- End diff --

@hvanhovell Previously, checkInputDataTypes() (called from checkAnalysis) never 
had to deal with **IN subquery expressions**, because they were rewritten to 
PredicateSubquery. With the change in this PR we now see the original IN 
subquery expressions, so we need to make sure the types are the same on the 
LHS and RHS of the IN subquery expression.
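To make the LHS/RHS check concrete, here is a minimal sketch of the kind of validation the quoted `checkInputDataTypes` change performs. It uses a toy model: `ToyType`, `LhsValue`, `Single`, `Struct` and `InSubqueryTypeCheck` are hypothetical names, not Spark classes, and `Struct` only stands in for `CreateNamedStruct`. The arity check is an assumption added for the sketch, because `zip` silently drops unmatched columns and the quoted hunk is cut off before any length check.

```scala
// Toy stand-ins for Spark data types and IN-subquery operands (hypothetical names).
sealed trait ToyType
case object TInt extends ToyType
case object TStr extends ToyType
case object TDate extends ToyType

sealed trait LhsValue
case class Single(t: ToyType) extends LhsValue           // e.g. `a IN (SELECT x ...)`
case class Struct(fields: Seq[ToyType]) extends LhsValue // e.g. `(a, b) IN (SELECT x, y ...)`

object InSubqueryTypeCheck {
  // Flatten the LHS the same way the quoted code unpacks CreateNamedStruct.
  private def lhsTypes(v: LhsValue): Seq[ToyType] = v match {
    case Struct(fs) => fs
    case Single(t)  => Seq(t)
  }

  /** Returns None if every LHS type matches the subquery output, else an error message. */
  def check(value: LhsValue, subqueryOutput: Seq[ToyType]): Option[String] = {
    val lhs = lhsTypes(value)
    if (lhs.length != subqueryOutput.length) {
      // zip would silently drop unmatched columns, so compare arity first.
      Some(s"expected ${lhs.length} column(s), subquery returns ${subqueryOutput.length}")
    } else {
      val mismatches = lhs.zip(subqueryOutput).zipWithIndex.collect {
        case ((l, r), i) if l != r => s"column $i: $l vs $r"
      }
      if (mismatches.isEmpty) None else Some(mismatches.mkString("; "))
    }
  }
}
```

For example, `InSubqueryTypeCheck.check(Struct(Seq(TInt, TStr)), Seq(TInt, TDate))` reports the second column as the mismatch, so a multi-column IN gets a per-column message rather than a generic "Arguments must be same type".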





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103359052
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -109,6 +109,26 @@ object TypeCoercion {
   }
 
   /**
+   * This function determines the target type of a comparison operator when one operand
+   * is a String and the other is not. It also handles when one op is a Date and the
+   * other is a Timestamp by making the target type to be String. Currently this is used
+   * to coerce types between LHS and RHS of the IN expression.
+   */
+  val findCommonTypeForBinaryComparison: (DataType, DataType) => Option[DataType] = {
+case (StringType, DateType) => Some(StringType)
+case (DateType, StringType) => Some(StringType)
+case (StringType, TimestampType) => Some(StringType)
+case (TimestampType, StringType) => Some(StringType)
+case (TimestampType, DateType) => Some(StringType)
--- End diff --

@hvanhovell Here is where I saw the code that handles the promotion between 
date and timestamp types:

https://github.com/dilipbiswal/spark/blob/33844bb8c6e72c35e12a7ea9f124ad3c6b8bd43c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala#L347:L360
Please let me know if I missed something here.
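The quoted hunk defines the coercion as a Scala function literal built from pattern-matching cases over a pair of types. A minimal, self-contained sketch of that shape follows; `SqlType` and its `*T` objects are hypothetical stand-ins for Spark's `DataType` hierarchy. The five cases are copied from the visible part of the hunk, and the trailing `case _ => None` is an assumption, because the rest of the function is truncated in this email.

```scala
// Hypothetical stand-ins for Spark's StringType, DateType, TimestampType, etc.
sealed trait SqlType
case object StringT extends SqlType
case object DateT extends SqlType
case object TimestampT extends SqlType
case object IntT extends SqlType

object BinaryComparisonCoercion {
  // The expected type is a Function2, so the case block matches on the argument pair.
  val findCommonTypeForBinaryComparison: (SqlType, SqlType) => Option[SqlType] = {
    case (StringT, DateT)      => Some(StringT)
    case (DateT, StringT)      => Some(StringT)
    case (StringT, TimestampT) => Some(StringT)
    case (TimestampT, StringT) => Some(StringT)
    case (TimestampT, DateT)   => Some(StringT) // Date vs Timestamp compared as String
    case _                     => None          // assumption: rest of the hunk is truncated
  }
}
```

Because the declared type is `(SqlType, SqlType) => Option[SqlType]`, Scala treats the `{ case ... }` block as a two-argument function whose arguments are matched as a tuple, so no explicit `(a, b) match` is needed.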





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103354299
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
 ---
@@ -40,19 +42,179 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {
 /**
  * A base interface for expressions that contain a [[LogicalPlan]].
  */
-abstract class SubqueryExpression extends PlanExpression[LogicalPlan] {
+abstract class SubqueryExpression(
+plan: LogicalPlan,
+children: Seq[Expression],
+exprId: ExprId) extends PlanExpression[LogicalPlan] {
+
+  override lazy val resolved: Boolean = childrenResolved && plan.resolved
+  override lazy val references: AttributeSet =
+if (plan.resolved) super.references -- plan.outputSet else super.references
   override def withNewPlan(plan: LogicalPlan): SubqueryExpression
+  override def semanticEquals(o: Expression): Boolean = o match {
+case p: SubqueryExpression =>
+  this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) &&
+children.length == p.children.length &&
+children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
+case _ => false
+  }
 }
 
 object SubqueryExpression {
+  /**
+   * Returns true when an expression contains an IN or EXISTS subquery and false otherwise.
+   */
+  def hasInOrExistsSubquery(e: Expression): Boolean = {
+e.find {
+  case _: ListQuery | _: Exists => true
+  case _ => false
+}.isDefined
+  }
+
+  /**
+   * Returns true when an expression contains a subquery that has outer reference(s). The outer
+   * reference attributes are kept as children of subquery expression by
+   * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]]
+   */
   def hasCorrelatedSubquery(e: Expression): Boolean = {
 e.find {
-  case e: SubqueryExpression if e.children.nonEmpty => true
+  case s: SubqueryExpression if s.children.nonEmpty => true
   case _ => false
 }.isDefined
   }
 }
 
+object SubExprUtils extends PredicateHelper {
+  /**
+   * Returns true when an expression contains correlated predicates i.e outer references and
+   * returns false otherwise.
+   */
+  def containsOuter(e: Expression): Boolean = {
+e.find(_.isInstanceOf[OuterReference]).isDefined
+  }
+
+  /**
+   * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could
+   * turn the null-aware predicate into not-null-aware predicate.
+   */
+  def hasNullAwarePredicateWithinNot(e: Expression): Boolean = {
+e.find{ x =>
+  x.isInstanceOf[Not] && e.find {
+case In(_, Seq(_: ListQuery)) => true
+case _ => false
+  }.isDefined
+}.isDefined
+  }
+
+  /**
+   * Returns an expression after removing the OuterReference shell.
+   */
+  def stripOuterReference(e: Expression): Expression = {
+e.transform {
+  case OuterReference(r) => r
+}
+  }
+
+  /**
+   * Returns the list of expressions after removing the OuterReference shell from each of
+   * the expression.
+   */
+  def stripOuterReferences(e: Seq[Expression]): Seq[Expression] = e.map(stripOuterReference)
+
+  /**
+   * Returns the logical plan after removing the OuterReference shell from all the expressions
+   * of the input logical plan.
+   */
+  def stripOuterReferences(p: LogicalPlan): LogicalPlan = {
+p.transformAllExpressions {
+  case OuterReference(a) => a
+}
+  }
+
+  /**
+   * Given a list of expressions, returns the expressions which have outer references. Aggregate
+   * expressions are treated in a special way. If the children of aggregate expression contains an
+   * outer reference, then the entire aggregate expression is marked as an outer reference.
+   * Example (SQL):
+   * {{{
+   *   SELECT a FROM l GROUP by 1 HAVING EXISTS (SELECT 1 FROM r WHERE d < min(b))
+   * }}}
+   * In the above case, we want to mark the entire min(b) as an outer reference
+   * OuterReference(min(b)) instead of min(OuterReference(b)).
+   * TODO: Currently we don't allow deep correlation. Also, we don't allow mixing of
+   * outer references and local references under an aggregate expression.
+   * For example (SQL):
+   * {{{
+   *   SELECT .. FROM p1
+   *   WHERE EXISTS (SELECT ...
+   * FROM p2
+   * WHERE EXISTS (SELECT ...
+   *   FROM sq
+   *   WHERE min(p1.a + p2.b) = sq.c))
+   *
+   *  

[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103353840
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -2512,3 +2522,67 @@ object ResolveCreateNamedStruct extends Rule[LogicalPlan] {
   CreateNamedStruct(children.toList)
   }
 }
+
+/**
+ * The aggregate expressions from subquery referencing outer query block are pushed
+ * down to the outer query block for evaluation. This rule below updates such outer references
+ * as AttributeReference referring attributes from the parent/outer query block.
+ *
+ * For example (SQL):
+ * {{{
+ *   SELECT l.a FROM l GROUP BY 1 HAVING EXISTS (SELECT 1 FROM r WHERE r.d < min(l.b))
+ * }}}
+ * Plan before the rule.
+ *Project [a#226]
+ *+- Filter exists#245 [min(b#227)#249]
+ *   :  +- Project [1 AS 1#247]
+ *   : +- Filter (d#238 < min(outer(b#227)))   <-
+ *   :+- SubqueryAlias r
+ *   :   +- Project [_1#234 AS c#237, _2#235 AS d#238]
+ *   :  +- LocalRelation [_1#234, _2#235]
+ *   +- Aggregate [a#226], [a#226, min(b#227) AS min(b#227)#249]
+ *  +- SubqueryAlias l
+ * +- Project [_1#223 AS a#226, _2#224 AS b#227]
+ *+- LocalRelation [_1#223, _2#224]
+ * Plan after the rule.
+ *Project [a#226]
+ *+- Filter exists#245 [min(b#227)#249]
+ *   :  +- Project [1 AS 1#247]
+ *   : +- Filter (d#238 < outer(min(b#227)#249))   <-
+ *   :+- SubqueryAlias r
+ *   :   +- Project [_1#234 AS c#237, _2#235 AS d#238]
+ *   :  +- LocalRelation [_1#234, _2#235]
+ *   +- Aggregate [a#226], [a#226, min(b#227) AS min(b#227)#249]
+ *  +- SubqueryAlias l
+ * +- Project [_1#223 AS a#226, _2#224 AS b#227]
+ *+- LocalRelation [_1#223, _2#224]
+ */
+object UpdateOuterReferences extends Rule[LogicalPlan] {
+  private def stripAlias(expr: Expression): Expression = expr match { case a: Alias => a.child }
+
+  private def updateOuterReferenceInSubquery(
+  plan: LogicalPlan,
+  refExprs: Seq[Expression]): LogicalPlan = {
+plan transformAllExpressions { case e =>
+  val outerAlias =
+refExprs.find(stripAlias(_).semanticEquals(SubExprUtils.stripOuterReference(e)))
+  outerAlias match {
+case Some(a: Alias) => OuterReference(a.toAttribute)
+case _ => e
+  }
+}
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+plan transform {
+  case f @ Filter(_, a: Aggregate) if f.resolved =>
--- End diff --

@hvanhovell At the time this rule executes, the aggregate expressions from the 
subquery plan have already been pushed down to the Aggregate operator by 
ResolveAggregateFunctions. Here we are just updating the outer references in 
the subquery plan.
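The rewrite described in the `UpdateOuterReferences` doc comment can be modeled with a toy expression tree. This is a minimal sketch, not the PR's implementation: `Ex`, `Col`, `Min`, `Outer`, `Lt`, `AggAlias` and `UpdateOuterRefsSketch` are hypothetical names, and plain structural equality (`==`) stands in for Catalyst's `semanticEquals`. It walks the subquery predicate top-down and, when a subtree (with its outer-reference shells stripped) matches an aggregate already computed by the outer `Aggregate`, replaces it with an outer reference to that aggregate's alias.

```scala
// Toy expression tree (hypothetical names, not Catalyst classes).
sealed trait Ex
case class Col(name: String) extends Ex         // attribute, e.g. b#227
case class Min(child: Ex) extends Ex            // aggregate function
case class Outer(child: Ex) extends Ex          // outer-reference shell
case class Lt(left: Ex, right: Ex) extends Ex   // left < right
case class AggAlias(child: Ex, name: String)    // `child AS name` in the outer Aggregate

object UpdateOuterRefsSketch {
  // Remove every Outer shell, analogous in spirit to SubExprUtils.stripOuterReference.
  def stripOuter(e: Ex): Ex = e match {
    case Outer(c) => stripOuter(c)
    case Min(c)   => Min(stripOuter(c))
    case Lt(l, r) => Lt(stripOuter(l), stripOuter(r))
    case c: Col   => c
  }

  // Top-down rewrite: a subtree that matches an outer aggregate becomes
  // Outer(Col(<alias name>)); otherwise recurse into the children.
  def update(e: Ex, outerAggs: Seq[AggAlias]): Ex =
    outerAggs.find(_.child == stripOuter(e)) match {
      case Some(a) => Outer(Col(a.name))
      case None => e match {
        case Lt(l, r) => Lt(update(l, outerAggs), update(r, outerAggs))
        case Min(c)   => Min(update(c, outerAggs))
        case Outer(c) => Outer(update(c, outerAggs))
        case c: Col   => c
      }
    }
}
```

Applied to the doc comment's example, `Lt(Col("d#238"), Min(Outer(Col("b#227"))))` with the outer alias `AggAlias(Min(Col("b#227")), "min(b#227)#249")` becomes `Lt(Col("d#238"), Outer(Col("min(b#227)#249")))`, mirroring the change from `min(outer(b#227))` to `outer(min(b#227)#249)` in the "after" plan.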





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103347540
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -2332,6 +2337,11 @@ class Analyzer(
 override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveExpressions {
   case e: TimeZoneAwareExpression if e.timeZoneId.isEmpty =>
 e.withTimeZone(conf.sessionLocalTimeZone)
+  // Casts could be added in the subquery plan through the rule TypeCoercion while coercing
+  // the types between the value expression and list query expression of IN expression.
+  // We need to subject the subquery plan through ResolveTimeZone again to setup timezone
+  // information for time zone aware expressions.
+  case e: ListQuery => e.withNewPlan(ResolveTimeZone.apply(e.plan))
--- End diff --

@hvanhovell Thank you. I will change it.





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103346762
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1110,31 +1184,24 @@ class Analyzer(
 }
 
 /**
- * Pull out all (outer) correlated predicates from a given subquery. This method removes the
- * correlated predicates from subquery [[Filter]]s and adds the references of these predicates
- * to all intermediate [[Project]] and [[Aggregate]] clauses (if they are missing) in order to
- * be able to evaluate the predicates at the top level.
- *
- * This method returns the rewritten subquery and correlated predicates.
+ * Validates to make sure the outer references appearing inside the subquery
+ * are legal. This function also returns the list of expressions
+ * that contain outer references. These outer references would be kept as children
+ * of subquery expressions by the caller of this function.
  */
-private def pullOutCorrelatedPredicates(sub: LogicalPlan): (LogicalPlan, Seq[Expression]) = {
-  val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]]
+private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = {
--- End diff --

@hvanhovell The code here validates the correlated references and also 
collects them so that the outer plan can be rewritten to record the outer 
references. Are you suggesting moving the "check" portion to checkAnalysis? 
I will change it to use foreachUp. Thanks.





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103340921
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
 ---
@@ -40,19 +42,179 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {
 /**
  * A base interface for expressions that contain a [[LogicalPlan]].
  */
-abstract class SubqueryExpression extends PlanExpression[LogicalPlan] {
+abstract class SubqueryExpression(
+plan: LogicalPlan,
+children: Seq[Expression],
+exprId: ExprId) extends PlanExpression[LogicalPlan] {
+
+  override lazy val resolved: Boolean = childrenResolved && plan.resolved
+  override lazy val references: AttributeSet =
+if (plan.resolved) super.references -- plan.outputSet else super.references
   override def withNewPlan(plan: LogicalPlan): SubqueryExpression
+  override def semanticEquals(o: Expression): Boolean = o match {
+case p: SubqueryExpression =>
+  this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) &&
+children.length == p.children.length &&
+children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
+case _ => false
+  }
 }
 
 object SubqueryExpression {
+  /**
+   * Returns true when an expression contains an IN or EXISTS subquery and false otherwise.
+   */
+  def hasInOrExistsSubquery(e: Expression): Boolean = {
+e.find {
+  case _: ListQuery | _: Exists => true
+  case _ => false
+}.isDefined
+  }
+
+  /**
+   * Returns true when an expression contains a subquery that has outer reference(s). The outer
+   * reference attributes are kept as children of subquery expression by
+   * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]]
+   */
   def hasCorrelatedSubquery(e: Expression): Boolean = {
 e.find {
-  case e: SubqueryExpression if e.children.nonEmpty => true
+  case s: SubqueryExpression if s.children.nonEmpty => true
   case _ => false
 }.isDefined
   }
 }
 
+object SubExprUtils extends PredicateHelper {
+  /**
+   * Returns true when an expression contains correlated predicates i.e outer references and
+   * returns false otherwise.
+   */
+  def containsOuter(e: Expression): Boolean = {
+e.find(_.isInstanceOf[OuterReference]).isDefined
+  }
+
+  /**
+   * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could
+   * turn the null-aware predicate into not-null-aware predicate.
+   */
+  def hasNullAwarePredicateWithinNot(e: Expression): Boolean = {
+e.find{ x =>
+  x.isInstanceOf[Not] && e.find {
+case In(_, Seq(_: ListQuery)) => true
+case _ => false
+  }.isDefined
+}.isDefined
+  }
+
+  /**
+   * Returns an expression after removing the OuterReference shell.
+   */
+  def stripOuterReference(e: Expression): Expression = {
+e.transform {
+  case OuterReference(r) => r
+}
+  }
+
+  /**
+   * Returns the list of expressions after removing the OuterReference shell from each of
+   * the expression.
+   */
+  def stripOuterReferences(e: Seq[Expression]): Seq[Expression] = e.map(stripOuterReference)
+
+  /**
+   * Returns the logical plan after removing the OuterReference shell from all the expressions
+   * of the input logical plan.
+   */
+  def stripOuterReferences(p: LogicalPlan): LogicalPlan = {
+p.transformAllExpressions {
+  case OuterReference(a) => a
+}
+  }
+
+  /**
+   * Given a list of expressions, returns the expressions which have outer references. Aggregate
+   * expressions are treated in a special way. If the children of aggregate expression contains an
+   * outer reference, then the entire aggregate expression is marked as an outer reference.
+   * Example (SQL):
+   * {{{
+   *   SELECT a FROM l GROUP by 1 HAVING EXISTS (SELECT 1 FROM r WHERE d < min(b))
+   * }}}
+   * In the above case, we want to mark the entire min(b) as an outer reference
+   * OuterReference(min(b)) instead of min(OuterReference(b)).
+   * TODO: Currently we don't allow deep correlation. Also, we don't allow mixing of
+   * outer references and local references under an aggregate expression.
+   * For example (SQL):
+   * {{{
+   *   SELECT .. FROM p1
+   *   WHERE EXISTS (SELECT ...
+   * FROM p2
+   * WHERE EXISTS (SELECT ...
+   *   FROM sq
+   *   WHERE min(p1.a + p2.b) = sq.c))
+   *
+   *   

[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103337724
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -2512,3 +2522,67 @@ object ResolveCreateNamedStruct extends Rule[LogicalPlan] {
   CreateNamedStruct(children.toList)
   }
 }
+
+/**
+ * The aggregate expressions from subquery referencing outer query block are pushed
+ * down to the outer query block for evaluation. This rule below updates such outer references
+ * as AttributeReference referring attributes from the parent/outer query block.
+ *
+ * For example (SQL):
+ * {{{
+ *   SELECT l.a FROM l GROUP BY 1 HAVING EXISTS (SELECT 1 FROM r WHERE r.d < min(l.b))
+ * }}}
+ * Plan before the rule.
+ *Project [a#226]
+ *+- Filter exists#245 [min(b#227)#249]
+ *   :  +- Project [1 AS 1#247]
+ *   : +- Filter (d#238 < min(outer(b#227)))   <-
+ *   :+- SubqueryAlias r
+ *   :   +- Project [_1#234 AS c#237, _2#235 AS d#238]
+ *   :  +- LocalRelation [_1#234, _2#235]
+ *   +- Aggregate [a#226], [a#226, min(b#227) AS min(b#227)#249]
+ *  +- SubqueryAlias l
+ * +- Project [_1#223 AS a#226, _2#224 AS b#227]
+ *+- LocalRelation [_1#223, _2#224]
+ * Plan after the rule.
+ *Project [a#226]
+ *+- Filter exists#245 [min(b#227)#249]
+ *   :  +- Project [1 AS 1#247]
+ *   : +- Filter (d#238 < outer(min(b#227)#249))   <-
+ *   :+- SubqueryAlias r
+ *   :   +- Project [_1#234 AS c#237, _2#235 AS d#238]
+ *   :  +- LocalRelation [_1#234, _2#235]
+ *   +- Aggregate [a#226], [a#226, min(b#227) AS min(b#227)#249]
+ *  +- SubqueryAlias l
+ * +- Project [_1#223 AS a#226, _2#224 AS b#227]
+ *+- LocalRelation [_1#223, _2#224]
+ */
+object UpdateOuterReferences extends Rule[LogicalPlan] {
+  private def stripAlias(expr: Expression): Expression = expr match { case a: Alias => a.child }
+
+  private def updateOuterReferenceInSubquery(
+  plan: LogicalPlan,
+  refExprs: Seq[Expression]): LogicalPlan = {
+plan transformAllExpressions { case e =>
+  val outerAlias =
+refExprs.find(stripAlias(_).semanticEquals(SubExprUtils.stripOuterReference(e)))
+  outerAlias match {
+case Some(a: Alias) => OuterReference(a.toAttribute)
+case _ => e
+  }
+}
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+plan transform {
+  case f @ Filter(_, a: Aggregate) if f.resolved =>
--- End diff --

This only works with aggregates that are already in the `Aggregate` 
operator, which seems like a regression. What does Hive do?





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r102165726
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -707,13 +709,85 @@ class Analyzer(
   } transformUp {
 case other => other transformExpressions {
   case a: Attribute =>
-attributeRewrites.get(a).getOrElse(a).withQualifier(a.qualifier)
+dedupAttr(a, attributeRewrites)
+  case s: SubqueryExpression =>
+s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites))
 }
   }
   newRight
   }
 }
 
+private def dedupAttr(attr: Attribute, attrMap: AttributeMap[Attribute]): Attribute = {
+  attrMap.get(attr).getOrElse(attr).withQualifier(attr.qualifier)
+}
+
+/**
+ * The outer plan may have been de-duplicated and the function below updates the
+ * outer references to refer to the de-duplicated attributes.
+ *
+ * For example (SQL):
+ * {{{
+ *   SELECT * FROM t1
+ *   WHERE EXISTS (SELECT 1
+ * FROM t2
+ * WHERE t1.c1 = t2.c1)
+ *   INTERSECT
+ *   SELECT * FROM t1
+ *   WHERE EXISTS (SELECT 1
+ * FROM t2
+ * WHERE t1.c1 = t2.c1)
+ * }}}
+  * Plan before resolveReference rule.
+ *'Intersect
+ *:- 'Project [*]
+ *:  +- Filter exists#271 [c1#250]
+ *: :  +- Project [1 AS 1#295]
+ *: : +- Filter (outer(c1#250) = c1#263)
+ *: :+- SubqueryAlias t2
+ *: :   +- Relation[c1#263,c2#264] parquet
+ *: +- SubqueryAlias t1
+ *:+- Relation[c1#250,c2#251] parquet
+ *+- 'Project [*]
+ *   +- Filter exists#272 [c1#250]
+ *  :  +- Project [1 AS 1#298]
+ *  : +- Filter (outer(c1#250) = c1#263)
+ *  :+- SubqueryAlias t2
+ *  :   +- Relation[c1#263,c2#264] parquet
+ *  +- SubqueryAlias t1
+ * +- Relation[c1#250,c2#251] parquet
+ * Plan after the resolveReference rule.
+ *Intersect
+ *:- Project [c1#250, c2#251]
+ *:  +- Filter exists#271 [c1#250]
+ *: :  +- Project [1 AS 1#295]
+ *: : +- Filter (outer(c1#250) = c1#263)
+ *: :+- SubqueryAlias t2
+ *: :   +- Relation[c1#263,c2#264] parquet
+ *: +- SubqueryAlias t1
+ *:+- Relation[c1#250,c2#251] parquet
+ *+- Project [c1#299, c2#300]
+ *+- Filter exists#272 [c1#299]
--- End diff --

Nit: spacing is off here.





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r102256790
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -2332,6 +2337,11 @@ class Analyzer(
 override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveExpressions {
   case e: TimeZoneAwareExpression if e.timeZoneId.isEmpty =>
 e.withTimeZone(conf.sessionLocalTimeZone)
+  // Casts could be added in the subquery plan through the rule TypeCoercion while coercing
+  // the types between the value expression and list query expression of IN expression.
+  // We need to subject the subquery plan through ResolveTimeZone again to setup timezone
+  // information for time zone aware expressions.
+  case e: ListQuery => e.withNewPlan(ResolveTimeZone.apply(e.plan))
--- End diff --

Nit: just use `apply`.
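Inside the rule object, `apply` already refers to the rule itself, so the recursion into the `ListQuery` plan can call it unqualified instead of writing `ResolveTimeZone.apply(e.plan)`. A minimal sketch of that recursive shape, using a toy plan model: `TExpr`, `ZoneCast`, `Lit`, `ListSubquery`, `TPlan` and `ResolveTimeZoneSketch` are hypothetical names, and the fixed `"UTC"` session zone is an assumption standing in for `conf.sessionLocalTimeZone`.

```scala
// Toy plan model (hypothetical names, not Catalyst classes).
sealed trait TExpr
case class ZoneCast(child: TExpr, timeZoneId: Option[String]) extends TExpr // time-zone aware
case class Lit(value: String) extends TExpr
case class ListSubquery(plan: TPlan) extends TExpr // stands in for ListQuery

case class TPlan(exprs: Seq[TExpr], children: Seq[TPlan])

object ResolveTimeZoneSketch {
  // Assumption: a fixed zone in place of conf.sessionLocalTimeZone.
  private val sessionTimeZone = "UTC"

  private def resolve(e: TExpr): TExpr = e match {
    case ZoneCast(c, None) => ZoneCast(resolve(c), Some(sessionTimeZone))
    case ZoneCast(c, tz)   => ZoneCast(resolve(c), tz)
    // The subquery plan is a separate tree, so re-enter the rule via plain `apply`.
    case ListSubquery(p)   => ListSubquery(apply(p))
    case l: Lit            => l
  }

  def apply(plan: TPlan): TPlan =
    TPlan(plan.exprs.map(resolve), plan.children.map(apply))
}
```

A cast that TypeCoercion introduced inside the subquery, modeled here as `ZoneCast(Lit("2017-01-01"), None)`, comes back with `Some("UTC")` after `ResolveTimeZoneSketch(outerPlan)`, while casts that already carry a zone are left unchanged.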





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103340692
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
 ---
@@ -40,19 +42,179 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {
 /**
  * A base interface for expressions that contain a [[LogicalPlan]].
  */
-abstract class SubqueryExpression extends PlanExpression[LogicalPlan] {
+abstract class SubqueryExpression(
+plan: LogicalPlan,
+children: Seq[Expression],
+exprId: ExprId) extends PlanExpression[LogicalPlan] {
+
+  override lazy val resolved: Boolean = childrenResolved && plan.resolved
+  override lazy val references: AttributeSet =
+if (plan.resolved) super.references -- plan.outputSet else super.references
   override def withNewPlan(plan: LogicalPlan): SubqueryExpression
+  override def semanticEquals(o: Expression): Boolean = o match {
+case p: SubqueryExpression =>
+  this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) &&
+children.length == p.children.length &&
+children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
+case _ => false
+  }
 }
 
 object SubqueryExpression {
+  /**
+   * Returns true when an expression contains an IN or EXISTS subquery and false otherwise.
+   */
+  def hasInOrExistsSubquery(e: Expression): Boolean = {
+e.find {
+  case _: ListQuery | _: Exists => true
+  case _ => false
+}.isDefined
+  }
+
+  /**
+   * Returns true when an expression contains a subquery that has outer reference(s). The outer
+   * reference attributes are kept as children of subquery expression by
+   * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]]
+   */
   def hasCorrelatedSubquery(e: Expression): Boolean = {
 e.find {
-  case e: SubqueryExpression if e.children.nonEmpty => true
+  case s: SubqueryExpression if s.children.nonEmpty => true
   case _ => false
 }.isDefined
   }
 }
 
+object SubExprUtils extends PredicateHelper {
+  /**
+   * Returns true when an expression contains correlated predicates i.e outer references and
+   * returns false otherwise.
+   */
+  def containsOuter(e: Expression): Boolean = {
+e.find(_.isInstanceOf[OuterReference]).isDefined
+  }
+
+  /**
+   * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could
+   * turn the null-aware predicate into not-null-aware predicate.
+   */
+  def hasNullAwarePredicateWithinNot(e: Expression): Boolean = {
+e.find{ x =>
+  x.isInstanceOf[Not] && e.find {
--- End diff --

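I am pretty sure this is wrong. The inner search runs over the whole 
expression `e` rather than over the `Not` node `x`, so this will also return 
true for the following expression, where the IN subquery is not under the NOT: 
`NOT(a) AND b IN(SELECT q FROM t)`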
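A toy model makes the difference between searching from `e` and searching from `x` visible. `Pred`, `Leaf`, `NotP`, `AndP`, `InSubq` and `NullAwareCheck` are hypothetical names, `find` is a simple pre-order search written for this sketch, and `scoped` is only one possible fix, not necessarily what the PR ended up with.

```scala
// Toy predicate tree (hypothetical names, not Catalyst classes).
sealed trait Pred {
  def children: Seq[Pred]

  // Pre-order search: test this node first, then its children, left to right.
  def find(f: Pred => Boolean): Option[Pred] =
    if (f(this)) Some(this)
    else children.iterator.map(_.find(f)).collectFirst { case Some(p) => p }
}
case class Leaf(name: String) extends Pred { def children: Seq[Pred] = Nil }
case class NotP(child: Pred) extends Pred { def children: Seq[Pred] = Seq(child) }
case class AndP(left: Pred, right: Pred) extends Pred { def children: Seq[Pred] = Seq(left, right) }
case class InSubq(value: String) extends Pred { def children: Seq[Pred] = Nil } // like In(_, Seq(ListQuery))

object NullAwareCheck {
  private def isInSubquery(p: Pred): Boolean = p.isInstanceOf[InSubq]

  // Shape of the quoted code: the inner search scans the whole tree `e`.
  def buggy(e: Pred): Boolean =
    e.find(x => x.isInstanceOf[NotP] && e.find(isInSubquery).isDefined).isDefined

  // One possible fix: scan only the subtree rooted at the Not node `x`.
  def scoped(e: Pred): Boolean =
    e.find(x => x.isInstanceOf[NotP] && x.find(isInSubquery).isDefined).isDefined
}
```

For `AndP(NotP(Leaf("a")), InSubq("b"))`, the model of `NOT(a) AND b IN (...)`, `buggy` returns true while `scoped` returns false; both return true for `NotP(InSubq("b"))`, where the IN subquery really is under the NOT.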





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103340272
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
 ---
@@ -123,19 +123,36 @@ case class Not(child: Expression)
  */
 @ExpressionDescription(
   usage = "expr1 _FUNC_(expr2, expr3, ...) - Returns true if `expr` equals to any valN.")
-case class In(value: Expression, list: Seq[Expression]) extends Predicate
-with ImplicitCastInputTypes {
+case class In(value: Expression, list: Seq[Expression]) extends Predicate {
 
   require(list != null, "list should not be null")
-
-  override def inputTypes: Seq[AbstractDataType] = value.dataType +: list.map(_.dataType)
-
   override def checkInputDataTypes(): TypeCheckResult = {
-if (list.exists(l => l.dataType != value.dataType)) {
-  TypeCheckResult.TypeCheckFailure(
-"Arguments must be same type")
-} else {
-  TypeCheckResult.TypeCheckSuccess
+list match {
+  case ListQuery(sub, _, _) :: Nil =>
--- End diff --

IIUC, this is all done to get a better error message, right?





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103339031
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -109,6 +109,26 @@ object TypeCoercion {
   }
 
   /**
+   * This function determines the target type of a comparison operator when one operand
+   * is a String and the other is not. It also handles when one op is a Date and the
+   * other is a Timestamp by making the target type to be String. Currently this is used
+   * to coerce types between LHS and RHS of the IN expression.
+   */
+  val findCommonTypeForBinaryComparison: (DataType, DataType) => Option[DataType] = {
+case (StringType, DateType) => Some(StringType)
+case (DateType, StringType) => Some(StringType)
+case (StringType, TimestampType) => Some(StringType)
+case (TimestampType, StringType) => Some(StringType)
+case (TimestampType, DateType) => Some(StringType)
--- End diff --

This seems weird. Is this also the current behavior?





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r102167746
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1110,31 +1184,24 @@ class Analyzer(
 }
 
 /**
- * Pull out all (outer) correlated predicates from a given subquery. This method removes the
- * correlated predicates from subquery [[Filter]]s and adds the references of these predicates
- * to all intermediate [[Project]] and [[Aggregate]] clauses (if they are missing) in order to
- * be able to evaluate the predicates at the top level.
- *
- * This method returns the rewritten subquery and correlated predicates.
+ * Validates to make sure the outer references appearing inside the subquery
+ * are legal. This function also returns the list of expressions
+ * that contain outer references. These outer references would be kept as children
+ * of subquery expressions by the caller of this function.
  */
-private def pullOutCorrelatedPredicates(sub: LogicalPlan): (LogicalPlan, Seq[Expression]) = {
-  val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]]
+private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = {
--- End diff --

Also use `foreachUp` instead of `transformUp` for the tree traversal in this method.
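A minimal sketch of the difference, on a hypothetical toy `Node` tree rather than Catalyst's `TreeNode`: `foreachUp` only visits nodes in post-order, while `transformUp` rebuilds each node from its rewritten children, which a validation-only pass does not need.

```scala
// Toy tree (hypothetical, not Catalyst's TreeNode).
final case class Node(name: String, children: List[Node] = Nil) {
  // Post-order traversal that only visits nodes; no new tree is built.
  def foreachUp(f: Node => Unit): Unit = {
    children.foreach(_.foreachUp(f))
    f(this)
  }

  // Post-order rewrite: rebuilds each node from its rewritten children,
  // then applies the rule where it is defined.
  def transformUp(rule: PartialFunction[Node, Node]): Node = {
    val withNewChildren = copy(children = children.map(_.transformUp(rule)))
    rule.applyOrElse(withNewChildren, identity[Node])
  }
}
```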





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103336411
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1398,42 +1399,46 @@ class Analyzer(
 }
   } while (!current.resolved && !current.fastEquals(previous))
 
-  // Step 2: Pull out the predicates if the plan is resolved.
+  // Step 2: pull the outer references and record them as children of SubqueryExpression
   if (current.resolved) {
 // Make sure the resolved query has the required number of output columns. This is only
 // needed for Scalar and IN subqueries.
 if (requiredColumns > 0 && requiredColumns != current.output.size) {
   failAnalysis(s"The number of columns in the subquery (${current.output.size}) " +
 s"does not match the required number of columns ($requiredColumns)")
 }
-// Pullout predicates and construct a new plan.
-f.tupled(rewriteSubQuery(current, plans))
+// Validate the outer reference and record the outer references as children of
+// subquery expression.
+f.tupled(current, checkAndGetOuterReferences(current))
--- End diff --

NIT: Do we still need `f.tupled`?
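For context, a small sketch with a hypothetical `f` (its real signature is not shown in the diff): `f.tupled` adapts a two-argument function to take a single tuple, which fit the old call where `rewriteSubQuery(current, plans)` supplied a pair. Now that both arguments are computed separately, calling `f` directly is equivalent; the new `f.tupled(current, ...)` form relies on Scala 2's argument auto-tupling.

```scala
object TupledDemo {
  // Hypothetical stand-in for the callback `f` in the diff.
  val f: (String, Seq[String]) => String =
    (plan, outerRefs) => s"$plan with outer refs ${outerRefs.mkString("[", ", ", "]")}"

  // f.tupled turns f into a function of one (String, Seq[String]) tuple.
  def viaTupled(plan: String, refs: Seq[String]): String = f.tupled((plan, refs))

  // With the arguments already separate, a direct call does the same thing.
  def direct(plan: String, refs: Seq[String]): String = f(plan, refs)
}
```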





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r102168672
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1110,31 +1184,24 @@ class Analyzer(
 }
 
 /**
- * Pull out all (outer) correlated predicates from a given subquery. This method removes the
- * correlated predicates from subquery [[Filter]]s and adds the references of these predicates
- * to all intermediate [[Project]] and [[Aggregate]] clauses (if they are missing) in order to
- * be able to evaluate the predicates at the top level.
- *
- * This method returns the rewritten subquery and correlated predicates.
+ * Validates to make sure the outer references appearing inside the subquery
+ * are legal. This function also returns the list of expressions
+ * that contain outer references. These outer references would be kept as children
+ * of subquery expressions by the caller of this function.
  */
-private def pullOutCorrelatedPredicates(sub: LogicalPlan): (LogicalPlan, Seq[Expression]) = {
-  val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]]
+private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = {
+  val outerReferences = scala.collection.mutable.ArrayBuffer.empty[Seq[Expression]]
--- End diff --

Nit: `ArrayBuffer.empty[Seq[Expression]]` should also work. Why is it useful to collect sequences of expressions instead of just expressions?
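A small sketch of the two options, using strings as hypothetical stand-ins for expressions: a buffer of sequences flattened at the end and a flat buffer filled with `++=` produce the same list, so the nesting only pays off if per-node grouping is needed later.

```scala
import scala.collection.mutable.ArrayBuffer

object CollectOuterRefs {
  // Hypothetical outer references found at each visited plan node.
  val perNode: Seq[Seq[String]] = Seq(Seq("t1.c1"), Seq.empty, Seq("t1.c2", "t1.c3"))

  // Option 1: buffer of sequences, flattened at the end.
  def nested(): Seq[String] = {
    val buf = ArrayBuffer.empty[Seq[String]]
    perNode.foreach(refs => buf += refs)
    buf.flatten.toSeq
  }

  // Option 2: flat buffer of expressions, appended with ++=.
  def flat(): Seq[String] = {
    val buf = ArrayBuffer.empty[String]
    perNode.foreach(refs => buf ++= refs)
    buf.toSeq
  }
}
```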





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r102168299
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -707,13 +709,85 @@ class Analyzer(
   } transformUp {
 case other => other transformExpressions {
   case a: Attribute =>
-attributeRewrites.get(a).getOrElse(a).withQualifier(a.qualifier)
+dedupAttr(a, attributeRewrites)
+  case s: SubqueryExpression =>
+s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites))
 }
   }
   newRight
   }
 }
 
+private def dedupAttr(attr: Attribute, attrMap: AttributeMap[Attribute]): Attribute = {
+  attrMap.get(attr).getOrElse(attr).withQualifier(attr.qualifier)
+}
+
+/**
+ * The outer plan may have been de-duplicated and the function below updates the
+ * outer references to refer to the de-duplicated attributes.
+ *
+ * For example (SQL):
+ * {{{
+ *   SELECT * FROM t1
+ *   WHERE EXISTS (SELECT 1
+ * FROM t2
+ * WHERE t1.c1 = t2.c1)
+ *   INTERSECT
+ *   SELECT * FROM t1
+ *   WHERE EXISTS (SELECT 1
+ * FROM t2
+ * WHERE t1.c1 = t2.c1)
+ * }}}
+  * Plan before resolveReference rule.
--- End diff --

So I am all for decent documentation, but do you think we can find a somewhat smaller example?
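As a compact complement to the SQL example, the core of `dedupAttr` can be sketched on a hypothetical toy attribute type: the rewrite map supplies the de-duplicated attribute, but the original qualifier is kept. `Attr`, `Dedup` and keying the map by `exprId` are assumptions standing in for Catalyst's `Attribute` and `AttributeMap`.

```scala
// Toy attribute: name, unique expression id, optional qualifier such as a table alias.
final case class Attr(name: String, exprId: Long, qualifier: Option[String]) {
  def withQualifier(q: Option[String]): Attr = copy(qualifier = q)
}

object Dedup {
  // Same shape as dedupAttr in the diff: look the attribute up in the rewrite map,
  // fall back to the attribute itself, and keep the original qualifier.
  def dedupAttr(attr: Attr, rewrites: Map[Long, Attr]): Attr =
    rewrites.getOrElse(attr.exprId, attr).withQualifier(attr.qualifier)
}
```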





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r102167233
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1110,31 +1184,24 @@ class Analyzer(
 }
 
 /**
- * Pull out all (outer) correlated predicates from a given subquery. This method removes the
- * correlated predicates from subquery [[Filter]]s and adds the references of these predicates
- * to all intermediate [[Project]] and [[Aggregate]] clauses (if they are missing) in order to
- * be able to evaluate the predicates at the top level.
- *
- * This method returns the rewritten subquery and correlated predicates.
+ * Validates to make sure the outer references appearing inside the subquery
+ * are legal. This function also returns the list of expressions
+ * that contain outer references. These outer references would be kept as children
+ * of subquery expressions by the caller of this function.
  */
-private def pullOutCorrelatedPredicates(sub: LogicalPlan): (LogicalPlan, Seq[Expression]) = {
-  val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]]
+private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = {
--- End diff --

Should we move this into `CheckAnalysis`?





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r102168200
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1110,31 +1184,24 @@ class Analyzer(
 }
 
 /**
- * Pull out all (outer) correlated predicates from a given subquery. This method removes the
- * correlated predicates from subquery [[Filter]]s and adds the references of these predicates
- * to all intermediate [[Project]] and [[Aggregate]] clauses (if they are missing) in order to
- * be able to evaluate the predicates at the top level.
- *
- * This method returns the rewritten subquery and correlated predicates.
+ * Validates to make sure the outer references appearing inside the subquery
+ * are legal. This function also returns the list of expressions
+ * that contain outer references. These outer references would be kept as children
+ * of subquery expressions by the caller of this function.
  */
-private def pullOutCorrelatedPredicates(sub: LogicalPlan): (LogicalPlan, Seq[Expression]) = {
-  val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]]
+private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = {
+  val outerReferences = scala.collection.mutable.ArrayBuffer.empty[Seq[Expression]]
 
   // Make sure a plan's subtree does not contain outer references
   def failOnOuterReferenceInSubTree(p: LogicalPlan): Unit = {
-if (p.collectFirst(predicateMap).nonEmpty) {
+if (p.find(SubExprUtils.getOuterReferences(_).nonEmpty).nonEmpty) {
--- End diff --

This might be somewhat expensive. We could also use something that terminates early.
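A toy sketch of the concern, on a hypothetical expression tree: collecting all outer references and then testing `nonEmpty` always walks the whole tree, while an `exists`-style search stops at the first hit. The `visited` counter is only there to make the difference observable.

```scala
// Toy expression tree (hypothetical, not Catalyst).
sealed trait Expr { def children: Seq[Expr] }
final case class Outer(name: String) extends Expr { def children: Seq[Expr] = Nil }
final case class Col(name: String) extends Expr { def children: Seq[Expr] = Nil }
final case class And(l: Expr, r: Expr) extends Expr { def children: Seq[Expr] = Seq(l, r) }

object EarlyExit {
  // Counts nodes visited by either function below.
  var visited = 0

  // Collects every outer reference, so it always walks the whole tree.
  def collectOuter(e: Expr): Seq[Outer] = {
    visited += 1
    e match {
      case o: Outer => Seq(o)
      case other => other.children.flatMap(collectOuter)
    }
  }

  // Pre-order search that stops at the first outer reference.
  def containsOuter(e: Expr): Boolean = {
    visited += 1
    e match {
      case _: Outer => true
      case other => other.children.exists(containsOuter)
    }
  }
}
```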





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-16 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r101618763
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 ---
@@ -158,17 +160,7 @@ trait CheckAnalysis extends PredicateHelper {
   // For projects, do the necessary mapping and skip to its child.
   def cleanQuery(p: LogicalPlan): LogicalPlan = p match {
 case s: SubqueryAlias => cleanQuery(s.child)
-case p: Project =>
-  // SPARK-18814: Map any aliases to their AttributeReference children
-  // for the checking in the Aggregate operators below this Project.
-  subqueryColumns = subqueryColumns.map {
-xs => p.projectList.collectFirst {
-  case e @ Alias(child : AttributeReference, _) if e.exprId == xs.exprId =>
-child
-}.getOrElse(xs)
-  }
-
-  cleanQuery(p.child)
+case p: Project => cleanQuery(p.child)
--- End diff --

The de-duplication of the subplan now happens in the optimizer, when we actually pull up the correlated predicates. Thus the `Project` case is simplified.





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-16 Thread nsyca
Github user nsyca commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r101594561
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
 ---
@@ -40,19 +42,179 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {
 /**
  * A base interface for expressions that contain a [[LogicalPlan]].
  */
-abstract class SubqueryExpression extends PlanExpression[LogicalPlan] {
+abstract class SubqueryExpression(
+plan: LogicalPlan,
+children: Seq[Expression],
+exprId: ExprId) extends PlanExpression[LogicalPlan] {
+
+  override lazy val resolved: Boolean = childrenResolved && plan.resolved
+  override lazy val references: AttributeSet =
+if (plan.resolved) super.references -- plan.outputSet else super.references
   override def withNewPlan(plan: LogicalPlan): SubqueryExpression
+  override def semanticEquals(o: Expression): Boolean = o match {
+case p: SubqueryExpression =>
+  this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) &&
+children.length == p.children.length &&
+children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
+case _ => false
+  }
 }
 
 object SubqueryExpression {
+  /**
+   * Returns true when an expression contains an IN or EXISTS subquery and false otherwise.
+   */
+  def hasInOrExistsSubquery(e: Expression): Boolean = {
+e.find {
+  case _: ListQuery | _: Exists => true
+  case _ => false
+}.isDefined
+  }
+
+  /**
+   * Returns true when an expression contains a subquery that has outer reference(s). The outer
+   * reference attributes are kept as children of subquery expression by
+   * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]]
+   */
   def hasCorrelatedSubquery(e: Expression): Boolean = {
 e.find {
-  case e: SubqueryExpression if e.children.nonEmpty => true
+  case s: SubqueryExpression if s.children.nonEmpty => true
   case _ => false
 }.isDefined
   }
 }
 
+object SubExprUtils extends PredicateHelper {
+  /**
+   * Returns true when an expression contains correlated predicates i.e outer references and
+   * returns false otherwise.
+   */
+  def containsOuter(e: Expression): Boolean = {
+e.find(_.isInstanceOf[OuterReference]).isDefined
+  }
+
+  /**
+   * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could
+   * turn the null-aware predicate into not-null-aware predicate.
+   */
+  def hasNullAwarePredicateWithinNot(e: Expression): Boolean = {
+e.find{ x =>
+  x.isInstanceOf[Not] && e.find {
+case In(_, Seq(_: ListQuery)) => true
+case _ => false
+  }.isDefined
+}.isDefined
+  }
+
+  /**
+   * Returns an expression after removing the OuterReference shell.
+   */
--- End diff --

The following three definitions of `stripOuterReference(s)` are taken from a segment of code embedded in `ResolveSubquery.rewriteSubQuery` (`Analyzer.scala`).
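The definitions themselves are cut off in the quoted diff, so the following is only a toy sketch of what removing the `OuterReference` shell means, on a hypothetical `Expr` ADT; it is not the code from `Analyzer.scala`, and only two of the three variants are modeled.

```scala
// Toy expressions (hypothetical, not Catalyst).
sealed trait Expr
final case class Col(name: String) extends Expr
final case class OuterReference(e: Expr) extends Expr
final case class EqualTo(l: Expr, r: Expr) extends Expr

object Strip {
  // Removes every OuterReference wrapper, keeping the wrapped expression.
  def stripOuterReference(e: Expr): Expr = e match {
    case OuterReference(inner) => stripOuterReference(inner)
    case EqualTo(l, r) => EqualTo(stripOuterReference(l), stripOuterReference(r))
    case other => other
  }

  // Applies the same stripping to a list of expressions.
  def stripOuterReferences(es: Seq[Expr]): Seq[Expr] = es.map(stripOuterReference)
}
```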





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-16 Thread nsyca
Github user nsyca commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r101596895
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
 ---
@@ -83,29 +95,150 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
   }
 
   /**
-   * Given a predicate expression and an input plan, it rewrites
-   * any embedded existential sub-query into an existential join.
-   * It returns the rewritten expression together with the updated plan.
-   * Currently, it does not support null-aware joins. Embedded NOT IN predicates
-   * are blocked in the Analyzer.
+   * Given a predicate expression and an input plan, it rewrites any embedded existential sub-query
+   * into an existential join. It returns the rewritten expression together with the updated plan.
+   * Currently, it does not support NOT IN nested inside a NOT expression. This case is blocked in
+   * the Analyzer.
*/
   private def rewriteExistentialExpr(
   exprs: Seq[Expression],
   plan: LogicalPlan): (Option[Expression], LogicalPlan) = {
 var newPlan = plan
 val newExprs = exprs.map { e =>
   e transformUp {
-case PredicateSubquery(sub, conditions, nullAware, _) =>
-  // TODO: support null-aware join
+case Exists(sub, conditions, exprId) =>
   val exists = AttributeReference("exists", BooleanType, nullable = false)()
-  newPlan = Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And))
+  newPlan = Join(newPlan, sub,
+ExistenceJoin(exists), conditions.reduceLeftOption(And))
   exists
-}
+case In(e, Seq(l@ ListQuery(sub, conditions, exprId))) =>
+  val exists = AttributeReference("exists", BooleanType, nullable = false)()
+  val inConditions = getValueExpression(e).zip(sub.output).map(EqualTo.tupled)
+  newPlan = Join(newPlan, sub,
+ExistenceJoin(exists), (inConditions ++ conditions).reduceLeftOption(And))
+  exists
+  }
 }
 (newExprs.reduceOption(And), newPlan)
   }
 }
 
+ /**
+  * Pull out all (outer) correlated predicates from a given subquery. This method removes the
+  * correlated predicates from subquery [[Filter]]s and adds the references of these predicates
+  * to all intermediate [[Project]] and [[Aggregate]] clauses (if they are missing) in order to
+  * be able to evaluate the predicates at the top level.
+  *
+  * TODO: Look to merge this rule with RewritePredicateSubquery.
+  */
+object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper {
+   /**
+* Returns the correlated predicates and a updated plan that removes the outer references.
+*/
+  private def pullOutCorrelatedPredicates(
+  sub: LogicalPlan,
+  outer: Seq[LogicalPlan]): (LogicalPlan, Seq[Expression]) = {
+val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]]
+
+/** Determine which correlated predicate references are missing from this plan. */
--- End diff --

The following definition is moved unchanged from a nested definition of the same name in `ResolveSubquery.pullOutCorrelatedPredicates` (`Analyzer.scala`).
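To make the rule's first step concrete, here is a toy sketch on a hypothetical `Expr` ADT: a subquery `Filter` condition is split into conjuncts, and those containing an outer reference are separated from the local ones so they can be pulled above the subquery. Adding the missing references to intermediate `Project`/`Aggregate` nodes is not modeled.

```scala
// Toy expressions (hypothetical, not Catalyst).
sealed trait Expr
final case class Col(name: String) extends Expr
final case class OuterReference(e: Expr) extends Expr
final case class EqualTo(l: Expr, r: Expr) extends Expr
final case class And(l: Expr, r: Expr) extends Expr

object PullUp {
  // Flattens nested ANDs into a list of conjuncts.
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other => Seq(other)
  }

  // True if the expression mentions an outer reference anywhere.
  def containsOuter(e: Expr): Boolean = e match {
    case _: OuterReference => true
    case EqualTo(l, r) => containsOuter(l) || containsOuter(r)
    case And(l, r) => containsOuter(l) || containsOuter(r)
    case _ => false
  }

  // Splits a Filter condition into (correlated, local) conjuncts;
  // the correlated ones are the candidates to pull above the subquery.
  def partitionFilter(cond: Expr): (Seq[Expr], Seq[Expr]) =
    splitConjuncts(cond).partition(containsOuter)
}
```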





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-16 Thread nsyca
Github user nsyca commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r101590145
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
 ---
@@ -40,19 +42,179 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {
 /**
  * A base interface for expressions that contain a [[LogicalPlan]].
  */
-abstract class SubqueryExpression extends PlanExpression[LogicalPlan] {
+abstract class SubqueryExpression(
+plan: LogicalPlan,
+children: Seq[Expression],
+exprId: ExprId) extends PlanExpression[LogicalPlan] {
+
+  override lazy val resolved: Boolean = childrenResolved && plan.resolved
+  override lazy val references: AttributeSet =
+if (plan.resolved) super.references -- plan.outputSet else super.references
   override def withNewPlan(plan: LogicalPlan): SubqueryExpression
+  override def semanticEquals(o: Expression): Boolean = o match {
+case p: SubqueryExpression =>
+  this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) &&
+children.length == p.children.length &&
+children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
+case _ => false
+  }
 }
 
 object SubqueryExpression {
+  /**
+   * Returns true when an expression contains an IN or EXISTS subquery and false otherwise.
+   */
--- End diff --

The following definition is moved from `PredicateSubquery.hasPredicateSubquery`, with `PredicateSubquery` removed from the first `case` pattern.
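A toy sketch of the check on a hypothetical expression ADT (not Catalyst's classes), with a pre-order `find` that stops at the first match. A scalar subquery does not count as an IN or EXISTS subquery.

```scala
// Toy expressions (hypothetical, not Catalyst).
sealed trait Expr { def children: Seq[Expr] }
final case class Col(name: String) extends Expr { def children: Seq[Expr] = Nil }
final case class Exists(plan: String) extends Expr { def children: Seq[Expr] = Nil }
final case class ListQuery(plan: String) extends Expr { def children: Seq[Expr] = Nil }
final case class ScalarSubquery(plan: String) extends Expr { def children: Seq[Expr] = Nil }
final case class In(value: Expr, list: Seq[Expr]) extends Expr { def children: Seq[Expr] = value +: list }
final case class Or(l: Expr, r: Expr) extends Expr { def children: Seq[Expr] = Seq(l, r) }

object SubqueryCheck {
  // Pre-order search returning the first node that satisfies p; the lazy
  // iterator stops visiting children once a match is found.
  def find(e: Expr)(p: Expr => Boolean): Option[Expr] =
    if (p(e)) Some(e)
    else e.children.iterator.map(c => find(c)(p)).collectFirst { case Some(x) => x }

  def hasInOrExistsSubquery(e: Expr): Boolean =
    find(e) {
      case _: ListQuery | _: Exists => true
      case _ => false
    }.isDefined
}
```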





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-16 Thread nsyca
Github user nsyca commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r101590658
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
 ---
@@ -40,19 +42,179 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {
 /**
  * A base interface for expressions that contain a [[LogicalPlan]].
  */
-abstract class SubqueryExpression extends PlanExpression[LogicalPlan] {
+abstract class SubqueryExpression(
+plan: LogicalPlan,
+children: Seq[Expression],
+exprId: ExprId) extends PlanExpression[LogicalPlan] {
+
+  override lazy val resolved: Boolean = childrenResolved && plan.resolved
+  override lazy val references: AttributeSet =
+if (plan.resolved) super.references -- plan.outputSet else super.references
   override def withNewPlan(plan: LogicalPlan): SubqueryExpression
+  override def semanticEquals(o: Expression): Boolean = o match {
+case p: SubqueryExpression =>
+  this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) &&
+children.length == p.children.length &&
+children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
+case _ => false
+  }
 }
 
 object SubqueryExpression {
+  /**
+   * Returns true when an expression contains an IN or EXISTS subquery and false otherwise.
+   */
+  def hasInOrExistsSubquery(e: Expression): Boolean = {
+e.find {
+  case _: ListQuery | _: Exists => true
+  case _ => false
+}.isDefined
+  }
+
+  /**
+   * Returns true when an expression contains a subquery that has outer reference(s). The outer
+   * reference attributes are kept as children of subquery expression by
+   * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]]
+   */
   def hasCorrelatedSubquery(e: Expression): Boolean = {
 e.find {
-  case e: SubqueryExpression if e.children.nonEmpty => true
+  case s: SubqueryExpression if s.children.nonEmpty => true
   case _ => false
 }.isDefined
   }
 }
 
+object SubExprUtils extends PredicateHelper {
+  /**
+   * Returns true when an expression contains correlated predicates i.e outer references and
+   * returns false otherwise.
+   */
+  def containsOuter(e: Expression): Boolean = {
+e.find(_.isInstanceOf[OuterReference]).isDefined
+  }
+
+  /**
+   * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could
+   * turn the null-aware predicate into not-null-aware predicate.
+   */
--- End diff --

The following definition is moved from `PredicateSubquery.hasNullAwarePredicateWithinNot` with a code change to reflect the new representation of IN subquery.
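A toy sketch of the check on a hypothetical expression ADT. One point worth noting: this version searches for the IN subquery inside the `Not` node's own subtree, whereas in the quoted diff the inner `find` runs on `e` rather than `x`, so as written it appears to look for an IN subquery anywhere in `e` once any `Not` is present. Whether that difference matters depends on how callers split the condition, which the diff does not show.

```scala
// Toy expressions (hypothetical, not Catalyst).
sealed trait Expr { def children: Seq[Expr] }
final case class Col(name: String) extends Expr { def children: Seq[Expr] = Nil }
final case class ListQuery(plan: String) extends Expr { def children: Seq[Expr] = Nil }
final case class In(value: Expr, list: Seq[Expr]) extends Expr { def children: Seq[Expr] = value +: list }
final case class Not(child: Expr) extends Expr { def children: Seq[Expr] = Seq(child) }
final case class Or(l: Expr, r: Expr) extends Expr { def children: Seq[Expr] = Seq(l, r) }

object NullAware {
  // Pre-order search returning the first node that satisfies p.
  def find(e: Expr)(p: Expr => Boolean): Option[Expr] =
    if (p(e)) Some(e)
    else e.children.iterator.map(c => find(c)(p)).collectFirst { case Some(x) => x }

  private def isInSubquery(e: Expr): Boolean = e match {
    case In(_, Seq(_: ListQuery)) => true
    case _ => false
  }

  // True when some Not node has an IN subquery somewhere in its own subtree.
  def hasNullAwarePredicateWithinNot(e: Expr): Boolean =
    find(e) {
      case Not(child) => find(child)(isInSubquery).isDefined
      case _ => false
    }.isDefined
}
```

For example, for `Or(Not(Col("b")), In(Col("a"), Seq(ListQuery("t2"))))` this sketch returns false, because the IN subquery is not under the `Not`.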





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-16 Thread nsyca
Github user nsyca commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r101593305
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
 ---
@@ -40,19 +42,179 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {
 /**
  * A base interface for expressions that contain a [[LogicalPlan]].
  */
-abstract class SubqueryExpression extends PlanExpression[LogicalPlan] {
+abstract class SubqueryExpression(
+plan: LogicalPlan,
+children: Seq[Expression],
+exprId: ExprId) extends PlanExpression[LogicalPlan] {
+
+  override lazy val resolved: Boolean = childrenResolved && plan.resolved
+  override lazy val references: AttributeSet =
+if (plan.resolved) super.references -- plan.outputSet else super.references
   override def withNewPlan(plan: LogicalPlan): SubqueryExpression
+  override def semanticEquals(o: Expression): Boolean = o match {
+case p: SubqueryExpression =>
+  this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) &&
+children.length == p.children.length &&
+children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
+case _ => false
+  }
 }
 
 object SubqueryExpression {
+  /**
+   * Returns true when an expression contains an IN or EXISTS subquery and false otherwise.
+   */
+  def hasInOrExistsSubquery(e: Expression): Boolean = {
+e.find {
+  case _: ListQuery | _: Exists => true
+  case _ => false
+}.isDefined
+  }
+
+  /**
+   * Returns true when an expression contains a subquery that has outer reference(s). The outer
+   * reference attributes are kept as children of subquery expression by
+   * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]]
+   */
   def hasCorrelatedSubquery(e: Expression): Boolean = {
 e.find {
-  case e: SubqueryExpression if e.children.nonEmpty => true
+  case s: SubqueryExpression if s.children.nonEmpty => true
   case _ => false
 }.isDefined
   }
 }
 
+object SubExprUtils extends PredicateHelper {
+  /**
+   * Returns true when an expression contains correlated predicates i.e outer references and
+   * returns false otherwise.
+   */
--- End diff --

The following definition is moved from a nested definition of the same name in `ResolveSubquery.pullOutCorrelatedPredicates` in `Analyzer.scala`.





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-16 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r101598818
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -365,17 +385,66 @@ object TypeCoercion {
   }
 
   /**
-   * Convert the value and in list expressions to the common operator type
-   * by looking at all the argument types and finding the closest one that
-   * all the arguments can be cast to. When no common operator type is found
-   * the original expression will be returned and an Analysis Exception will
-   * be raised at type checking phase.
+   * Handles type coercion for both IN expression with subquery and IN
+   * expressions without subquery.
+   * 1. In the first case, find the common type by comparing the left hand side
+   *expression types against corresponding right hand side expression derived
+   *from the subquery expression's plan output. Inject appropriate casts in the
+   *LHS and RHS side of IN expression.
+   *
+   * 2. In the second case, convert the value and in list expressions to the
+   *common operator type by looking at all the argument types and finding
+   *the closest one that all the arguments can be cast to. When no common
+   *operator type is found the original expression will be returned and an
+   *Analysis Exception will be raised at the type checking phase.
*/
   object InConversion extends Rule[LogicalPlan] {
 def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
   // Skip nodes who's children have not been resolved yet.
   case e if !e.childrenResolved => e
 
+  // Handle type casting required between value expression and subquery output
+  // in IN subquery.
+  case i @ In(a, Seq(ListQuery(sub, children, exprId))) if !i.resolved =>
--- End diff --

A plan example is available in section 3 of [Code Changes](https://docs.google.com/document/d/18mqjhL9V1An-tNta7aVE13HkALRZ5GZ24AATA-Vqqf0/edit#).
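A rough sketch of step 1 of the doc comment above, under loudly stated assumptions: the numeric widening order `IntegerType < LongType < DoubleType` is hypothetical and much simpler than Spark's rules, and only the per-column target types are computed; injecting the casts on both sides is not modeled. All names are illustrative.

```scala
// Toy data types (hypothetical, not Catalyst).
sealed trait DataType
case object IntegerType extends DataType
case object LongType extends DataType
case object DoubleType extends DataType
case object StringType extends DataType

object InSubqueryCoercion {
  // Hypothetical numeric widening order used only for this sketch.
  private val numericRank: Map[DataType, Int] = Map(IntegerType -> 0, LongType -> 1, DoubleType -> 2)

  // Common type of two columns: equal types, or the wider of two numerics.
  def commonType(l: DataType, r: DataType): Option[DataType] =
    if (l == r) Some(l)
    else (numericRank.get(l), numericRank.get(r)) match {
      case (Some(a), Some(b)) => Some(if (a >= b) l else r)
      case _ => None
    }

  // Pairs each IN value type with the matching subquery output type and returns
  // the target type per column, or None if the arity differs or any pair has no common type.
  def targetTypes(lhs: Seq[DataType], subOutput: Seq[DataType]): Option[Seq[DataType]] =
    if (lhs.length != subOutput.length) None
    else {
      val common = lhs.zip(subOutput).map { case (l, r) => commonType(l, r) }
      if (common.forall(_.isDefined)) Some(common.flatten) else None
    }
}
```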




