[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/16954 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r105831346
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala --- @@ -123,19 +123,36 @@ case class Not(child: Expression) */ @ExpressionDescription( usage = "expr1 _FUNC_(expr2, expr3, ...) - Returns true if `expr` equals to any valN.") -case class In(value: Expression, list: Seq[Expression]) extends Predicate -with ImplicitCastInputTypes { +case class In(value: Expression, list: Seq[Expression]) extends Predicate { require(list != null, "list should not be null") - - override def inputTypes: Seq[AbstractDataType] = value.dataType +: list.map(_.dataType) - override def checkInputDataTypes(): TypeCheckResult = { -if (list.exists(l => l.dataType != value.dataType)) { - TypeCheckResult.TypeCheckFailure( -"Arguments must be same type") -} else { - TypeCheckResult.TypeCheckSuccess +list match { + case ListQuery(sub, _, _) :: Nil => +val valExprs = value match { + case cns: CreateNamedStruct => cns.valExprs + case expr => Seq(expr) +} +val isTypeMismatched = valExprs.zip(sub.output).exists { + case (l, r) => l.dataType != r.dataType +} +if (isTypeMismatched) {
--- End diff --

@hvanhovell The new error message looks like the following. Does this look okay to you?

```
Error in query: cannot resolve '(named_struct('c1', at1.`c1`, 'c2', at1.`c2`) IN (listquery()))' due to data type mismatch:
The data type of one or more elements in the left hand side of an IN subquery is not compatible with the data type of the output of the subquery
Mismatched columns: [(at1.`c1`:decimal(10,0), at2.`c1`:timestamp), (at1.`c2`:timestamp, at2.`c2`:decimal(10,0))]
Left side: [decimal(10,0), timestamp].
Right side: [timestamp, decimal(10,0)].
```
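Editor's sketch: the check quoted in this message pairs each left-hand expression with the subquery output column at the same position and reports the pairs whose types differ. The snippet below is a minimal, Spark-free illustration of that idea. `Col`, `mismatched`, and `describe` are hypothetical names invented for the sketch, and types are modeled as plain strings rather than Spark's `DataType`.

```scala
// Hypothetical stand-in for a resolved column: a display name plus a type name.
// This is NOT Spark's Attribute/DataType API.
object InSubqueryMismatch {
  final case class Col(name: String, dataType: String)

  /** Pairs LHS and RHS columns by position and keeps the pairs whose types differ. */
  def mismatched(lhs: Seq[Col], rhs: Seq[Col]): Seq[(Col, Col)] =
    lhs.zip(rhs).filter { case (l, r) => l.dataType != r.dataType }

  /** Renders the detail lines of the error message (assumed layout, for illustration). */
  def describe(lhs: Seq[Col], rhs: Seq[Col]): String = {
    val pairs = mismatched(lhs, rhs)
      .map { case (l, r) => s"(${l.name}:${l.dataType}, ${r.name}:${r.dataType})" }
      .mkString("[", ", ", "]")
    val left = lhs.map(_.dataType).mkString("[", ", ", "]")
    val right = rhs.map(_.dataType).mkString("[", ", ", "]")
    s"Mismatched columns: $pairs\nLeft side: $left.\nRight side: $right."
  }
}
```

Fed the two columns from the message above, `describe` should list both pairs as mismatched, since each side has `decimal(10,0)` where the other has `timestamp`.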
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r105830367 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -365,17 +368,73 @@ object TypeCoercion { } /** - * Convert the value and in list expressions to the common operator type - * by looking at all the argument types and finding the closest one that - * all the arguments can be cast to. When no common operator type is found - * the original expression will be returned and an Analysis Exception will - * be raised at type checking phase. + * Handles type coercion for both IN expression with subquery and IN + * expressions without subquery. + * 1. In the first case, find the common type by comparing the left hand side (LHS) + *expression types against corresponding right hand side (RHS) expression derived + *from the subquery expression's plan output. Inject appropriate casts in the + *LHS and RHS side of IN expression. + * + * 2. In the second case, convert the value and in list expressions to the + *common operator type by looking at all the argument types and finding + *the closest one that all the arguments can be cast to. When no common + *operator type is found the original expression will be returned and an + *Analysis Exception will be raised at the type checking phase. */ object InConversion extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions { // Skip nodes who's children have not been resolved yet. case e if !e.childrenResolved => e + // Handle type casting required between value expression and subquery output + // in IN subquery. + case i @ In(a, Seq(ListQuery(sub, children, exprId))) if !i.resolved => +// LHS is the value expression of IN subquery. +val lhs = a match { + // Multi columns in IN clause is represented as a CreateNamedStruct. + // flatten the named struct to get the list of expressions. 
+ case cns: CreateNamedStruct => cns.valExprs + case expr => Seq(expr) +} + +// RHS is the subquery output. +val rhs = sub.output +require(lhs.length == rhs.length) + +val commonTypes = lhs.zip(rhs).flatMap { case (l, r) => + findCommonTypeForBinaryComparison(l.dataType, r.dataType) match { +case d @ Some(_) => d +case _ => findTightestCommonType(l.dataType, r.dataType) + } +} + +// The number of columns/expressions must match between LHS and RHS of an +// IN subquery expression. +if (commonTypes.length == lhs.length) { + val castedRhs = rhs.zip(commonTypes).map { +case (e, dt) if e.dataType != dt => Alias(Cast(e, dt), e.name)() +case (e, _) => e + } + val castedLhs = lhs.zip(commonTypes).map { +case (e, dt) if e.dataType != dt => Cast(e, dt) +case (e, _) => e + } + + // Before constructing the In expression, wrap the multi values in LHS + // in a CreatedNamedStruct. + val newLhs = a match {
--- End diff --

@hvanhovell Thanks a lot. You are right, we don't care about the names. This looks much better.
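Editor's sketch: the rule quoted in this message finds a common type for each LHS/RHS column pair, gives up if any pair has none, and otherwise wraps the expressions that need it in casts. A minimal, Spark-free approximation follows. The type lattice (`IntT` < `LongT` < `DoubleT`), `commonType`, and `coerce` are assumptions made for the sketch. Spark's own `findCommonTypeForBinaryComparison` and `findTightestCommonType` follow different, richer rules, and the aliasing of casted RHS columns is omitted here.

```scala
object InCoercionSketch {
  // Toy type system, hypothetical and far smaller than Spark's DataType hierarchy.
  sealed trait DType
  case object IntT extends DType
  case object LongT extends DType
  case object DoubleT extends DType
  case object StringT extends DType

  sealed trait Expr { def dataType: DType }
  final case class Column(name: String, dataType: DType) extends Expr
  final case class Cast(child: Expr, dataType: DType) extends Expr

  // Assumed widening order for numeric types; StringT has no rank on purpose.
  private val numericRank: Map[DType, Int] = Map(IntT -> 0, LongT -> 1, DoubleT -> 2)

  /** Returns the wider of two types, or None when no common type exists. */
  def commonType(a: DType, b: DType): Option[DType] =
    if (a == b) Some(a)
    else {
      for {
        ra <- numericRank.get(a)
        rb <- numericRank.get(b)
      } yield if (ra >= rb) a else b
    }

  /** Casts both sides to the per-column common type; None if any column has none. */
  def coerce(lhs: Seq[Expr], rhs: Seq[Expr]): Option[(Seq[Expr], Seq[Expr])] = {
    require(lhs.length == rhs.length, "LHS and RHS must have the same number of columns")
    // flatMap drops pairs without a common type, so a shorter result signals failure.
    val common = lhs.zip(rhs).flatMap { case (l, r) => commonType(l.dataType, r.dataType) }
    if (common.length != lhs.length) None
    else {
      def castAll(es: Seq[Expr]): Seq[Expr] = es.zip(common).map {
        case (e, dt) if e.dataType != dt => Cast(e, dt)
        case (e, _) => e
      }
      Some((castAll(lhs), castAll(rhs)))
    }
  }
}
```

The length comparison after `flatMap` mirrors the `commonTypes.length == lhs.length` test in the diff: a missing common type leaves the expression untouched so that a later type check can report the mismatch.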
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r105780981 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala --- @@ -40,19 +43,189 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression { /** * A base interface for expressions that contain a [[LogicalPlan]]. */ -abstract class SubqueryExpression extends PlanExpression[LogicalPlan] { +abstract class SubqueryExpression( +plan: LogicalPlan, +children: Seq[Expression], +exprId: ExprId) extends PlanExpression[LogicalPlan] { + + override lazy val resolved: Boolean = childrenResolved && plan.resolved + override lazy val references: AttributeSet = +if (plan.resolved) super.references -- plan.outputSet else super.references override def withNewPlan(plan: LogicalPlan): SubqueryExpression + override def semanticEquals(o: Expression): Boolean = o match { +case p: SubqueryExpression => + this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) && +children.length == p.children.length && +children.zip(p.children).forall(p => p._1.semanticEquals(p._2)) +case _ => false + } } object SubqueryExpression { + /** + * Returns true when an expression contains an IN or EXISTS subquery and false otherwise. + */ + def hasInOrExistsSubquery(e: Expression): Boolean = { +e.find { + case _: ListQuery | _: Exists => true + case _ => false +}.isDefined + } + + /** + * Returns true when an expression contains a subquery that has outer reference(s). 
The outer + * reference attributes are kept as children of subquery expression by + * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]] + */ def hasCorrelatedSubquery(e: Expression): Boolean = { e.find { - case e: SubqueryExpression if e.children.nonEmpty => true + case s: SubqueryExpression => s.children.nonEmpty case _ => false }.isDefined } } +object SubExprUtils extends PredicateHelper { + /** + * Returns true when an expression contains correlated predicates i.e outer references and + * returns false otherwise. + */ + def containsOuter(e: Expression): Boolean = { +e.find(_.isInstanceOf[OuterReference]).isDefined + } + + /** + * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could + * turn the null-aware predicate into not-null-aware predicate. + */ + def hasNullAwarePredicateWithinNot(condition: Expression): Boolean = { +splitConjunctivePredicates(condition).exists { + case _: Exists | Not(_: Exists) | In(_, Seq(_: ListQuery)) | Not(In(_, Seq(_: ListQuery))) => +false + case e => e.find { x => +x.isInstanceOf[Not] && e.find { + case In(_, Seq(_: ListQuery)) => true + case _ => false +}.isDefined + }.isDefined +} + + } + + /** + * Returns an expression after removing the OuterReference shell. + */ + def stripOuterReference(e: Expression): Expression = e.transform { case OuterReference(r) => r } + + /** + * Returns the list of expressions after removing the OuterReference shell from each of + * the expression. + */ + def stripOuterReferences(e: Seq[Expression]): Seq[Expression] = e.map(stripOuterReference) + + /** + * Returns the logical plan after removing the OuterReference shell from all the expressions + * of the input logical plan. + */ + def stripOuterReferences(p: LogicalPlan): LogicalPlan = { +p.transformAllExpressions { + case OuterReference(a) => a +} + } + + /** + * Given a logical plan, returns TRUE if it has an outer reference and false otherwise. 
+ */ + def hasOuterReferences(plan: LogicalPlan): Boolean = { +plan.find { + case f: Filter => containsOuter(f.condition) + case other => false +}.isDefined + } + + /** + * Given a list of expressions, returns the expressions which have outer references. Aggregate + * expressions are treated in a special way. If the children of aggregate expression contains an + * outer reference, then the entire aggregate expression is marked as an outer reference. + * Example (SQL): + * {{{ + * SELECT a FROM l GROUP by 1 HAVING EXISTS (SELECT 1 FROM r WHERE d < min(b)) + * }}} + * In the above case, we want to mark the entire min(b) as an outer reference + * OuterReference(min(b)) instead of min(OuterReference(b)).
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r105780845 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala --- @@ -40,19 +43,189 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression { /** * A base interface for expressions that contain a [[LogicalPlan]]. */ -abstract class SubqueryExpression extends PlanExpression[LogicalPlan] { +abstract class SubqueryExpression( +plan: LogicalPlan, +children: Seq[Expression], +exprId: ExprId) extends PlanExpression[LogicalPlan] { + + override lazy val resolved: Boolean = childrenResolved && plan.resolved + override lazy val references: AttributeSet = +if (plan.resolved) super.references -- plan.outputSet else super.references override def withNewPlan(plan: LogicalPlan): SubqueryExpression + override def semanticEquals(o: Expression): Boolean = o match { +case p: SubqueryExpression => + this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) && +children.length == p.children.length && +children.zip(p.children).forall(p => p._1.semanticEquals(p._2)) +case _ => false + } } object SubqueryExpression { + /** + * Returns true when an expression contains an IN or EXISTS subquery and false otherwise. + */ + def hasInOrExistsSubquery(e: Expression): Boolean = { +e.find { + case _: ListQuery | _: Exists => true + case _ => false +}.isDefined + } + + /** + * Returns true when an expression contains a subquery that has outer reference(s). 
The outer + * reference attributes are kept as children of subquery expression by + * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]] + */ def hasCorrelatedSubquery(e: Expression): Boolean = { e.find { - case e: SubqueryExpression if e.children.nonEmpty => true + case s: SubqueryExpression => s.children.nonEmpty case _ => false }.isDefined } } +object SubExprUtils extends PredicateHelper {
--- End diff --

Ok, so the `SubExprUtils` should be moved to the `OuterReference` companion :)... Let's leave it for now.
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r105779054 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala --- @@ -40,19 +43,189 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression { /** * A base interface for expressions that contain a [[LogicalPlan]]. */ -abstract class SubqueryExpression extends PlanExpression[LogicalPlan] { +abstract class SubqueryExpression( +plan: LogicalPlan, +children: Seq[Expression], +exprId: ExprId) extends PlanExpression[LogicalPlan] { + + override lazy val resolved: Boolean = childrenResolved && plan.resolved + override lazy val references: AttributeSet = +if (plan.resolved) super.references -- plan.outputSet else super.references override def withNewPlan(plan: LogicalPlan): SubqueryExpression + override def semanticEquals(o: Expression): Boolean = o match { +case p: SubqueryExpression => + this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) && +children.length == p.children.length && +children.zip(p.children).forall(p => p._1.semanticEquals(p._2)) +case _ => false + } } object SubqueryExpression { + /** + * Returns true when an expression contains an IN or EXISTS subquery and false otherwise. + */ + def hasInOrExistsSubquery(e: Expression): Boolean = { +e.find { + case _: ListQuery | _: Exists => true + case _ => false +}.isDefined + } + + /** + * Returns true when an expression contains a subquery that has outer reference(s). 
The outer + * reference attributes are kept as children of subquery expression by + * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]] + */ def hasCorrelatedSubquery(e: Expression): Boolean = { e.find { - case e: SubqueryExpression if e.children.nonEmpty => true + case s: SubqueryExpression => s.children.nonEmpty case _ => false }.isDefined } } +object SubExprUtils extends PredicateHelper { + /** + * Returns true when an expression contains correlated predicates i.e outer references and + * returns false otherwise. + */ + def containsOuter(e: Expression): Boolean = { +e.find(_.isInstanceOf[OuterReference]).isDefined + } + + /** + * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could + * turn the null-aware predicate into not-null-aware predicate. + */ + def hasNullAwarePredicateWithinNot(condition: Expression): Boolean = { +splitConjunctivePredicates(condition).exists { + case _: Exists | Not(_: Exists) | In(_, Seq(_: ListQuery)) | Not(In(_, Seq(_: ListQuery))) => +false + case e => e.find { x => +x.isInstanceOf[Not] && e.find { + case In(_, Seq(_: ListQuery)) => true + case _ => false +}.isDefined + }.isDefined +} + + } + + /** + * Returns an expression after removing the OuterReference shell. + */ + def stripOuterReference(e: Expression): Expression = e.transform { case OuterReference(r) => r } + + /** + * Returns the list of expressions after removing the OuterReference shell from each of + * the expression. + */ + def stripOuterReferences(e: Seq[Expression]): Seq[Expression] = e.map(stripOuterReference) + + /** + * Returns the logical plan after removing the OuterReference shell from all the expressions + * of the input logical plan. + */ + def stripOuterReferences(p: LogicalPlan): LogicalPlan = { +p.transformAllExpressions { + case OuterReference(a) => a +} + } + + /** + * Given a logical plan, returns TRUE if it has an outer reference and false otherwise. 
+ */ + def hasOuterReferences(plan: LogicalPlan): Boolean = { +plan.find { + case f: Filter => containsOuter(f.condition) + case other => false +}.isDefined + } + + /** + * Given a list of expressions, returns the expressions which have outer references. Aggregate + * expressions are treated in a special way. If the children of aggregate expression contains an + * outer reference, then the entire aggregate expression is marked as an outer reference. + * Example (SQL): + * {{{ + * SELECT a FROM l GROUP by 1 HAVING EXISTS (SELECT 1 FROM r WHERE d < min(b)) + * }}} + * In the above case, we want to mark the entire min(b) as an outer reference + * OuterReference(min(b)) instead of min(OuterReference(b)).
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r105778315 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala --- @@ -40,19 +43,189 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression { /** * A base interface for expressions that contain a [[LogicalPlan]]. */ -abstract class SubqueryExpression extends PlanExpression[LogicalPlan] { +abstract class SubqueryExpression( +plan: LogicalPlan, +children: Seq[Expression], +exprId: ExprId) extends PlanExpression[LogicalPlan] { + + override lazy val resolved: Boolean = childrenResolved && plan.resolved + override lazy val references: AttributeSet = +if (plan.resolved) super.references -- plan.outputSet else super.references override def withNewPlan(plan: LogicalPlan): SubqueryExpression + override def semanticEquals(o: Expression): Boolean = o match { +case p: SubqueryExpression => + this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) && +children.length == p.children.length && +children.zip(p.children).forall(p => p._1.semanticEquals(p._2)) +case _ => false + } } object SubqueryExpression { + /** + * Returns true when an expression contains an IN or EXISTS subquery and false otherwise. + */ + def hasInOrExistsSubquery(e: Expression): Boolean = { +e.find { + case _: ListQuery | _: Exists => true + case _ => false +}.isDefined + } + + /** + * Returns true when an expression contains a subquery that has outer reference(s). 
The outer + * reference attributes are kept as children of subquery expression by + * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]] + */ def hasCorrelatedSubquery(e: Expression): Boolean = { e.find { - case e: SubqueryExpression if e.children.nonEmpty => true + case s: SubqueryExpression => s.children.nonEmpty case _ => false }.isDefined } } +object SubExprUtils extends PredicateHelper {
--- End diff --

@hvanhovell Actually, @nsyca and I had discussed this a bit. The SubqueryExpression object has methods that operate strictly in the context of SubqueryExpression, whereas the utils object has methods that mostly deal with OuterReferences. So they can operate on the subquery plans referred to from SubqueryExpression, or maybe in the future on other plans, if we support queries of the form:

```
select * from t1 left outer join (select * from t2 where t2.c1 = t1.c1) on ...
```

If you think we should merge these two, let me know and I will do it :-).
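Editor's sketch: the utilities discussed in this message mostly deal with the `OuterReference` wrapper, for example removing it from an expression tree. The snippet below illustrates that unwrapping on a toy expression type. `Attr`, `Outer`, `EqualTo`, and `stripOuter` are hypothetical names for the sketch and only loosely mirror `stripOuterReference` from the diff, which relies on Catalyst's `transform`.

```scala
object StripOuterSketch {
  // Toy expression tree; not Catalyst's Expression hierarchy.
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Outer(e: Expr) extends Expr // marks a reference to the outer query
  final case class EqualTo(l: Expr, r: Expr) extends Expr

  /** Returns the same tree with every Outer wrapper removed. */
  def stripOuter(e: Expr): Expr = e match {
    case Outer(inner)  => stripOuter(inner)
    case EqualTo(l, r) => EqualTo(stripOuter(l), stripOuter(r))
    case a: Attr       => a
  }
}
```

Applied to the predicate `t2.c1 = t1.c1` from the query in this message, with `t1.c1` wrapped as an outer reference, `stripOuter` yields a plain equality between the two attributes.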
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r105778426 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -1204,80 +1250,44 @@ class Analyzer( // Category 1: // BroadcastHint, Distinct, LeafNode, Repartition, and SubqueryAlias -case p: BroadcastHint => - p -case p: Distinct => - p -case p: LeafNode => - p -case p: Repartition => - p -case p: SubqueryAlias => - p +case _: BroadcastHint | _: Distinct | _: LeafNode | _: Repartition | _: SubqueryAlias => // Category 2: // These operators can be anywhere in a correlated subquery. // so long as they do not host outer references in the operators. -case p: Sort => - failOnOuterReference(p) - p -case p: RepartitionByExpression => - failOnOuterReference(p) - p +case s: Sort => + failOnOuterReference(s) +case r: RepartitionByExpression => + failOnOuterReference(r) // Category 3: // Filter is one of the two operators allowed to host correlated expressions. // The other operator is Join. Filter can be anywhere in a correlated subquery. -case f @ Filter(cond, child) => +case f: Filter => // Find all predicates with an outer reference. - val (correlated, local) = splitConjunctivePredicates(cond).partition(containsOuter) + val (correlated, _) = splitConjunctivePredicates(f.condition).partition(containsOuter) // Find any non-equality correlated predicates foundNonEqualCorrelatedPred = foundNonEqualCorrelatedPred || correlated.exists { case _: EqualTo | _: EqualNullSafe => false case _ => true } - - // Rewrite the filter without the correlated predicates if any. - correlated match { -case Nil => f -case xs if local.nonEmpty => - val newFilter = Filter(local.reduce(And), child) - predicateMap += newFilter -> xs - newFilter -case xs => - predicateMap += child -> xs - child - } + outerReferences ++= getOuterReferences(correlated) --- End diff -- @hvanhovell Will do. 
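Editor's sketch: the `Filter` case in the diff above splits the condition into its conjuncts, keeps those that contain an outer reference, and flags any correlated conjunct that is not an equality. The snippet below reproduces that flow on a toy expression tree. All type and function names are hypothetical, and `EqualNullSafe` is left out for brevity.

```scala
object CorrelatedPredicateSketch {
  // Toy expression tree; not Catalyst's Expression hierarchy.
  sealed trait Expr { def children: Seq[Expr] }
  final case class Attr(name: String) extends Expr { def children: Seq[Expr] = Nil }
  final case class Outer(e: Expr) extends Expr { def children: Seq[Expr] = Seq(e) }
  final case class EqualTo(l: Expr, r: Expr) extends Expr { def children: Seq[Expr] = Seq(l, r) }
  final case class LessThan(l: Expr, r: Expr) extends Expr { def children: Seq[Expr] = Seq(l, r) }
  final case class And(l: Expr, r: Expr) extends Expr { def children: Seq[Expr] = Seq(l, r) }

  /** Flattens nested ANDs into a list of conjuncts. */
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  /** True when the expression contains an Outer wrapper anywhere in its tree. */
  def containsOuter(e: Expr): Boolean = e match {
    case _: Outer => true
    case other    => other.children.exists(containsOuter)
  }

  /** True when some correlated conjunct is anything other than an equality. */
  def hasNonEqualityCorrelated(cond: Expr): Boolean = {
    val (correlated, _) = splitConjuncts(cond).partition(containsOuter)
    correlated.exists {
      case _: EqualTo => false
      case _          => true
    }
  }
}
```

Only correlated conjuncts are inspected, so a local non-equality predicate such as `e < f` does not set the flag.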
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r105766906 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -365,17 +368,73 @@ object TypeCoercion { } /** - * Convert the value and in list expressions to the common operator type - * by looking at all the argument types and finding the closest one that - * all the arguments can be cast to. When no common operator type is found - * the original expression will be returned and an Analysis Exception will - * be raised at type checking phase. + * Handles type coercion for both IN expression with subquery and IN + * expressions without subquery. + * 1. In the first case, find the common type by comparing the left hand side (LHS) + *expression types against corresponding right hand side (RHS) expression derived + *from the subquery expression's plan output. Inject appropriate casts in the + *LHS and RHS side of IN expression. + * + * 2. In the second case, convert the value and in list expressions to the + *common operator type by looking at all the argument types and finding + *the closest one that all the arguments can be cast to. When no common + *operator type is found the original expression will be returned and an + *Analysis Exception will be raised at the type checking phase. */ object InConversion extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions { // Skip nodes who's children have not been resolved yet. case e if !e.childrenResolved => e + // Handle type casting required between value expression and subquery output + // in IN subquery. + case i @ In(a, Seq(ListQuery(sub, children, exprId))) if !i.resolved => +// LHS is the value expression of IN subquery. +val lhs = a match { + // Multi columns in IN clause is represented as a CreateNamedStruct. + // flatten the named struct to get the list of expressions. 
+ case cns: CreateNamedStruct => cns.valExprs + case expr => Seq(expr) +} + +// RHS is the subquery output. +val rhs = sub.output +require(lhs.length == rhs.length) + +val commonTypes = lhs.zip(rhs).flatMap { case (l, r) => + findCommonTypeForBinaryComparison(l.dataType, r.dataType) match { +case d @ Some(_) => d +case _ => findTightestCommonType(l.dataType, r.dataType) + } +} + +// The number of columns/expressions must match between LHS and RHS of an +// IN subquery expression. +if (commonTypes.length == lhs.length) { + val castedRhs = rhs.zip(commonTypes).map { +case (e, dt) if e.dataType != dt => Alias(Cast(e, dt), e.name)() +case (e, _) => e + } + val castedLhs = lhs.zip(commonTypes).map { +case (e, dt) if e.dataType != dt => Cast(e, dt) +case (e, _) => e + } + + // Before constructing the In expression, wrap the multi values in LHS + // in a CreatedNamedStruct. + val newLhs = a match {
--- End diff --

My bad, I never compile these snippets. You have a point there. We could just use CreateStruct (since we really don't care about the name). So that would look something like this (again not compiled):

```scala
val newLhs = castedLhs match {
  case Seq(lhs) => lhs
  case _ => CreateStruct(castedLhs)
}
```
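Editor's sketch: the suggestion in this message decides on the shape of the casted left-hand side alone: a single expression is used as-is, and several expressions are wrapped in a struct. The snippet below is a compilable, Spark-free version of that match. `Column`, `Struct`, and `wrap` are hypothetical stand-ins, with `Struct` playing the role of `CreateStruct`.

```scala
object WrapLhsSketch {
  // Toy expression tree; Struct is a stand-in for Catalyst's CreateStruct.
  sealed trait Expr
  final case class Column(name: String) extends Expr
  final case class Struct(fields: Seq[Expr]) extends Expr

  /** Single-column LHS stays bare; a multi-column LHS becomes one struct value. */
  def wrap(castedLhs: Seq[Expr]): Expr = castedLhs match {
    case Seq(single) => single
    case many        => Struct(many)
  }
}
```

Matching on the sequence itself means the original LHS no longer has to be inspected for a named struct, which is the simplification the reviewer is pointing at.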
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r105734975 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala --- @@ -40,19 +43,189 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression { /** * A base interface for expressions that contain a [[LogicalPlan]]. */ -abstract class SubqueryExpression extends PlanExpression[LogicalPlan] { +abstract class SubqueryExpression( +plan: LogicalPlan, +children: Seq[Expression], +exprId: ExprId) extends PlanExpression[LogicalPlan] { + + override lazy val resolved: Boolean = childrenResolved && plan.resolved + override lazy val references: AttributeSet = +if (plan.resolved) super.references -- plan.outputSet else super.references override def withNewPlan(plan: LogicalPlan): SubqueryExpression + override def semanticEquals(o: Expression): Boolean = o match { +case p: SubqueryExpression => + this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) && +children.length == p.children.length && +children.zip(p.children).forall(p => p._1.semanticEquals(p._2)) +case _ => false + } } object SubqueryExpression { + /** + * Returns true when an expression contains an IN or EXISTS subquery and false otherwise. + */ + def hasInOrExistsSubquery(e: Expression): Boolean = { +e.find { + case _: ListQuery | _: Exists => true + case _ => false +}.isDefined + } + + /** + * Returns true when an expression contains a subquery that has outer reference(s). 
The outer + * reference attributes are kept as children of subquery expression by + * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]] + */ def hasCorrelatedSubquery(e: Expression): Boolean = { e.find { - case e: SubqueryExpression if e.children.nonEmpty => true + case s: SubqueryExpression => s.children.nonEmpty case _ => false }.isDefined } } +object SubExprUtils extends PredicateHelper { + /** + * Returns true when an expression contains correlated predicates i.e outer references and + * returns false otherwise. + */ + def containsOuter(e: Expression): Boolean = { +e.find(_.isInstanceOf[OuterReference]).isDefined + } + + /** + * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could + * turn the null-aware predicate into not-null-aware predicate. + */ + def hasNullAwarePredicateWithinNot(condition: Expression): Boolean = { +splitConjunctivePredicates(condition).exists { + case _: Exists | Not(_: Exists) | In(_, Seq(_: ListQuery)) | Not(In(_, Seq(_: ListQuery))) => +false + case e => e.find { x => +x.isInstanceOf[Not] && e.find {
--- End diff --

@hvanhovell Sure, will do.
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r105732875 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -365,17 +368,73 @@ object TypeCoercion { } /** - * Convert the value and in list expressions to the common operator type - * by looking at all the argument types and finding the closest one that - * all the arguments can be cast to. When no common operator type is found - * the original expression will be returned and an Analysis Exception will - * be raised at type checking phase. + * Handles type coercion for both IN expression with subquery and IN + * expressions without subquery. + * 1. In the first case, find the common type by comparing the left hand side (LHS) + *expression types against corresponding right hand side (RHS) expression derived + *from the subquery expression's plan output. Inject appropriate casts in the + *LHS and RHS side of IN expression. + * + * 2. In the second case, convert the value and in list expressions to the + *common operator type by looking at all the argument types and finding + *the closest one that all the arguments can be cast to. When no common + *operator type is found the original expression will be returned and an + *Analysis Exception will be raised at the type checking phase. */ object InConversion extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions { // Skip nodes who's children have not been resolved yet. case e if !e.childrenResolved => e + // Handle type casting required between value expression and subquery output + // in IN subquery. + case i @ In(a, Seq(ListQuery(sub, children, exprId))) if !i.resolved => +// LHS is the value expression of IN subquery. +val lhs = a match { + // Multi columns in IN clause is represented as a CreateNamedStruct. + // flatten the named struct to get the list of expressions. 
+ case cns: CreateNamedStruct => cns.valExprs + case expr => Seq(expr) +} + +// RHS is the subquery output. +val rhs = sub.output +require(lhs.length == rhs.length) + +val commonTypes = lhs.zip(rhs).flatMap { case (l, r) => + findCommonTypeForBinaryComparison(l.dataType, r.dataType) match { +case d @ Some(_) => d +case _ => findTightestCommonType(l.dataType, r.dataType) + } +} + +// The number of columns/expressions must match between LHS and RHS of an +// IN subquery expression. +if (commonTypes.length == lhs.length) { + val castedRhs = rhs.zip(commonTypes).map { +case (e, dt) if e.dataType != dt => Alias(Cast(e, dt), e.name)() +case (e, _) => e + } + val castedLhs = lhs.zip(commonTypes).map { +case (e, dt) if e.dataType != dt => Cast(e, dt) +case (e, _) => e + } + + // Before constructing the In expression, wrap the multi values in LHS + // in a CreatedNamedStruct. + val newLhs = a match { --- End diff -- @hvanhovell Can you please double-check the above code? I am unable to compile it because `cns` is not defined. In this block, I was looking at the original left-hand side expression to see if it is a named struct and, if so, constructing a named struct back. `castedLhs` would always match a `Seq(lhs)`, no? Please let me know if I am missing something here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
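The scoping problem raised in this comment can be reproduced in isolation. The sketch below is only an illustration under stated assumptions: `Expr`, `Col`, `Cast` and `NamedStruct` are hypothetical stand-ins, not Catalyst's classes, and `castedLhs` is assumed to be non-empty. It rebuilds the left-hand side by matching on the original expression, where the struct binding is still in scope.

```scala
object RebuildLhsSketch {
  // Hypothetical stand-ins for Catalyst expressions; not Spark's real classes.
  sealed trait Expr
  final case class Col(name: String) extends Expr
  final case class Cast(child: Expr, to: String) extends Expr
  // A struct is modelled here as ordered (field name, value) pairs.
  final case class NamedStruct(fields: Seq[(String, Expr)]) extends Expr

  // Rebuild the LHS from the original expression `a` and the already-cast values.
  // Matching on `a` (rather than on `castedLhs`) keeps the binding `cns` in scope.
  // Assumes `castedLhs` is non-empty.
  def rebuildLhs(a: Expr, castedLhs: Seq[Expr]): Expr = a match {
    case cns: NamedStruct =>
      // Reuse the original field names, pair them with the cast values.
      NamedStruct(cns.fields.map(_._1).zip(castedLhs))
    case _ =>
      // Single-column case: the one cast value is the new LHS.
      castedLhs.head
  }
}
```

Matching on `castedLhs` alone cannot see the field names, which is why a pattern such as `case Seq(lhs) => ...; case _ => ... cns ...` leaves `cns` unbound.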
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r105724836 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala --- @@ -40,19 +43,189 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression { /** * A base interface for expressions that contain a [[LogicalPlan]]. */ -abstract class SubqueryExpression extends PlanExpression[LogicalPlan] { +abstract class SubqueryExpression( +plan: LogicalPlan, +children: Seq[Expression], +exprId: ExprId) extends PlanExpression[LogicalPlan] { + + override lazy val resolved: Boolean = childrenResolved && plan.resolved + override lazy val references: AttributeSet = +if (plan.resolved) super.references -- plan.outputSet else super.references override def withNewPlan(plan: LogicalPlan): SubqueryExpression + override def semanticEquals(o: Expression): Boolean = o match { +case p: SubqueryExpression => + this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) && +children.length == p.children.length && +children.zip(p.children).forall(p => p._1.semanticEquals(p._2)) +case _ => false + } } object SubqueryExpression { + /** + * Returns true when an expression contains an IN or EXISTS subquery and false otherwise. + */ + def hasInOrExistsSubquery(e: Expression): Boolean = { +e.find { + case _: ListQuery | _: Exists => true + case _ => false +}.isDefined + } + + /** + * Returns true when an expression contains a subquery that has outer reference(s). 
The outer + * reference attributes are kept as children of subquery expression by + * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]] + */ def hasCorrelatedSubquery(e: Expression): Boolean = { e.find { - case e: SubqueryExpression if e.children.nonEmpty => true + case s: SubqueryExpression => s.children.nonEmpty case _ => false }.isDefined } } +object SubExprUtils extends PredicateHelper { --- End diff -- Why create two objects, `SubqueryExpression` and `SubExprUtils`? Are they that different?
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r105723410 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -1204,80 +1250,44 @@ class Analyzer( // Category 1: // BroadcastHint, Distinct, LeafNode, Repartition, and SubqueryAlias -case p: BroadcastHint => - p -case p: Distinct => - p -case p: LeafNode => - p -case p: Repartition => - p -case p: SubqueryAlias => - p +case _: BroadcastHint | _: Distinct | _: LeafNode | _: Repartition | _: SubqueryAlias => // Category 2: // These operators can be anywhere in a correlated subquery. // so long as they do not host outer references in the operators. -case p: Sort => - failOnOuterReference(p) - p -case p: RepartitionByExpression => - failOnOuterReference(p) - p +case s: Sort => + failOnOuterReference(s) +case r: RepartitionByExpression => + failOnOuterReference(r) // Category 3: // Filter is one of the two operators allowed to host correlated expressions. // The other operator is Join. Filter can be anywhere in a correlated subquery. -case f @ Filter(cond, child) => +case f: Filter => // Find all predicates with an outer reference. - val (correlated, local) = splitConjunctivePredicates(cond).partition(containsOuter) + val (correlated, _) = splitConjunctivePredicates(f.condition).partition(containsOuter) // Find any non-equality correlated predicates foundNonEqualCorrelatedPred = foundNonEqualCorrelatedPred || correlated.exists { case _: EqualTo | _: EqualNullSafe => false case _ => true } - - // Rewrite the filter without the correlated predicates if any. 
- correlated match { -case Nil => f -case xs if local.nonEmpty => - val newFilter = Filter(local.reduce(And), child) - predicateMap += newFilter -> xs - newFilter -case xs => - predicateMap += child -> xs - child - } + outerReferences ++= getOuterReferences(correlated) --- End diff -- `getOuterReferences` is kind of magical in the sense that it also isolates the aggregate expression. Could you add a single line of comment to highlight this?
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r105719841 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala --- @@ -40,19 +43,189 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression { /** * A base interface for expressions that contain a [[LogicalPlan]]. */ -abstract class SubqueryExpression extends PlanExpression[LogicalPlan] { +abstract class SubqueryExpression( +plan: LogicalPlan, +children: Seq[Expression], +exprId: ExprId) extends PlanExpression[LogicalPlan] { + + override lazy val resolved: Boolean = childrenResolved && plan.resolved + override lazy val references: AttributeSet = +if (plan.resolved) super.references -- plan.outputSet else super.references override def withNewPlan(plan: LogicalPlan): SubqueryExpression + override def semanticEquals(o: Expression): Boolean = o match { +case p: SubqueryExpression => + this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) && +children.length == p.children.length && +children.zip(p.children).forall(p => p._1.semanticEquals(p._2)) +case _ => false + } } object SubqueryExpression { + /** + * Returns true when an expression contains an IN or EXISTS subquery and false otherwise. + */ + def hasInOrExistsSubquery(e: Expression): Boolean = { +e.find { + case _: ListQuery | _: Exists => true + case _ => false +}.isDefined + } + + /** + * Returns true when an expression contains a subquery that has outer reference(s). 
The outer + * reference attributes are kept as children of subquery expression by + * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]] + */ def hasCorrelatedSubquery(e: Expression): Boolean = { e.find { - case e: SubqueryExpression if e.children.nonEmpty => true + case s: SubqueryExpression => s.children.nonEmpty case _ => false }.isDefined } } +object SubExprUtils extends PredicateHelper { + /** + * Returns true when an expression contains correlated predicates i.e outer references and + * returns false otherwise. + */ + def containsOuter(e: Expression): Boolean = { +e.find(_.isInstanceOf[OuterReference]).isDefined + } + + /** + * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could + * turn the null-aware predicate into not-null-aware predicate. + */ + def hasNullAwarePredicateWithinNot(condition: Expression): Boolean = { +splitConjunctivePredicates(condition).exists { + case _: Exists | Not(_: Exists) | In(_, Seq(_: ListQuery)) | Not(In(_, Seq(_: ListQuery))) => +false + case e => e.find { x => +x.isInstanceOf[Not] && e.find { + case In(_, Seq(_: ListQuery)) => true + case _ => false +}.isDefined + }.isDefined +} + + } + + /** + * Returns an expression after removing the OuterReference shell. + */ + def stripOuterReference(e: Expression): Expression = e.transform { case OuterReference(r) => r } + + /** + * Returns the list of expressions after removing the OuterReference shell from each of + * the expression. + */ + def stripOuterReferences(e: Seq[Expression]): Seq[Expression] = e.map(stripOuterReference) + + /** + * Returns the logical plan after removing the OuterReference shell from all the expressions + * of the input logical plan. + */ + def stripOuterReferences(p: LogicalPlan): LogicalPlan = { +p.transformAllExpressions { + case OuterReference(a) => a +} + } + + /** + * Given a logical plan, returns TRUE if it has an outer reference and false otherwise. 
+ */ + def hasOuterReferences(plan: LogicalPlan): Boolean = { +plan.find { + case f: Filter => containsOuter(f.condition) + case other => false +}.isDefined + } + + /** + * Given a list of expressions, returns the expressions which have outer references. Aggregate + * expressions are treated in a special way. If the children of aggregate expression contains an + * outer reference, then the entire aggregate expression is marked as an outer reference. + * Example (SQL): + * {{{ + * SELECT a FROM l GROUP by 1 HAVING EXISTS (SELECT 1 FROM r WHERE d < min(b)) + * }}} + * In the above case, we want to mark the entire min(b) as an outer reference + * OuterReference(min(b)) instead of min(OuterReference(b)).
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r105718880 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala --- @@ -40,19 +43,189 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression { /** * A base interface for expressions that contain a [[LogicalPlan]]. */ -abstract class SubqueryExpression extends PlanExpression[LogicalPlan] { +abstract class SubqueryExpression( +plan: LogicalPlan, +children: Seq[Expression], +exprId: ExprId) extends PlanExpression[LogicalPlan] { + + override lazy val resolved: Boolean = childrenResolved && plan.resolved + override lazy val references: AttributeSet = +if (plan.resolved) super.references -- plan.outputSet else super.references override def withNewPlan(plan: LogicalPlan): SubqueryExpression + override def semanticEquals(o: Expression): Boolean = o match { +case p: SubqueryExpression => + this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) && +children.length == p.children.length && +children.zip(p.children).forall(p => p._1.semanticEquals(p._2)) +case _ => false + } } object SubqueryExpression { + /** + * Returns true when an expression contains an IN or EXISTS subquery and false otherwise. + */ + def hasInOrExistsSubquery(e: Expression): Boolean = { +e.find { + case _: ListQuery | _: Exists => true + case _ => false +}.isDefined + } + + /** + * Returns true when an expression contains a subquery that has outer reference(s). 
The outer + * reference attributes are kept as children of subquery expression by + * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]] + */ def hasCorrelatedSubquery(e: Expression): Boolean = { e.find { - case e: SubqueryExpression if e.children.nonEmpty => true + case s: SubqueryExpression => s.children.nonEmpty case _ => false }.isDefined } } +object SubExprUtils extends PredicateHelper { + /** + * Returns true when an expression contains correlated predicates i.e outer references and + * returns false otherwise. + */ + def containsOuter(e: Expression): Boolean = { +e.find(_.isInstanceOf[OuterReference]).isDefined + } + + /** + * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could + * turn the null-aware predicate into not-null-aware predicate. + */ + def hasNullAwarePredicateWithinNot(condition: Expression): Boolean = { +splitConjunctivePredicates(condition).exists { + case _: Exists | Not(_: Exists) | In(_, Seq(_: ListQuery)) | Not(In(_, Seq(_: ListQuery))) => +false + case e => e.find { x => +x.isInstanceOf[Not] && e.find { + case In(_, Seq(_: ListQuery)) => true + case _ => false +}.isDefined + }.isDefined +} + + } + + /** + * Returns an expression after removing the OuterReference shell. + */ + def stripOuterReference(e: Expression): Expression = e.transform { case OuterReference(r) => r } + + /** + * Returns the list of expressions after removing the OuterReference shell from each of + * the expression. + */ + def stripOuterReferences(e: Seq[Expression]): Seq[Expression] = e.map(stripOuterReference) + + /** + * Returns the logical plan after removing the OuterReference shell from all the expressions + * of the input logical plan. + */ + def stripOuterReferences(p: LogicalPlan): LogicalPlan = { +p.transformAllExpressions { + case OuterReference(a) => a +} + } + + /** + * Given a logical plan, returns TRUE if it has an outer reference and false otherwise. 
+ */ + def hasOuterReferences(plan: LogicalPlan): Boolean = { +plan.find { + case f: Filter => containsOuter(f.condition) + case other => false +}.isDefined + } + + /** + * Given a list of expressions, returns the expressions which have outer references. Aggregate + * expressions are treated in a special way. If the children of aggregate expression contains an + * outer reference, then the entire aggregate expression is marked as an outer reference. + * Example (SQL): + * {{{ + * SELECT a FROM l GROUP by 1 HAVING EXISTS (SELECT 1 FROM r WHERE d < min(b)) + * }}} + * In the above case, we want to mark the entire min(b) as an outer reference + * OuterReference(min(b)) instead of min(OuterReference(b)).
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r105717565 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala --- @@ -40,19 +43,189 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression { /** * A base interface for expressions that contain a [[LogicalPlan]]. */ -abstract class SubqueryExpression extends PlanExpression[LogicalPlan] { +abstract class SubqueryExpression( +plan: LogicalPlan, +children: Seq[Expression], +exprId: ExprId) extends PlanExpression[LogicalPlan] { + + override lazy val resolved: Boolean = childrenResolved && plan.resolved + override lazy val references: AttributeSet = +if (plan.resolved) super.references -- plan.outputSet else super.references override def withNewPlan(plan: LogicalPlan): SubqueryExpression + override def semanticEquals(o: Expression): Boolean = o match { +case p: SubqueryExpression => + this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) && +children.length == p.children.length && +children.zip(p.children).forall(p => p._1.semanticEquals(p._2)) +case _ => false + } } object SubqueryExpression { + /** + * Returns true when an expression contains an IN or EXISTS subquery and false otherwise. + */ + def hasInOrExistsSubquery(e: Expression): Boolean = { +e.find { + case _: ListQuery | _: Exists => true + case _ => false +}.isDefined + } + + /** + * Returns true when an expression contains a subquery that has outer reference(s). 
The outer + * reference attributes are kept as children of subquery expression by + * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]] + */ def hasCorrelatedSubquery(e: Expression): Boolean = { e.find { - case e: SubqueryExpression if e.children.nonEmpty => true + case s: SubqueryExpression => s.children.nonEmpty case _ => false }.isDefined } } +object SubExprUtils extends PredicateHelper { + /** + * Returns true when an expression contains correlated predicates i.e outer references and + * returns false otherwise. + */ + def containsOuter(e: Expression): Boolean = { +e.find(_.isInstanceOf[OuterReference]).isDefined + } + + /** + * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could + * turn the null-aware predicate into not-null-aware predicate. + */ + def hasNullAwarePredicateWithinNot(condition: Expression): Boolean = { +splitConjunctivePredicates(condition).exists { + case _: Exists | Not(_: Exists) | In(_, Seq(_: ListQuery)) | Not(In(_, Seq(_: ListQuery))) => +false + case e => e.find { x => +x.isInstanceOf[Not] && e.find { --- End diff -- This still looks fishy to me. It is also in the original code base, so we don't have to fix this now. Can you open a JIRA to track this? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
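The comment does not say what exactly looks fishy. One possible reading, offered here only as an assumption, is that the inner `find` starts from the whole conjunct `e` rather than from the `Not` node `x`, so a `Not` and an IN subquery that merely sit side by side are treated as nested. The sketch below shows that difference on a tiny stand-in tree; `Expr`, `Leaf`, `Or`, `Not` and `InSubquery` are hypothetical types, not Catalyst's.

```scala
object NotScopeSketch {
  // Hypothetical expression tree with a pre-order `find`.
  sealed trait Expr {
    def children: Seq[Expr]
    def find(p: Expr => Boolean): Option[Expr] =
      if (p(this)) Some(this)
      else children.foldLeft(Option.empty[Expr])((acc, c) => acc.orElse(c.find(p)))
  }
  final case class Leaf(name: String) extends Expr { def children: Seq[Expr] = Seq.empty }
  final case class Not(child: Expr) extends Expr { def children: Seq[Expr] = Seq(child) }
  final case class Or(left: Expr, right: Expr) extends Expr { def children: Seq[Expr] = Seq(left, right) }
  final case class InSubquery(value: Expr) extends Expr { def children: Seq[Expr] = Seq(value) }

  private def isInSubquery(e: Expr): Boolean = e.isInstanceOf[InSubquery]

  // Shape quoted in the diff: the inner search starts from the whole conjunct `e`.
  def searchFromConjunct(e: Expr): Boolean =
    e.find(x => x.isInstanceOf[Not] && e.find(isInSubquery).isDefined).isDefined

  // Alternative: the inner search starts from the Not node `x` itself.
  def searchFromNot(e: Expr): Boolean =
    e.find(x => x.isInstanceOf[Not] && x.find(isInSubquery).isDefined).isDefined
}
```

For `Or(Not(Leaf("p")), InSubquery(Leaf("a")))` the first variant answers true and the second false; for `Not(InSubquery(Leaf("a")))` both answer true.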
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r105717078 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala --- @@ -123,19 +123,36 @@ case class Not(child: Expression) */ @ExpressionDescription( usage = "expr1 _FUNC_(expr2, expr3, ...) - Returns true if `expr` equals to any valN.") -case class In(value: Expression, list: Seq[Expression]) extends Predicate -with ImplicitCastInputTypes { +case class In(value: Expression, list: Seq[Expression]) extends Predicate { require(list != null, "list should not be null") - - override def inputTypes: Seq[AbstractDataType] = value.dataType +: list.map(_.dataType) - override def checkInputDataTypes(): TypeCheckResult = { -if (list.exists(l => l.dataType != value.dataType)) { - TypeCheckResult.TypeCheckFailure( -"Arguments must be same type") -} else { - TypeCheckResult.TypeCheckSuccess +list match { + case ListQuery(sub, _, _) :: Nil => +val valExprs = value match { + case cns: CreateNamedStruct => cns.valExprs + case expr => Seq(expr) +} +val isTypeMismatched = valExprs.zip(sub.output).exists { + case (l, r) => l.dataType != r.dataType +} +if (isTypeMismatched) { + TypeCheckResult.TypeCheckFailure( +s""" + |The data type of one or more elements in the LHS of an IN subquery + |[${valExprs.map(_.dataType).mkString(", ")}] --- End diff -- Use catalog strings please. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
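To illustrate the request: Spark's `DataType` exposes a `catalogString` method that yields the SQL-style type name, which reads better in error messages than the default `toString`. The sketch below uses hypothetical stand-in types (not Spark's classes) purely to show the contrast in rendering.

```scala
object CatalogStringSketch {
  // Hypothetical stand-ins; they only mimic the idea of a `catalogString` rendering.
  sealed trait DataType { def catalogString: String }
  case object TimestampType extends DataType {
    def catalogString: String = "timestamp"
  }
  final case class DecimalType(precision: Int, scale: Int) extends DataType {
    def catalogString: String = s"decimal($precision,$scale)"
  }

  // Render a list of types the way an error message might show them.
  def describe(types: Seq[DataType]): String =
    types.map(_.catalogString).mkString("[", ", ", "]")
}
```

With these stand-ins, `describe(Seq(DecimalType(10, 0), TimestampType))` renders SQL-style names instead of case-class names such as `DecimalType(10,0)`.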
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r105716983 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala --- @@ -123,19 +123,36 @@ case class Not(child: Expression) */ @ExpressionDescription( usage = "expr1 _FUNC_(expr2, expr3, ...) - Returns true if `expr` equals to any valN.") -case class In(value: Expression, list: Seq[Expression]) extends Predicate -with ImplicitCastInputTypes { +case class In(value: Expression, list: Seq[Expression]) extends Predicate { require(list != null, "list should not be null") - - override def inputTypes: Seq[AbstractDataType] = value.dataType +: list.map(_.dataType) - override def checkInputDataTypes(): TypeCheckResult = { -if (list.exists(l => l.dataType != value.dataType)) { - TypeCheckResult.TypeCheckFailure( -"Arguments must be same type") -} else { - TypeCheckResult.TypeCheckSuccess +list match { + case ListQuery(sub, _, _) :: Nil => +val valExprs = value match { + case cns: CreateNamedStruct => cns.valExprs + case expr => Seq(expr) +} +val isTypeMismatched = valExprs.zip(sub.output).exists { + case (l, r) => l.dataType != r.dataType +} +if (isTypeMismatched) { --- End diff -- It is better to display the offending columns. Now the user has to find the offending columns by herself/himself. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
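A minimal sketch of what reporting the offending columns could look like, assuming a hypothetical `Column` stand-in that carries a SQL name and a type string (neither the type nor the message format is taken from Spark): pair the two sides positionally and keep only the pairs whose types differ.

```scala
object MismatchReportSketch {
  // Hypothetical stand-in for a typed column; not a Spark class.
  final case class Column(sql: String, dataType: String)

  // Pair LHS and RHS positionally and keep only the pairs whose types differ.
  def mismatchedColumns(lhs: Seq[Column], rhs: Seq[Column]): Seq[String] =
    lhs.zip(rhs).collect {
      case (l, r) if l.dataType != r.dataType =>
        s"(${l.sql}:${l.dataType}, ${r.sql}:${r.dataType})"
    }

  // Build the message only when at least one pair is mismatched.
  def message(lhs: Seq[Column], rhs: Seq[Column]): Option[String] = {
    val bad = mismatchedColumns(lhs, rhs)
    if (bad.isEmpty) None
    else {
      val listed = bad.mkString("[", ", ", "]")
      Some(s"Mismatched columns: $listed")
    }
  }
}
```

Using `collect` with a guard does the filtering and the formatting in one pass, so the user sees exactly which pairs disagree.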
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r105715265 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -365,17 +368,73 @@ object TypeCoercion { } /** - * Convert the value and in list expressions to the common operator type - * by looking at all the argument types and finding the closest one that - * all the arguments can be cast to. When no common operator type is found - * the original expression will be returned and an Analysis Exception will - * be raised at type checking phase. + * Handles type coercion for both IN expression with subquery and IN + * expressions without subquery. + * 1. In the first case, find the common type by comparing the left hand side (LHS) + *expression types against corresponding right hand side (RHS) expression derived + *from the subquery expression's plan output. Inject appropriate casts in the + *LHS and RHS side of IN expression. + * + * 2. In the second case, convert the value and in list expressions to the + *common operator type by looking at all the argument types and finding + *the closest one that all the arguments can be cast to. When no common + *operator type is found the original expression will be returned and an + *Analysis Exception will be raised at the type checking phase. */ object InConversion extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions { // Skip nodes who's children have not been resolved yet. case e if !e.childrenResolved => e + // Handle type casting required between value expression and subquery output + // in IN subquery. + case i @ In(a, Seq(ListQuery(sub, children, exprId))) if !i.resolved => +// LHS is the value expression of IN subquery. +val lhs = a match { + // Multi columns in IN clause is represented as a CreateNamedStruct. + // flatten the named struct to get the list of expressions. 
+ case cns: CreateNamedStruct => cns.valExprs + case expr => Seq(expr) +} + +// RHS is the subquery output. +val rhs = sub.output +require(lhs.length == rhs.length) + +val commonTypes = lhs.zip(rhs).flatMap { case (l, r) => + findCommonTypeForBinaryComparison(l.dataType, r.dataType) match { --- End diff -- Use `orElse`?
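For context, `Option.orElse` expresses "try the first lookup, fall back to the second" without a `match` on `Some`/`None`. The sketch below uses two hypothetical lookup functions over plain strings as stand-ins for the two functions named in the diff; their rules are invented for illustration only.

```scala
object OrElseSketch {
  // Hypothetical stand-in for the comparison-specific lookup.
  def findForComparison(l: String, r: String): Option[String] =
    if (Set(l, r) == Set("string", "date")) Some("string") else None

  // Hypothetical stand-in for the stricter fallback lookup.
  def findTightest(l: String, r: String): Option[String] =
    if (l == r) Some(l) else None

  // Try the first lookup; evaluate the second only when the first yields None.
  def commonType(l: String, r: String): Option[String] =
    findForComparison(l, r).orElse(findTightest(l, r))
}
```

Because the argument of `orElse` is passed by name, the fallback is not evaluated when the first lookup already succeeds.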
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r105714984 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -365,17 +385,66 @@ object TypeCoercion { } /** - * Convert the value and in list expressions to the common operator type - * by looking at all the argument types and finding the closest one that - * all the arguments can be cast to. When no common operator type is found - * the original expression will be returned and an Analysis Exception will - * be raised at type checking phase. + * Handles type coercion for both IN expression with subquery and IN + * expressions without subquery. + * 1. In the first case, find the common type by comparing the left hand side + *expression types against corresponding right hand side expression derived + *from the subquery expression's plan output. Inject appropriate casts in the + *LHS and RHS side of IN expression. + * + * 2. In the second case, convert the value and in list expressions to the + *common operator type by looking at all the argument types and finding + *the closest one that all the arguments can be cast to. When no common + *operator type is found the original expression will be returned and an + *Analysis Exception will be raised at the type checking phase. */ object InConversion extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions { // Skip nodes who's children have not been resolved yet. case e if !e.childrenResolved => e + // Handle type casting required between value expression and subquery output + // in IN subquery. + case i @ In(a, Seq(ListQuery(sub, children, exprId))) if !i.resolved => --- End diff -- Maybe we should create a special InSubQuery expression. This looks like a lot of work for what we are doing here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r105716052 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -365,17 +368,73 @@ object TypeCoercion { } /** - * Convert the value and in list expressions to the common operator type - * by looking at all the argument types and finding the closest one that - * all the arguments can be cast to. When no common operator type is found - * the original expression will be returned and an Analysis Exception will - * be raised at type checking phase. + * Handles type coercion for both IN expression with subquery and IN + * expressions without subquery. + * 1. In the first case, find the common type by comparing the left hand side (LHS) + *expression types against corresponding right hand side (RHS) expression derived + *from the subquery expression's plan output. Inject appropriate casts in the + *LHS and RHS side of IN expression. + * + * 2. In the second case, convert the value and in list expressions to the + *common operator type by looking at all the argument types and finding + *the closest one that all the arguments can be cast to. When no common + *operator type is found the original expression will be returned and an + *Analysis Exception will be raised at the type checking phase. */ object InConversion extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions { // Skip nodes who's children have not been resolved yet. case e if !e.childrenResolved => e + // Handle type casting required between value expression and subquery output + // in IN subquery. + case i @ In(a, Seq(ListQuery(sub, children, exprId))) if !i.resolved => +// LHS is the value expression of IN subquery. +val lhs = a match { + // Multi columns in IN clause is represented as a CreateNamedStruct. + // flatten the named struct to get the list of expressions. 
+ case cns: CreateNamedStruct => cns.valExprs + case expr => Seq(expr) +} + +// RHS is the subquery output. +val rhs = sub.output +require(lhs.length == rhs.length) + +val commonTypes = lhs.zip(rhs).flatMap { case (l, r) => + findCommonTypeForBinaryComparison(l.dataType, r.dataType) match { +case d @ Some(_) => d +case _ => findTightestCommonType(l.dataType, r.dataType) + } +} + +// The number of columns/expressions must match between LHS and RHS of an +// IN subquery expression. +if (commonTypes.length == lhs.length) { + val castedRhs = rhs.zip(commonTypes).map { +case (e, dt) if e.dataType != dt => Alias(Cast(e, dt), e.name)() +case (e, _) => e + } + val castedLhs = lhs.zip(commonTypes).map { +case (e, dt) if e.dataType != dt => Cast(e, dt) +case (e, _) => e + } + + // Before constructing the In expression, wrap the multi values in LHS + // in a CreatedNamedStruct. + val newLhs = a match { --- End diff -- I think it is better if you cast on the `castedLhs` here: ```scala val newLhs = castedLhs match { case Seq(lhs) => lhs case _ => CreateNamedStruct(cns.nameExprs.zip(castedLhs).flatMap { case (name, value) => Seq(name, value) }) } ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r105714546

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala ---
@@ -365,17 +368,73 @@ object TypeCoercion {
   }
   /**
-   * Convert the value and in list expressions to the common operator type
-   * by looking at all the argument types and finding the closest one that
-   * all the arguments can be cast to. When no common operator type is found
-   * the original expression will be returned and an Analysis Exception will
-   * be raised at type checking phase.
+   * Handles type coercion for both IN expression with subquery and IN
+   * expressions without subquery.
+   * 1. In the first case, find the common type by comparing the left hand side (LHS)
+   *    expression types against corresponding right hand side (RHS) expression derived
+   *    from the subquery expression's plan output. Inject appropriate casts in the
+   *    LHS and RHS side of IN expression.
+   *
+   * 2. In the second case, convert the value and in list expressions to the
+   *    common operator type by looking at all the argument types and finding
+   *    the closest one that all the arguments can be cast to. When no common
+   *    operator type is found the original expression will be returned and an
+   *    Analysis Exception will be raised at the type checking phase.
    */
   object InConversion extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
       // Skip nodes who's children have not been resolved yet.
       case e if !e.childrenResolved => e
+      // Handle type casting required between value expression and subquery output
+      // in IN subquery.
+      case i @ In(a, Seq(ListQuery(sub, children, exprId))) if !i.resolved =>
+        // LHS is the value expression of IN subquery.
+        val lhs = a match {
+          // Multi columns in IN clause is represented as a CreateNamedStruct.
+          // flatten the named struct to get the list of expressions.
+          case cns: CreateNamedStruct => cns.valExprs
+          case expr => Seq(expr)
+        }
+
+        // RHS is the subquery output.
+        val rhs = sub.output
+        require(lhs.length == rhs.length)
--- End diff --

Please check this in the guard of the rule. This currently throws a very hard to understand error.
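To illustrate the request above, here is a minimal sketch of moving a length check out of `require(...)` and into the pattern guard. It does not use Catalyst; `Expr`, `Col`, `Struct`, `InSubquery`, `Coerced`, and `GuardSketch` are hypothetical stand-ins invented for this example, and the real rule would insert casts where the placeholder wrapper is.

```scala
// Toy stand-ins for Catalyst expressions (hypothetical names, not Spark classes).
sealed trait Expr
case class Col(name: String) extends Expr
case class Struct(fields: Seq[Expr]) extends Expr
case class InSubquery(value: Expr, subqueryOutput: Seq[String]) extends Expr
// Placeholder marking "this node was handled by the rule".
case class Coerced(in: InSubquery) extends Expr

object GuardSketch {
  // Flatten a multi-column left-hand side into its component expressions.
  def lhsOf(value: Expr): Seq[Expr] = value match {
    case Struct(fields) => fields
    case other => Seq(other)
  }

  // The length check lives in the guard, so a node with mismatched arity simply
  // does not match and is returned unchanged, instead of failing inside the rule
  // with an IllegalArgumentException from require(...).
  def coerce(e: Expr): Expr = e match {
    case in @ InSubquery(v, out) if lhsOf(v).length == out.length => Coerced(in)
    case other => other
  }
}
```

With this shape, a mismatched node is left for a later type-checking step to report, which is presumably where a readable error message would be produced.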
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r105712270

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala ---
@@ -109,6 +109,26 @@ object TypeCoercion {
   }
   /**
+   * This function determines the target type of a comparison operator when one operand
+   * is a String and the other is not. It also handles when one op is a Date and the
+   * other is a Timestamp by making the target type to be String. Currently this is used
+   * to coerce types between LHS and RHS of the IN expression.
+   */
+  val findCommonTypeForBinaryComparison: (DataType, DataType) => Option[DataType] = {
+    case (StringType, DateType) => Some(StringType)
+    case (DateType, StringType) => Some(StringType)
+    case (StringType, TimestampType) => Some(StringType)
+    case (TimestampType, StringType) => Some(StringType)
+    case (TimestampType, DateType) => Some(StringType)
--- End diff --

Oh wait, you have changed it.
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r105711886

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala ---
@@ -109,6 +109,26 @@ object TypeCoercion {
   }
   /**
+   * This function determines the target type of a comparison operator when one operand
+   * is a String and the other is not. It also handles when one op is a Date and the
+   * other is a Timestamp by making the target type to be String. Currently this is used
+   * to coerce types between LHS and RHS of the IN expression.
+   */
+  val findCommonTypeForBinaryComparison: (DataType, DataType) => Option[DataType] = {
+    case (StringType, DateType) => Some(StringType)
+    case (DateType, StringType) => Some(StringType)
+    case (StringType, TimestampType) => Some(StringType)
+    case (TimestampType, StringType) => Some(StringType)
+    case (TimestampType, DateType) => Some(StringType)
--- End diff --

OK, please do this in a follow-up.
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r104301109

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala ---
@@ -109,6 +109,26 @@ object TypeCoercion {
   }
   /**
+   * This function determines the target type of a comparison operator when one operand
+   * is a String and the other is not. It also handles when one op is a Date and the
+   * other is a Timestamp by making the target type to be String. Currently this is used
+   * to coerce types between LHS and RHS of the IN expression.
+   */
+  val findCommonTypeForBinaryComparison: (DataType, DataType) => Option[DataType] = {
+    case (StringType, DateType) => Some(StringType)
+    case (DateType, StringType) => Some(StringType)
+    case (StringType, TimestampType) => Some(StringType)
+    case (TimestampType, StringType) => Some(StringType)
+    case (TimestampType, DateType) => Some(StringType)
--- End diff --

@hvanhovell Thanks! I had tried to do this before as well, since it came up during the internal review. I have made another attempt. Please let me know what you think.
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r104287344

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala ---
@@ -109,6 +109,26 @@ object TypeCoercion {
   }
   /**
+   * This function determines the target type of a comparison operator when one operand
+   * is a String and the other is not. It also handles when one op is a Date and the
+   * other is a Timestamp by making the target type to be String. Currently this is used
+   * to coerce types between LHS and RHS of the IN expression.
+   */
+  val findCommonTypeForBinaryComparison: (DataType, DataType) => Option[DataType] = {
+    case (StringType, DateType) => Some(StringType)
+    case (DateType, StringType) => Some(StringType)
+    case (StringType, TimestampType) => Some(StringType)
+    case (TimestampType, StringType) => Some(StringType)
+    case (TimestampType, DateType) => Some(StringType)
--- End diff --

Shouldn't we factor that code out then? Now we have the same logic in two places.
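As a rough illustration of the "factor it out" suggestion above, the sketch below keeps the string/date/timestamp promotion table in one function value and has two call sites share it. It is not Spark code: `DType` and its members, `commonTypeForComparison`, `coerceComparison`, and `coerceIn` are hypothetical names, and the reverse `(Date, Timestamp)` case and the same-type case are assumptions added so the toy is total enough to exercise.

```scala
// Toy data types standing in for Catalyst's DataType hierarchy (hypothetical).
sealed trait DType
case object StringT extends DType
case object DateT extends DType
case object TimestampT extends DType
case object IntT extends DType

object CommonTypeSketch {
  // Single definition of the promotion rules; both call sites below reuse it,
  // so the table cannot drift apart between them.
  val commonTypeForComparison: (DType, DType) => Option[DType] = {
    case (StringT, DateT) | (DateT, StringT) => Some(StringT)
    case (StringT, TimestampT) | (TimestampT, StringT) => Some(StringT)
    case (TimestampT, DateT) | (DateT, TimestampT) => Some(StringT)
    case (l, r) if l == r => Some(l) // assumption: identical types need no promotion
    case _ => None
  }

  // Call site 1: a binary comparison.
  def coerceComparison(l: DType, r: DType): Option[DType] =
    commonTypeForComparison(l, r)

  // Call site 2: an IN list, folding the same function across all elements.
  def coerceIn(value: DType, list: Seq[DType]): Option[DType] =
    list.foldLeft(Option(value)) { (acc, t) =>
      acc.flatMap(commonTypeForComparison(_, t))
    }
}
```

The fold returns `None` as soon as any pair has no common type, which mirrors the "no common operator type found" case described in the rule's doc comment.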
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r104063308

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -21,12 +21,13 @@ import scala.annotation.tailrec
 import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf, TableIdentifier}
+import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.OuterScopes
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.objects.NewInstance
+import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
--- End diff --

OK
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103995282

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -21,12 +21,13 @@ import scala.annotation.tailrec
 import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf, TableIdentifier}
+import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.OuterScopes
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.objects.NewInstance
+import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
--- End diff --

@gatorsmile There is another place in the analyzer (updateOuterReferencesInSubquery) that also uses SubExprUtils. That's why I moved it to the main imports. What do you think?
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103985902 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala --- @@ -40,19 +43,194 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression { /** * A base interface for expressions that contain a [[LogicalPlan]]. */ -abstract class SubqueryExpression extends PlanExpression[LogicalPlan] { +abstract class SubqueryExpression( +plan: LogicalPlan, +children: Seq[Expression], +exprId: ExprId) extends PlanExpression[LogicalPlan] { + + override lazy val resolved: Boolean = childrenResolved && plan.resolved + override lazy val references: AttributeSet = +if (plan.resolved) super.references -- plan.outputSet else super.references override def withNewPlan(plan: LogicalPlan): SubqueryExpression + override def semanticEquals(o: Expression): Boolean = o match { +case p: SubqueryExpression => + this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) && +children.length == p.children.length && +children.zip(p.children).forall(p => p._1.semanticEquals(p._2)) +case _ => false + } } object SubqueryExpression { + /** + * Returns true when an expression contains an IN or EXISTS subquery and false otherwise. + */ + def hasInOrExistsSubquery(e: Expression): Boolean = { +e.find { + case _: ListQuery | _: Exists => true + case _ => false +}.isDefined + } + + /** + * Returns true when an expression contains a subquery that has outer reference(s). 
The outer + * reference attributes are kept as children of subquery expression by + * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]] + */ def hasCorrelatedSubquery(e: Expression): Boolean = { e.find { - case e: SubqueryExpression if e.children.nonEmpty => true + case s: SubqueryExpression => s.children.nonEmpty case _ => false }.isDefined } } +object SubExprUtils extends PredicateHelper { + /** + * Returns true when an expression contains correlated predicates i.e outer references and + * returns false otherwise. + */ + def containsOuter(e: Expression): Boolean = { +e.find(_.isInstanceOf[OuterReference]).isDefined + } + + /** + * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could + * turn the null-aware predicate into not-null-aware predicate. + */ + def hasNullAwarePredicateWithinNot(condition: Expression): Boolean = { +splitConjunctivePredicates(condition).exists { + case _: Exists | Not(_: Exists) | In(_, Seq(_: ListQuery)) | Not(In(_, Seq(_: ListQuery))) => +false + case e => e.find { x => +x.isInstanceOf[Not] && e.find { + case In(_, Seq(_: ListQuery)) => true + case _ => false +}.isDefined + }.isDefined +} + + } + + /** + * Returns an expression after removing the OuterReference shell. + */ + def stripOuterReference(e: Expression): Expression = { +e.transform { + case OuterReference(r) => r +} + } + + /** + * Returns the list of expressions after removing the OuterReference shell from each of + * the expression. + */ + def stripOuterReferences(e: Seq[Expression]): Seq[Expression] = e.map(stripOuterReference) + + /** + * Returns the logical plan after removing the OuterReference shell from all the expressions + * of the input logical plan. + */ + def stripOuterReferences(p: LogicalPlan): LogicalPlan = { +p.transformAllExpressions { + case OuterReference(a) => a +} + } + + /** + * Given a logical plan, returns TRUE if it has an outer reference and false otherwise. 
+ */ + def hasOuterReferences(plan: LogicalPlan): Boolean = { +plan.find { + case f: Filter => containsOuter(f.condition) + case other => false +}.isDefined + } + + /** + * Given a list of expressions, returns the expressions which have outer references. Aggregate + * expressions are treated in a special way. If the children of aggregate expression contains an + * outer reference, then the entire aggregate expression is marked as an outer reference. + * Example (SQL): + * {{{ + * SELECT a FROM l GROUP by 1 HAVING EXISTS (SELECT 1 FROM r WHERE d < min(b)) + * }}} + * In the above case, we want to mark the entire min(b) as an outer reference + *
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103984027

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala ---
@@ -40,19 +43,194 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {
 /**
  * A base interface for expressions that contain a [[LogicalPlan]].
  */
-abstract class SubqueryExpression extends PlanExpression[LogicalPlan] {
+abstract class SubqueryExpression(
+    plan: LogicalPlan,
+    children: Seq[Expression],
+    exprId: ExprId) extends PlanExpression[LogicalPlan] {
+
+  override lazy val resolved: Boolean = childrenResolved && plan.resolved
+  override lazy val references: AttributeSet =
+    if (plan.resolved) super.references -- plan.outputSet else super.references
   override def withNewPlan(plan: LogicalPlan): SubqueryExpression
+  override def semanticEquals(o: Expression): Boolean = o match {
+    case p: SubqueryExpression =>
+      this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) &&
+        children.length == p.children.length &&
+        children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
+    case _ => false
+  }
 }
 object SubqueryExpression {
+  /**
+   * Returns true when an expression contains an IN or EXISTS subquery and false otherwise.
+   */
+  def hasInOrExistsSubquery(e: Expression): Boolean = {
+    e.find {
+      case _: ListQuery | _: Exists => true
+      case _ => false
+    }.isDefined
+  }
+
+  /**
+   * Returns true when an expression contains a subquery that has outer reference(s). The outer
+   * reference attributes are kept as children of subquery expression by
+   * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]]
+   */
   def hasCorrelatedSubquery(e: Expression): Boolean = {
     e.find {
-      case e: SubqueryExpression if e.children.nonEmpty => true
+      case s: SubqueryExpression => s.children.nonEmpty
       case _ => false
     }.isDefined
   }
 }
+object SubExprUtils extends PredicateHelper {
+  /**
+   * Returns true when an expression contains correlated predicates i.e outer references and
+   * returns false otherwise.
+   */
+  def containsOuter(e: Expression): Boolean = {
+    e.find(_.isInstanceOf[OuterReference]).isDefined
+  }
+
+  /**
+   * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could
+   * turn the null-aware predicate into not-null-aware predicate.
+   */
+  def hasNullAwarePredicateWithinNot(condition: Expression): Boolean = {
+    splitConjunctivePredicates(condition).exists {
+      case _: Exists | Not(_: Exists) | In(_, Seq(_: ListQuery)) | Not(In(_, Seq(_: ListQuery))) =>
+        false
+      case e => e.find { x =>
+        x.isInstanceOf[Not] && e.find {
+          case In(_, Seq(_: ListQuery)) => true
+          case _ => false
+        }.isDefined
+      }.isDefined
+    }
+
+  }
+
+  /**
+   * Returns an expression after removing the OuterReference shell.
+   */
+  def stripOuterReference(e: Expression): Expression = {
+    e.transform {
+      case OuterReference(r) => r
+    }
+  }
--- End diff --

```
def stripOuterReference(e: Expression): Expression = e.transform { case OuterReference(r) => r }
```
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103983068

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala ---
@@ -200,14 +191,9 @@ trait CheckAnalysis extends PredicateHelper {
               s"filter expression '${f.condition.sql}' " +
               s"of type ${f.condition.dataType.simpleString} is not a boolean.")
-          case f @ Filter(condition, child) =>
-            splitConjunctivePredicates(condition).foreach {
-              case _: PredicateSubquery | Not(_: PredicateSubquery) =>
-              case e if PredicateSubquery.hasNullAwarePredicateWithinNot(e) =>
-                failAnalysis(s"Null-aware predicate sub-queries cannot be used in nested" +
-                  s" conditions: $e")
-              case e =>
-            }
+          case Filter(condition, _) if hasNullAwarePredicateWithinNot(condition) =>
+            failAnalysis(s"Null-aware predicate sub-queries cannot be used in nested" +
+              " conditions: $condition")
--- End diff --

```
failAnalysis("Null-aware predicate sub-queries cannot be used in nested " +
  s"conditions: $condition")
```
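The reviewer does not spell out the reason for the suggested rewrite, but one visible difference is where the `s` interpolator sits: in the new diff line only the first literal carries it, so `$condition` in the second literal would be emitted verbatim. A minimal sketch of that behaviour, using a hypothetical `condition` string and shortened messages rather than the real `failAnalysis` call:

```scala
object InterpolationSketch {
  // Hypothetical stand-in for the filter condition being reported.
  val condition: String = "NOT (a IN (subquery))"

  // Only the first literal has the s prefix; the second is a plain string,
  // so the text "$condition" ends up in the message unchanged.
  val broken: String =
    s"cannot be used in nested" + " conditions: $condition"

  // The prefix is on the literal that actually contains the placeholder.
  val fixed: String =
    "cannot be used in nested " + s"conditions: $condition"
}
```

Because interpolation applies per literal, not per concatenated expression, the prefix has to go on whichever piece contains the `$` reference.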
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103981569

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1204,64 +1250,36 @@ class Analyzer(
         // Category 1:
         // BroadcastHint, Distinct, LeafNode, Repartition, and SubqueryAlias
-        case p: BroadcastHint =>
-          p
-        case p: Distinct =>
-          p
-        case p: LeafNode =>
-          p
-        case p: Repartition =>
-          p
-        case p: SubqueryAlias =>
-          p
+        case _: BroadcastHint | _: Distinct | _: LeafNode | _: Repartition | _: SubqueryAlias =>
         // Category 2:
         // These operators can be anywhere in a correlated subquery.
         // so long as they do not host outer references in the operators.
         case p: Sort =>
           failOnOuterReference(p)
-          p
         case p: RepartitionByExpression =>
           failOnOuterReference(p)
-          p
         // Category 3:
         // Filter is one of the two operators allowed to host correlated expressions.
         // The other operator is Join. Filter can be anywhere in a correlated subquery.
         case f @ Filter(cond, child) =>
           // Find all predicates with an outer reference.
-          val (correlated, local) = splitConjunctivePredicates(cond).partition(containsOuter)
+          val (correlated, local) =
+            splitConjunctivePredicates(cond).partition(containsOuter)
           // Find any non-equality correlated predicates
           foundNonEqualCorrelatedPred = foundNonEqualCorrelatedPred || correlated.exists {
             case _: EqualTo | _: EqualNullSafe => false
             case _ => true
           }
-
-          // Rewrite the filter without the correlated predicates if any.
-          correlated match {
-            case Nil => f
-            case xs if local.nonEmpty =>
-              val newFilter = Filter(local.reduce(And), child)
-              predicateMap += newFilter -> xs
-              newFilter
-            case xs =>
-              predicateMap += child -> xs
-              child
-          }
+          outerReferences ++= getOuterReferences(correlated)
         // Project cannot host any correlated expressions
         // but can be anywhere in a correlated subquery.
         case p @ Project(expressions, child) =>
--- End diff --

`case p: Project =>`. The same issue in the following `Aggregate`
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103982372

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1317,62 +1327,24 @@ class Analyzer(
         // Generator with join=false is treated as Category 4.
         case p @ Generate(generator, true, _, _, _, _) =>
--- End diff --

`case g: Generate if g.join =>`
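The pattern-style comments in this part of the thread (`case p: Project =>`, `case g: Generate if g.join =>`) can be sketched with toy classes. `Plan`, `Generate`, `Project`, `Leaf`, and `PatternSketch` below are hypothetical and have fewer fields than the Catalyst operators; the point is only that a typed pattern plus a guard matches the same nodes as a positional extractor pattern while not depending on the constructor's arity.

```scala
// Toy logical operators (hypothetical; not the Catalyst classes).
sealed trait Plan
case class Generate(generator: String, join: Boolean, outer: Boolean, output: Seq[String]) extends Plan
case class Project(exprs: Seq[String], child: Plan) extends Plan
case object Leaf extends Plan

object PatternSketch {
  // Positional extractor patterns: every field must be listed or wildcarded,
  // and unused bindings add noise.
  def verbose(p: Plan): String = p match {
    case Generate(_, true, _, _) => "generate-with-join"
    case Project(_, _) => "project"
    case _ => "other"
  }

  // Typed patterns with a guard: same matches, and the code keeps compiling
  // if a field is later added to or removed from the case class.
  def concise(p: Plan): String = p match {
    case g: Generate if g.join => "generate-with-join"
    case _: Project => "project"
    case _ => "other"
  }
}
```

Both functions classify every node identically; the typed form is simply less coupled to field order and count.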
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103981872

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1204,64 +1250,36 @@ class Analyzer(
         // Category 1:
         // BroadcastHint, Distinct, LeafNode, Repartition, and SubqueryAlias
-        case p: BroadcastHint =>
-          p
-        case p: Distinct =>
-          p
-        case p: LeafNode =>
-          p
-        case p: Repartition =>
-          p
-        case p: SubqueryAlias =>
-          p
+        case _: BroadcastHint | _: Distinct | _: LeafNode | _: Repartition | _: SubqueryAlias =>
         // Category 2:
         // These operators can be anywhere in a correlated subquery.
         // so long as they do not host outer references in the operators.
         case p: Sort =>
           failOnOuterReference(p)
-          p
         case p: RepartitionByExpression =>
           failOnOuterReference(p)
-          p
         // Category 3:
         // Filter is one of the two operators allowed to host correlated expressions.
         // The other operator is Join. Filter can be anywhere in a correlated subquery.
         case f @ Filter(cond, child) =>
--- End diff --

`case f: Filter =>`
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103980778

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1395,42 +1367,45 @@ class Analyzer(
           }
         } while (!current.resolved && !current.fastEquals(previous))
-        // Step 2: Pull out the predicates if the plan is resolved.
+        // Step 2: pull the outer references and record them as children of SubqueryExpression
--- End diff --

`pull ` -> `Pull`. Also restore the original message's `if the plan is resolved.`
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103980267

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1204,64 +1250,36 @@ class Analyzer(
         // Category 1:
         // BroadcastHint, Distinct, LeafNode, Repartition, and SubqueryAlias
-        case p: BroadcastHint =>
-          p
-        case p: Distinct =>
-          p
-        case p: LeafNode =>
-          p
-        case p: Repartition =>
-          p
-        case p: SubqueryAlias =>
-          p
+        case _: BroadcastHint | _: Distinct | _: LeafNode | _: Repartition | _: SubqueryAlias =>
         // Category 2:
         // These operators can be anywhere in a correlated subquery.
         // so long as they do not host outer references in the operators.
         case p: Sort =>
           failOnOuterReference(p)
-          p
         case p: RepartitionByExpression =>
           failOnOuterReference(p)
-          p
         // Category 3:
         // Filter is one of the two operators allowed to host correlated expressions.
         // The other operator is Join. Filter can be anywhere in a correlated subquery.
         case f @ Filter(cond, child) =>
           // Find all predicates with an outer reference.
-          val (correlated, local) = splitConjunctivePredicates(cond).partition(containsOuter)
+          val (correlated, local) =
+            splitConjunctivePredicates(cond).partition(containsOuter)
--- End diff --

`val (correlated, local) = splitConjunctivePredicates(cond).partition(containsOuter)`
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103979911

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -21,12 +21,13 @@ import scala.annotation.tailrec
 import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf, TableIdentifier}
+import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.OuterScopes
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.objects.NewInstance
+import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
--- End diff --

Move it here? https://github.com/dilipbiswal/spark/blob/e43ac08c71c6634b4c1e2cb98bd278a7e36846dc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L1148
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103881081

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1172,20 +1226,11 @@ class Analyzer(
         }
       }
-      /** Determine which correlated predicate references are missing from this plan. */
-      def missingReferences(p: LogicalPlan): AttributeSet = {
-        val localPredicateReferences = p.collect(predicateMap)
-          .flatten
-          .map(_.references)
-          .reduceOption(_ ++ _)
-          .getOrElse(AttributeSet.empty)
-        localPredicateReferences -- p.outputSet
-      }
-
       var foundNonEqualCorrelatedPred : Boolean = false
-      // Simplify the predicates before pulling them out.
-      val transformed = BooleanSimplification(sub) transformUp {
+      // Simplify the predicates before validating any unsupported correlation patterns
+      // in the plan.
+      BooleanSimplification(sub).foreachUp {
--- End diff --

@gatorsmile Currently the code relies on this. I will need to investigate it and would like to address it in a follow-up.
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103855766

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1172,20 +1226,11 @@ class Analyzer(
         }
       }
-      /** Determine which correlated predicate references are missing from this plan. */
-      def missingReferences(p: LogicalPlan): AttributeSet = {
-        val localPredicateReferences = p.collect(predicateMap)
-          .flatten
-          .map(_.references)
-          .reduceOption(_ ++ _)
-          .getOrElse(AttributeSet.empty)
-        localPredicateReferences -- p.outputSet
-      }
-
       var foundNonEqualCorrelatedPred : Boolean = false
-      // Simplify the predicates before pulling them out.
-      val transformed = BooleanSimplification(sub) transformUp {
+      // Simplify the predicates before validating any unsupported correlation patterns
+      // in the plan.
+      BooleanSimplification(sub).foreachUp {
--- End diff --

You should also remove the import of `BooleanSimplification`
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16954#discussion_r103855573

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -1208,63 +1253,39 @@ class Analyzer(
               // Category 1:
               // BroadcastHint, Distinct, LeafNode, Repartition, and SubqueryAlias
               case p: BroadcastHint =>
    -            p
               case p: Distinct =>
    -            p
               case p: LeafNode =>
    -            p
               case p: Repartition =>
    -            p
               case p: SubqueryAlias =>
    -            p
               // Category 2:
               // These operators can be anywhere in a correlated subquery.
               // so long as they do not host outer references in the operators.
               case p: Sort =>
                 failOnOuterReference(p)
    -            p
               case p: RepartitionByExpression =>
                 failOnOuterReference(p)
    -            p
               // Category 3:
               // Filter is one of the two operators allowed to host correlated expressions.
               // The other operator is Join. Filter can be anywhere in a correlated subquery.
               case f @ Filter(cond, child) =>
                 // Find all predicates with an outer reference.
    -            val (correlated, local) = splitConjunctivePredicates(cond).partition(containsOuter)
    +            val (correlated, local) =
    +              splitConjunctivePredicates(cond).partition(SubExprUtils.containsOuter)
                 // Find any non-equality correlated predicates
                 foundNonEqualCorrelatedPred = foundNonEqualCorrelatedPred || correlated.exists {
                   case _: EqualTo | _: EqualNullSafe => false
                   case _ => true
                 }
    -
    -            // Rewrite the filter without the correlated predicates if any.
    -            correlated match {
    -              case Nil => f
    -              case xs if local.nonEmpty =>
    -                val newFilter = Filter(local.reduce(And), child)
    -                predicateMap += newFilter -> xs
    -                newFilter
    -              case xs =>
    -                predicateMap += child -> xs
    -                child
    -            }
    +            outerReferences ++= SubExprUtils.getOuterReferences(correlated)
    --- End diff --

    After the import, we can also clean this up to `outerReferences ++= getOuterReferences(correlated)`
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16954#discussion_r103855544

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -1208,63 +1253,39 @@ class Analyzer(
               // Category 1:
               // BroadcastHint, Distinct, LeafNode, Repartition, and SubqueryAlias
               case p: BroadcastHint =>
    -            p
               case p: Distinct =>
    -            p
               case p: LeafNode =>
    -            p
               case p: Repartition =>
    -            p
               case p: SubqueryAlias =>
    -            p
               // Category 2:
               // These operators can be anywhere in a correlated subquery.
               // so long as they do not host outer references in the operators.
               case p: Sort =>
                 failOnOuterReference(p)
    -            p
               case p: RepartitionByExpression =>
                 failOnOuterReference(p)
    -            p
               // Category 3:
               // Filter is one of the two operators allowed to host correlated expressions.
               // The other operator is Join. Filter can be anywhere in a correlated subquery.
               case f @ Filter(cond, child) =>
                 // Find all predicates with an outer reference.
    -            val (correlated, local) = splitConjunctivePredicates(cond).partition(containsOuter)
    +            val (correlated, local) =
    +              splitConjunctivePredicates(cond).partition(SubExprUtils.containsOuter)
    --- End diff --

    Let us `import org.apache.spark.sql.catalyst.expressions.SubExprUtils._` at the beginning of the `ResolveSubquery` rule. Then no change is needed on this line after the import.
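To illustrate the import suggestion, here is a minimal self-contained sketch. `SubExprUtilsSketch` and `ResolveSubquerySketch` are hypothetical stand-ins (strings play the role of expressions); only the helper names mirror the diff, and the real `SubExprUtils` signatures are not reproduced here.

```scala
// Hypothetical stand-in for the helper object; not Spark's actual SubExprUtils.
object SubExprUtilsSketch {
  // Assumption for this sketch: a predicate "contains an outer reference"
  // if its text mentions the prefix "outer.".
  def containsOuter(e: String): Boolean = e.contains("outer.")

  def getOuterReferences(conditions: Seq[String]): Seq[String] =
    conditions.filter(containsOuter)
}

object ResolveSubquerySketch {
  // One wildcard import at the top of the rule brings all helpers into scope.
  import SubExprUtilsSketch._

  // Call sites can then use the unqualified names.
  def split(conds: Seq[String]): (Seq[String], Seq[String]) =
    conds.partition(containsOuter)

  def outerRefs(conds: Seq[String]): Seq[String] =
    getOuterReferences(conds)
}
```

With the import scoped inside the rule object, the unqualified names stay local to that rule and do not leak into the rest of the file.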
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16954#discussion_r103855200

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -1172,20 +1226,11 @@ class Analyzer(
             }
           }

    -      /** Determine which correlated predicate references are missing from this plan. */
    -      def missingReferences(p: LogicalPlan): AttributeSet = {
    -        val localPredicateReferences = p.collect(predicateMap)
    -          .flatten
    -          .map(_.references)
    -          .reduceOption(_ ++ _)
    -          .getOrElse(AttributeSet.empty)
    -        localPredicateReferences -- p.outputSet
    -      }
    -
           var foundNonEqualCorrelatedPred : Boolean = false

    -      // Simplify the predicates before pulling them out.
    -      val transformed = BooleanSimplification(sub) transformUp {
    +      // Simplify the predicates before validating any unsupported correlation patterns
    +      // in the plan.
    +      BooleanSimplification(sub).foreachUp {
    --- End diff --

    `BooleanSimplification` is not needed after the changes in this PR.
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16954#discussion_r103854902

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -1208,63 +1253,39 @@ class Analyzer(
               // Category 1:
               // BroadcastHint, Distinct, LeafNode, Repartition, and SubqueryAlias
               case p: BroadcastHint =>
    -            p
               case p: Distinct =>
    -            p
               case p: LeafNode =>
    -            p
               case p: Repartition =>
    -            p
               case p: SubqueryAlias =>
    --- End diff --

    `case _: BroadcastHint | _: Distinct | _: LeafNode | _: Repartition | _: SubqueryAlias =>`
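For reference, a small sketch of the alternative-pattern form suggested above. The plan classes are simplified, hypothetical stand-ins that only borrow the operator names; they are not Spark's logical plan nodes.

```scala
// Hypothetical, simplified plan nodes for illustration only.
sealed trait Plan
case class BroadcastHint(child: Plan) extends Plan
case class Distinct(child: Plan) extends Plan
case class Repartition(numPartitions: Int, child: Plan) extends Plan
case class SubqueryAlias(alias: String, child: Plan) extends Plan
case class Sort(child: Plan) extends Plan
case object Leaf extends Plan

// Operators that need no validation share one case through `|` alternatives,
// instead of five separate cases with empty bodies.
def needsOuterReferenceCheck(p: Plan): Boolean = p match {
  case _: BroadcastHint | _: Distinct | _: Repartition | _: SubqueryAlias | Leaf => false
  case _: Sort => true
}
```

Alternatives cannot bind variables, which is why each branch uses `_:` rather than a named binding; that fits here because the merged case never uses the matched node.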
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16954#discussion_r103854544

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala ---
    @@ -297,8 +284,11 @@ trait CheckAnalysis extends PredicateHelper {
                     s"Correlated scalar sub-queries can only be used in a Filter/Aggregate/Project: $p")
                 }

    -          case p if p.expressions.exists(PredicateSubquery.hasPredicateSubquery) =>
    -            failAnalysis(s"Predicate sub-queries can only be used in a Filter: $p")
    +          case p if p.expressions.exists(SubqueryExpression.hasInOrExistsSubquery) =>
    +            p match {
    +              case _: Filter => // Ok
    +              case other => failAnalysis(s"Predicate sub-queries can only be used in a Filter: $p")
    --- End diff --

    `case _ => failAnalysis(`
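A tiny sketch of the point behind this suggestion: the binding `other` is never used, because the message interpolates the outer `p`, so a wildcard reads more clearly. `Node`, `FilterNode`, `ProjectNode` and `check` are hypothetical names for this illustration, and the sketch returns the message instead of calling `failAnalysis`.

```scala
// Hypothetical operator types for illustration only.
sealed trait Node
case object FilterNode extends Node
case object ProjectNode extends Node

// Returns an error message for any operator other than a filter.
def check(p: Node): Option[String] = p match {
  case FilterNode => None // Ok
  // The wildcard avoids introducing a name that is never referenced.
  case _ => Some(s"Predicate sub-queries can only be used in a Filter: $p")
}
```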
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16954#discussion_r103853851

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala ---
    @@ -79,57 +251,10 @@ case class ScalarSubquery(
     object ScalarSubquery {
       def hasCorrelatedScalarSubquery(e: Expression): Boolean = {
         e.find {
    -      case e: ScalarSubquery if e.children.nonEmpty => true
    -      case _ => false
    -    }.isDefined
    -  }
    -}
    -
    -/**
    - * A predicate subquery checks the existence of a value in a sub-query. We currently only allow
    - * [[PredicateSubquery]] expressions within a Filter plan (i.e. WHERE or a HAVING clause). This will
    - * be rewritten into a left semi/anti join during analysis.
    - */
    -case class PredicateSubquery(
    -    plan: LogicalPlan,
    -    children: Seq[Expression] = Seq.empty,
    -    nullAware: Boolean = false,
    -    exprId: ExprId = NamedExpression.newExprId)
    -  extends SubqueryExpression with Predicate with Unevaluable {
    -  override lazy val resolved = childrenResolved && plan.resolved
    -  override lazy val references: AttributeSet = super.references -- plan.outputSet
    -  override def nullable: Boolean = nullAware
    -  override def withNewPlan(plan: LogicalPlan): PredicateSubquery = copy(plan = plan)
    -  override def semanticEquals(o: Expression): Boolean = o match {
    -    case p: PredicateSubquery =>
    -      plan.sameResult(p.plan) && nullAware == p.nullAware &&
    -        children.length == p.children.length &&
    -        children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
    -    case _ => false
    -  }
    -  override def toString: String = s"predicate-subquery#${exprId.id} $conditionString"
    -}
    -
    -object PredicateSubquery {
    -  def hasPredicateSubquery(e: Expression): Boolean = {
    -    e.find {
    -      case _: PredicateSubquery | _: ListQuery | _: Exists => true
    +      case s: ScalarSubquery if s.children.nonEmpty => true
    --- End diff --

    `case s: ScalarSubquery => s.children.nonEmpty`
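The two forms discussed in this comment behave the same as long as the match ends with `case _ => false`. A minimal sketch with hypothetical stand-in types (`Expr`, `Literal`, `ScalarSub`), not Spark's expression classes:

```scala
// Hypothetical, simplified expression tree for illustration only.
sealed trait Expr { def children: Seq[Expr] }
case class Literal(v: Int) extends Expr { val children: Seq[Expr] = Nil }
case class ScalarSub(children: Seq[Expr]) extends Expr

// Form in the diff: a guard that yields `true`.
def withGuard(e: Expr): Boolean = e match {
  case s: ScalarSub if s.children.nonEmpty => true
  case _ => false
}

// Suggested form: return the condition itself.
def withBody(e: Expr): Boolean = e match {
  case s: ScalarSub => s.children.nonEmpty
  case _ => false
}
```

The equivalence depends on the fallback case returning `false`; with any other fallback, a `ScalarSub` without children would fall through the guard and the two versions could differ.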
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103853428 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala --- @@ -83,29 +95,150 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { } /** - * Given a predicate expression and an input plan, it rewrites - * any embedded existential sub-query into an existential join. - * It returns the rewritten expression together with the updated plan. - * Currently, it does not support null-aware joins. Embedded NOT IN predicates - * are blocked in the Analyzer. + * Given a predicate expression and an input plan, it rewrites any embedded existential sub-query + * into an existential join. It returns the rewritten expression together with the updated plan. + * Currently, it does not support NOT IN nested inside a NOT expression. This case is blocked in + * the Analyzer. */ private def rewriteExistentialExpr( exprs: Seq[Expression], plan: LogicalPlan): (Option[Expression], LogicalPlan) = { var newPlan = plan val newExprs = exprs.map { e => e transformUp { -case PredicateSubquery(sub, conditions, nullAware, _) => - // TODO: support null-aware join +case Exists(sub, conditions, exprId) => val exists = AttributeReference("exists", BooleanType, nullable = false)() - newPlan = Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And)) + newPlan = Join(newPlan, sub, +ExistenceJoin(exists), conditions.reduceLeftOption(And)) exists -} +case In(e, Seq(l@ ListQuery(sub, conditions, exprId))) => + val exists = AttributeReference("exists", BooleanType, nullable = false)() + val inConditions = getValueExpression(e).zip(sub.output).map(EqualTo.tupled) + newPlan = Join(newPlan, sub, +ExistenceJoin(exists), (inConditions ++ conditions).reduceLeftOption(And)) + exists + } } (newExprs.reduceOption(And), newPlan) } } + /** + * Pull out all (outer) correlated predicates from a given subquery. 
This method removes the + * correlated predicates from subquery [[Filter]]s and adds the references of these predicates + * to all intermediate [[Project]] and [[Aggregate]] clauses (if they are missing) in order to + * be able to evaluate the predicates at the top level. + * + * TODO: Look to merge this rule with RewritePredicateSubquery. + */ +object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper { + /** +* Returns the correlated predicates and a updated plan that removes the outer references. +*/ + private def pullOutCorrelatedPredicates( + sub: LogicalPlan, + outer: Seq[LogicalPlan]): (LogicalPlan, Seq[Expression]) = { +val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]] + +/** Determine which correlated predicate references are missing from this plan. */ +def missingReferences(p: LogicalPlan): AttributeSet = { + val localPredicateReferences = p.collect(predicateMap) +.flatten +.map(_.references) +.reduceOption(_ ++ _) +.getOrElse(AttributeSet.empty) + localPredicateReferences -- p.outputSet +} + +// Simplify the predicates before pulling them out. +val transformed = BooleanSimplification(sub) transformUp { + case f @ Filter(cond, child) => +val (correlated, local) = + splitConjunctivePredicates(cond).partition(SubExprUtils.containsOuter) + +// Rewrite the filter without the correlated predicates if any. 
+correlated match { + case Nil => f + case xs if local.nonEmpty => +val newFilter = Filter(local.reduce(And), child) +predicateMap += newFilter -> xs +newFilter + case xs => +predicateMap += child -> xs +child +} + case p @ Project(expressions, child) => +val referencesToAdd = missingReferences(p) +if (referencesToAdd.nonEmpty) { + Project(expressions ++ referencesToAdd, child) +} else { + p +} + case a @ Aggregate(grouping, expressions, child) => +val referencesToAdd = missingReferences(a) +if (referencesToAdd.nonEmpty) { + Aggregate(grouping ++ referencesToAdd, expressions ++ referencesToAdd, child) +} else { + a +
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103853389 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala --- @@ -83,29 +95,150 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { } /** - * Given a predicate expression and an input plan, it rewrites - * any embedded existential sub-query into an existential join. - * It returns the rewritten expression together with the updated plan. - * Currently, it does not support null-aware joins. Embedded NOT IN predicates - * are blocked in the Analyzer. + * Given a predicate expression and an input plan, it rewrites any embedded existential sub-query + * into an existential join. It returns the rewritten expression together with the updated plan. + * Currently, it does not support NOT IN nested inside a NOT expression. This case is blocked in + * the Analyzer. */ private def rewriteExistentialExpr( exprs: Seq[Expression], plan: LogicalPlan): (Option[Expression], LogicalPlan) = { var newPlan = plan val newExprs = exprs.map { e => e transformUp { -case PredicateSubquery(sub, conditions, nullAware, _) => - // TODO: support null-aware join +case Exists(sub, conditions, exprId) => val exists = AttributeReference("exists", BooleanType, nullable = false)() - newPlan = Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And)) + newPlan = Join(newPlan, sub, +ExistenceJoin(exists), conditions.reduceLeftOption(And)) exists -} +case In(e, Seq(l@ ListQuery(sub, conditions, exprId))) => + val exists = AttributeReference("exists", BooleanType, nullable = false)() + val inConditions = getValueExpression(e).zip(sub.output).map(EqualTo.tupled) + newPlan = Join(newPlan, sub, +ExistenceJoin(exists), (inConditions ++ conditions).reduceLeftOption(And)) + exists + } } (newExprs.reduceOption(And), newPlan) } } + /** + * Pull out all (outer) correlated predicates from a given subquery. 
This method removes the + * correlated predicates from subquery [[Filter]]s and adds the references of these predicates + * to all intermediate [[Project]] and [[Aggregate]] clauses (if they are missing) in order to + * be able to evaluate the predicates at the top level. + * + * TODO: Look to merge this rule with RewritePredicateSubquery. + */ +object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper { + /** +* Returns the correlated predicates and a updated plan that removes the outer references. +*/ + private def pullOutCorrelatedPredicates( + sub: LogicalPlan, + outer: Seq[LogicalPlan]): (LogicalPlan, Seq[Expression]) = { +val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]] + +/** Determine which correlated predicate references are missing from this plan. */ +def missingReferences(p: LogicalPlan): AttributeSet = { + val localPredicateReferences = p.collect(predicateMap) +.flatten +.map(_.references) +.reduceOption(_ ++ _) +.getOrElse(AttributeSet.empty) + localPredicateReferences -- p.outputSet +} + +// Simplify the predicates before pulling them out. +val transformed = BooleanSimplification(sub) transformUp { + case f @ Filter(cond, child) => +val (correlated, local) = + splitConjunctivePredicates(cond).partition(SubExprUtils.containsOuter) + +// Rewrite the filter without the correlated predicates if any. 
+correlated match { + case Nil => f + case xs if local.nonEmpty => +val newFilter = Filter(local.reduce(And), child) +predicateMap += newFilter -> xs +newFilter + case xs => +predicateMap += child -> xs +child +} + case p @ Project(expressions, child) => +val referencesToAdd = missingReferences(p) +if (referencesToAdd.nonEmpty) { + Project(expressions ++ referencesToAdd, child) +} else { + p +} + case a @ Aggregate(grouping, expressions, child) => +val referencesToAdd = missingReferences(a) +if (referencesToAdd.nonEmpty) { + Aggregate(grouping ++ referencesToAdd, expressions ++ referencesToAdd, child) +} else { + a +
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103853275 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala --- @@ -83,29 +95,150 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { } /** - * Given a predicate expression and an input plan, it rewrites - * any embedded existential sub-query into an existential join. - * It returns the rewritten expression together with the updated plan. - * Currently, it does not support null-aware joins. Embedded NOT IN predicates - * are blocked in the Analyzer. + * Given a predicate expression and an input plan, it rewrites any embedded existential sub-query + * into an existential join. It returns the rewritten expression together with the updated plan. + * Currently, it does not support NOT IN nested inside a NOT expression. This case is blocked in + * the Analyzer. */ private def rewriteExistentialExpr( exprs: Seq[Expression], plan: LogicalPlan): (Option[Expression], LogicalPlan) = { var newPlan = plan val newExprs = exprs.map { e => e transformUp { -case PredicateSubquery(sub, conditions, nullAware, _) => - // TODO: support null-aware join +case Exists(sub, conditions, exprId) => val exists = AttributeReference("exists", BooleanType, nullable = false)() - newPlan = Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And)) + newPlan = Join(newPlan, sub, +ExistenceJoin(exists), conditions.reduceLeftOption(And)) exists -} +case In(e, Seq(l@ ListQuery(sub, conditions, exprId))) => + val exists = AttributeReference("exists", BooleanType, nullable = false)() + val inConditions = getValueExpression(e).zip(sub.output).map(EqualTo.tupled) + newPlan = Join(newPlan, sub, +ExistenceJoin(exists), (inConditions ++ conditions).reduceLeftOption(And)) + exists + } } (newExprs.reduceOption(And), newPlan) } } + /** + * Pull out all (outer) correlated predicates from a given subquery. 
This method removes the + * correlated predicates from subquery [[Filter]]s and adds the references of these predicates + * to all intermediate [[Project]] and [[Aggregate]] clauses (if they are missing) in order to + * be able to evaluate the predicates at the top level. + * + * TODO: Look to merge this rule with RewritePredicateSubquery. + */ +object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper { + /** +* Returns the correlated predicates and a updated plan that removes the outer references. +*/ + private def pullOutCorrelatedPredicates( + sub: LogicalPlan, + outer: Seq[LogicalPlan]): (LogicalPlan, Seq[Expression]) = { +val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]] + +/** Determine which correlated predicate references are missing from this plan. */ +def missingReferences(p: LogicalPlan): AttributeSet = { + val localPredicateReferences = p.collect(predicateMap) +.flatten +.map(_.references) +.reduceOption(_ ++ _) +.getOrElse(AttributeSet.empty) + localPredicateReferences -- p.outputSet +} + +// Simplify the predicates before pulling them out. +val transformed = BooleanSimplification(sub) transformUp { + case f @ Filter(cond, child) => +val (correlated, local) = + splitConjunctivePredicates(cond).partition(SubExprUtils.containsOuter) + +// Rewrite the filter without the correlated predicates if any. 
+correlated match { + case Nil => f + case xs if local.nonEmpty => +val newFilter = Filter(local.reduce(And), child) +predicateMap += newFilter -> xs +newFilter + case xs => +predicateMap += child -> xs +child +} + case p @ Project(expressions, child) => +val referencesToAdd = missingReferences(p) +if (referencesToAdd.nonEmpty) { + Project(expressions ++ referencesToAdd, child) +} else { + p +} + case a @ Aggregate(grouping, expressions, child) => +val referencesToAdd = missingReferences(a) +if (referencesToAdd.nonEmpty) { + Aggregate(grouping ++ referencesToAdd, expressions ++ referencesToAdd, child) +} else { + a +
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103853209 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala --- @@ -83,29 +95,150 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { } /** - * Given a predicate expression and an input plan, it rewrites - * any embedded existential sub-query into an existential join. - * It returns the rewritten expression together with the updated plan. - * Currently, it does not support null-aware joins. Embedded NOT IN predicates - * are blocked in the Analyzer. + * Given a predicate expression and an input plan, it rewrites any embedded existential sub-query + * into an existential join. It returns the rewritten expression together with the updated plan. + * Currently, it does not support NOT IN nested inside a NOT expression. This case is blocked in + * the Analyzer. */ private def rewriteExistentialExpr( exprs: Seq[Expression], plan: LogicalPlan): (Option[Expression], LogicalPlan) = { var newPlan = plan val newExprs = exprs.map { e => e transformUp { -case PredicateSubquery(sub, conditions, nullAware, _) => - // TODO: support null-aware join +case Exists(sub, conditions, exprId) => val exists = AttributeReference("exists", BooleanType, nullable = false)() - newPlan = Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And)) + newPlan = Join(newPlan, sub, +ExistenceJoin(exists), conditions.reduceLeftOption(And)) exists -} +case In(e, Seq(l@ ListQuery(sub, conditions, exprId))) => + val exists = AttributeReference("exists", BooleanType, nullable = false)() + val inConditions = getValueExpression(e).zip(sub.output).map(EqualTo.tupled) + newPlan = Join(newPlan, sub, +ExistenceJoin(exists), (inConditions ++ conditions).reduceLeftOption(And)) + exists + } } (newExprs.reduceOption(And), newPlan) } } + /** + * Pull out all (outer) correlated predicates from a given subquery. 
This method removes the + * correlated predicates from subquery [[Filter]]s and adds the references of these predicates + * to all intermediate [[Project]] and [[Aggregate]] clauses (if they are missing) in order to + * be able to evaluate the predicates at the top level. + * + * TODO: Look to merge this rule with RewritePredicateSubquery. + */ +object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper { + /** +* Returns the correlated predicates and a updated plan that removes the outer references. +*/ + private def pullOutCorrelatedPredicates( + sub: LogicalPlan, + outer: Seq[LogicalPlan]): (LogicalPlan, Seq[Expression]) = { +val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]] + +/** Determine which correlated predicate references are missing from this plan. */ +def missingReferences(p: LogicalPlan): AttributeSet = { + val localPredicateReferences = p.collect(predicateMap) +.flatten +.map(_.references) +.reduceOption(_ ++ _) +.getOrElse(AttributeSet.empty) + localPredicateReferences -- p.outputSet +} + +// Simplify the predicates before pulling them out. +val transformed = BooleanSimplification(sub) transformUp { + case f @ Filter(cond, child) => +val (correlated, local) = + splitConjunctivePredicates(cond).partition(SubExprUtils.containsOuter) + +// Rewrite the filter without the correlated predicates if any. 
+correlated match { + case Nil => f + case xs if local.nonEmpty => +val newFilter = Filter(local.reduce(And), child) +predicateMap += newFilter -> xs +newFilter + case xs => +predicateMap += child -> xs +child +} + case p @ Project(expressions, child) => +val referencesToAdd = missingReferences(p) +if (referencesToAdd.nonEmpty) { + Project(expressions ++ referencesToAdd, child) +} else { + p +} + case a @ Aggregate(grouping, expressions, child) => +val referencesToAdd = missingReferences(a) +if (referencesToAdd.nonEmpty) { + Aggregate(grouping ++ referencesToAdd, expressions ++ referencesToAdd, child) +} else { + a +
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103852913 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala --- @@ -83,29 +95,150 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { } /** - * Given a predicate expression and an input plan, it rewrites - * any embedded existential sub-query into an existential join. - * It returns the rewritten expression together with the updated plan. - * Currently, it does not support null-aware joins. Embedded NOT IN predicates - * are blocked in the Analyzer. + * Given a predicate expression and an input plan, it rewrites any embedded existential sub-query + * into an existential join. It returns the rewritten expression together with the updated plan. + * Currently, it does not support NOT IN nested inside a NOT expression. This case is blocked in + * the Analyzer. */ private def rewriteExistentialExpr( exprs: Seq[Expression], plan: LogicalPlan): (Option[Expression], LogicalPlan) = { var newPlan = plan val newExprs = exprs.map { e => e transformUp { -case PredicateSubquery(sub, conditions, nullAware, _) => - // TODO: support null-aware join +case Exists(sub, conditions, exprId) => val exists = AttributeReference("exists", BooleanType, nullable = false)() - newPlan = Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And)) + newPlan = Join(newPlan, sub, +ExistenceJoin(exists), conditions.reduceLeftOption(And)) exists -} +case In(e, Seq(l@ ListQuery(sub, conditions, exprId))) => + val exists = AttributeReference("exists", BooleanType, nullable = false)() + val inConditions = getValueExpression(e).zip(sub.output).map(EqualTo.tupled) + newPlan = Join(newPlan, sub, +ExistenceJoin(exists), (inConditions ++ conditions).reduceLeftOption(And)) + exists + } } (newExprs.reduceOption(And), newPlan) } } + /** + * Pull out all (outer) correlated predicates from a given subquery. 
This method removes the + * correlated predicates from subquery [[Filter]]s and adds the references of these predicates + * to all intermediate [[Project]] and [[Aggregate]] clauses (if they are missing) in order to + * be able to evaluate the predicates at the top level. + * + * TODO: Look to merge this rule with RewritePredicateSubquery. + */ +object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper { + /** +* Returns the correlated predicates and a updated plan that removes the outer references. +*/ + private def pullOutCorrelatedPredicates( + sub: LogicalPlan, + outer: Seq[LogicalPlan]): (LogicalPlan, Seq[Expression]) = { +val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]] + +/** Determine which correlated predicate references are missing from this plan. */ +def missingReferences(p: LogicalPlan): AttributeSet = { + val localPredicateReferences = p.collect(predicateMap) +.flatten +.map(_.references) +.reduceOption(_ ++ _) +.getOrElse(AttributeSet.empty) + localPredicateReferences -- p.outputSet +} + +// Simplify the predicates before pulling them out. +val transformed = BooleanSimplification(sub) transformUp { + case f @ Filter(cond, child) => +val (correlated, local) = + splitConjunctivePredicates(cond).partition(SubExprUtils.containsOuter) + +// Rewrite the filter without the correlated predicates if any. 
+correlated match { + case Nil => f + case xs if local.nonEmpty => +val newFilter = Filter(local.reduce(And), child) +predicateMap += newFilter -> xs +newFilter + case xs => +predicateMap += child -> xs +child +} + case p @ Project(expressions, child) => +val referencesToAdd = missingReferences(p) +if (referencesToAdd.nonEmpty) { + Project(expressions ++ referencesToAdd, child) +} else { + p +} + case a @ Aggregate(grouping, expressions, child) => +val referencesToAdd = missingReferences(a) +if (referencesToAdd.nonEmpty) { + Aggregate(grouping ++ referencesToAdd, expressions ++ referencesToAdd, child) +} else { + a +
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103852419 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala --- @@ -83,29 +95,150 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { } /** - * Given a predicate expression and an input plan, it rewrites - * any embedded existential sub-query into an existential join. - * It returns the rewritten expression together with the updated plan. - * Currently, it does not support null-aware joins. Embedded NOT IN predicates - * are blocked in the Analyzer. + * Given a predicate expression and an input plan, it rewrites any embedded existential sub-query + * into an existential join. It returns the rewritten expression together with the updated plan. + * Currently, it does not support NOT IN nested inside a NOT expression. This case is blocked in + * the Analyzer. */ private def rewriteExistentialExpr( exprs: Seq[Expression], plan: LogicalPlan): (Option[Expression], LogicalPlan) = { var newPlan = plan val newExprs = exprs.map { e => e transformUp { -case PredicateSubquery(sub, conditions, nullAware, _) => - // TODO: support null-aware join +case Exists(sub, conditions, exprId) => val exists = AttributeReference("exists", BooleanType, nullable = false)() - newPlan = Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And)) + newPlan = Join(newPlan, sub, +ExistenceJoin(exists), conditions.reduceLeftOption(And)) exists -} +case In(e, Seq(l@ ListQuery(sub, conditions, exprId))) => + val exists = AttributeReference("exists", BooleanType, nullable = false)() + val inConditions = getValueExpression(e).zip(sub.output).map(EqualTo.tupled) --- End diff -- `getValueExpression(e)` -> `getValueExpression(value)` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103852353 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala --- @@ -83,29 +95,150 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { } /** - * Given a predicate expression and an input plan, it rewrites - * any embedded existential sub-query into an existential join. - * It returns the rewritten expression together with the updated plan. - * Currently, it does not support null-aware joins. Embedded NOT IN predicates - * are blocked in the Analyzer. + * Given a predicate expression and an input plan, it rewrites any embedded existential sub-query + * into an existential join. It returns the rewritten expression together with the updated plan. + * Currently, it does not support NOT IN nested inside a NOT expression. This case is blocked in + * the Analyzer. */ private def rewriteExistentialExpr( exprs: Seq[Expression], plan: LogicalPlan): (Option[Expression], LogicalPlan) = { var newPlan = plan val newExprs = exprs.map { e => e transformUp { -case PredicateSubquery(sub, conditions, nullAware, _) => - // TODO: support null-aware join +case Exists(sub, conditions, exprId) => val exists = AttributeReference("exists", BooleanType, nullable = false)() - newPlan = Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And)) + newPlan = Join(newPlan, sub, +ExistenceJoin(exists), conditions.reduceLeftOption(And)) exists -} +case In(e, Seq(l@ ListQuery(sub, conditions, exprId))) => --- End diff -- `case In(value, Seq(ListQuery(sub, conditions, _)))` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103852210 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala --- @@ -83,29 +95,150 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { } /** - * Given a predicate expression and an input plan, it rewrites - * any embedded existential sub-query into an existential join. - * It returns the rewritten expression together with the updated plan. - * Currently, it does not support null-aware joins. Embedded NOT IN predicates - * are blocked in the Analyzer. + * Given a predicate expression and an input plan, it rewrites any embedded existential sub-query + * into an existential join. It returns the rewritten expression together with the updated plan. + * Currently, it does not support NOT IN nested inside a NOT expression. This case is blocked in + * the Analyzer. */ private def rewriteExistentialExpr( exprs: Seq[Expression], plan: LogicalPlan): (Option[Expression], LogicalPlan) = { var newPlan = plan val newExprs = exprs.map { e => e transformUp { -case PredicateSubquery(sub, conditions, nullAware, _) => - // TODO: support null-aware join +case Exists(sub, conditions, exprId) => val exists = AttributeReference("exists", BooleanType, nullable = false)() - newPlan = Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And)) + newPlan = Join(newPlan, sub, +ExistenceJoin(exists), conditions.reduceLeftOption(And)) exists -} +case In(e, Seq(l@ ListQuery(sub, conditions, exprId))) => + val exists = AttributeReference("exists", BooleanType, nullable = false)() + val inConditions = getValueExpression(e).zip(sub.output).map(EqualTo.tupled) + newPlan = Join(newPlan, sub, +ExistenceJoin(exists), (inConditions ++ conditions).reduceLeftOption(And)) --- End diff -- ``` val newConditions = (inConditions ++ conditions).reduceLeftOption(And) newPlan = Join(newPlan, sub, 
ExistenceJoin(exists), newConditions) ```
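The suggestion above gives the combined join condition a name before the join is built. As a minimal sketch of what `reduceLeftOption` does with the two condition lists, the following uses plain strings in place of Catalyst expressions; `and` and `combine` are hypothetical helpers for illustration, not Spark APIs.

```scala
object JoinConditions {
  // Hypothetical stand-in for Catalyst's And: joins two predicate strings.
  def and(l: String, r: String): String = s"($l AND $r)"

  // Concatenate the IN conditions with the correlated conditions and fold
  // them left to right; an empty input yields None (no join condition).
  def combine(inConditions: Seq[String], conditions: Seq[String]): Option[String] =
    (inConditions ++ conditions).reduceLeftOption(and)
}
```

Binding the result to a `val` first, as the reviewer proposes, keeps the `Join(...)` call on one short line without changing behavior.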
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103852067 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala --- @@ -83,29 +95,150 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { } /** - * Given a predicate expression and an input plan, it rewrites - * any embedded existential sub-query into an existential join. - * It returns the rewritten expression together with the updated plan. - * Currently, it does not support null-aware joins. Embedded NOT IN predicates - * are blocked in the Analyzer. + * Given a predicate expression and an input plan, it rewrites any embedded existential sub-query + * into an existential join. It returns the rewritten expression together with the updated plan. + * Currently, it does not support NOT IN nested inside a NOT expression. This case is blocked in + * the Analyzer. */ private def rewriteExistentialExpr( exprs: Seq[Expression], plan: LogicalPlan): (Option[Expression], LogicalPlan) = { var newPlan = plan val newExprs = exprs.map { e => e transformUp { -case PredicateSubquery(sub, conditions, nullAware, _) => - // TODO: support null-aware join +case Exists(sub, conditions, exprId) => val exists = AttributeReference("exists", BooleanType, nullable = false)() - newPlan = Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And)) + newPlan = Join(newPlan, sub, --- End diff -- `newPlan = Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And))` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103852025 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala --- @@ -83,29 +95,150 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { } /** - * Given a predicate expression and an input plan, it rewrites - * any embedded existential sub-query into an existential join. - * It returns the rewritten expression together with the updated plan. - * Currently, it does not support null-aware joins. Embedded NOT IN predicates - * are blocked in the Analyzer. + * Given a predicate expression and an input plan, it rewrites any embedded existential sub-query + * into an existential join. It returns the rewritten expression together with the updated plan. + * Currently, it does not support NOT IN nested inside a NOT expression. This case is blocked in + * the Analyzer. */ private def rewriteExistentialExpr( exprs: Seq[Expression], plan: LogicalPlan): (Option[Expression], LogicalPlan) = { var newPlan = plan val newExprs = exprs.map { e => e transformUp { -case PredicateSubquery(sub, conditions, nullAware, _) => - // TODO: support null-aware join +case Exists(sub, conditions, exprId) => --- End diff -- `case Exists(sub, conditions, _)` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103851862 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala --- @@ -54,20 +61,25 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { // Filter the plan by applying left semi and left anti joins. withSubquery.foldLeft(newFilter) { -case (p, PredicateSubquery(sub, conditions, _, _)) => +case (p, Exists(sub, conditions, _)) => val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p) Join(outerPlan, sub, LeftSemi, joinCond) -case (p, Not(PredicateSubquery(sub, conditions, false, _))) => +case (p, Not(Exists(sub, conditions, _))) => val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p) Join(outerPlan, sub, LeftAnti, joinCond) -case (p, Not(PredicateSubquery(sub, conditions, true, _))) => +case (p, In(e, Seq(l @ ListQuery(sub, conditions, _)))) => + val inConditions = getValueExpression(e).zip(sub.output).map(EqualTo.tupled) + val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions ++ conditions, p) + Join(outerPlan, sub, LeftSemi, joinCond) +case (p, Not(In(e, Seq(l @ ListQuery(sub, conditions, _))))) => --- End diff -- `case (p, Not(In(e, Seq(ListQuery(sub, conditions, _))))) =>`
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103851814 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala --- @@ -54,20 +61,25 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { // Filter the plan by applying left semi and left anti joins. withSubquery.foldLeft(newFilter) { -case (p, PredicateSubquery(sub, conditions, _, _)) => +case (p, Exists(sub, conditions, _)) => val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p) Join(outerPlan, sub, LeftSemi, joinCond) -case (p, Not(PredicateSubquery(sub, conditions, false, _))) => +case (p, Not(Exists(sub, conditions, _))) => val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p) Join(outerPlan, sub, LeftAnti, joinCond) -case (p, Not(PredicateSubquery(sub, conditions, true, _))) => +case (p, In(e, Seq(l @ ListQuery(sub, conditions, _)))) => --- End diff -- `case (p, In(e, Seq(ListQuery(sub, conditions, _)))) =>`
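The reviewer's point in the comments above is that the `l @` binder names the matched `ListQuery` without ever using it. A small sketch of the difference follows; the `ListQuery` and `In` case classes here are simplified, hypothetical stand-ins defined only for this example and are not the Catalyst classes.

```scala
object UnusedBinder {
  // Hypothetical, simplified stand-ins for the Catalyst expression classes.
  final case class ListQuery(sub: String, conditions: Seq[String], exprId: Long)
  final case class In(value: String, list: Seq[ListQuery])

  // Original shape: `l @` binds the whole ListQuery, but `l` is never read.
  def withBinder(in: In): Option[String] = in match {
    case In(e, Seq(l @ ListQuery(sub, _, _))) => Some(s"$e IN $sub")
    case _ => None
  }

  // Suggested shape: same match, no unused name, and a clearer `value`.
  def withoutBinder(in: In): Option[String] = in match {
    case In(value, Seq(ListQuery(sub, _, _))) => Some(s"$value IN $sub")
    case _ => None
  }
}
```

Both functions accept exactly the same inputs; only the readability of the pattern changes.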
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103850635 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -365,17 +385,66 @@ object TypeCoercion { } /** - * Convert the value and in list expressions to the common operator type - * by looking at all the argument types and finding the closest one that - * all the arguments can be cast to. When no common operator type is found - * the original expression will be returned and an Analysis Exception will - * be raised at type checking phase. + * Handles type coercion for both IN expression with subquery and IN + * expressions without subquery. + * 1. In the first case, find the common type by comparing the left hand side + *expression types against corresponding right hand side expression derived + *from the subquery expression's plan output. Inject appropriate casts in the + *LHS and RHS side of IN expression. + * + * 2. In the second case, convert the value and in list expressions to the + *common operator type by looking at all the argument types and finding + *the closest one that all the arguments can be cast to. When no common + *operator type is found the original expression will be returned and an + *Analysis Exception will be raised at the type checking phase. */ object InConversion extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions { // Skip nodes who's children have not been resolved yet. case e if !e.childrenResolved => e + // Handle type casting required between value expression and subquery output + // in IN subquery. + case i @ In(a, Seq(ListQuery(sub, children, exprId))) if !i.resolved => +// lhs is the value expression of IN subquery. +val lhs = a match { + // Multi columns in IN clause is represented as a CreateNamedStruct. + // flatten the named struct to get the list of expressions. 
+ case cns: CreateNamedStruct => cns.valExprs + case expr => Seq(expr) +} + +// rhs is the subquery output. +val rhs = sub.output +require(lhs.length == rhs.length) + +val commonTypes = lhs.zip(rhs).flatMap { case (l, r) => + findCommonTypeForBinaryComparison(l.dataType, r.dataType) +} + +if (commonTypes.length == lhs.length) { + val castedRhs = rhs.zip(commonTypes).map { +case (e, dt) if e.dataType != dt => Alias(Cast(e, dt), e.name)() +case (e, _) => e + } + val castedLhs = lhs.zip(commonTypes).map { +case (e, dt) if e.dataType != dt => Cast(e, dt) +case (e, _) => e + } + + // Before constructing the In expression, wrap the multi values in lhs + // in a CreatedNamedStruct. + val newLhs = a match { +case cns: CreateNamedStruct => + val nameValue = cns.nameExprs.zip(castedLhs).flatMap(pair => Seq(pair._1, pair._2)) --- End diff -- Please use `case (name, value) =>` instead of `pair` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
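To illustrate the request above, here is a minimal sketch comparing tuple accessors with a destructuring `case (name, value)` pattern when interleaving names and values. Plain strings stand in for `cns.nameExprs` and `castedLhs`; the object and value names are hypothetical.

```scala
object NameValuePairs {
  // Hypothetical stand-ins for cns.nameExprs and castedLhs.
  val names: Seq[String] = Seq("c1", "c2")
  val values: Seq[String] = Seq("cast(a)", "cast(b)")

  // Positional accessors: the reader has to remember what _1 and _2 mean.
  val viaAccessors: Seq[String] =
    names.zip(values).flatMap(pair => Seq(pair._1, pair._2))

  // Destructuring pattern: each tuple element gets a descriptive name.
  val viaPattern: Seq[String] =
    names.zip(values).flatMap { case (name, value) => Seq(name, value) }
}
```

Both forms produce the same interleaved sequence; the pattern version only documents the roles of the two elements.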
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103850482 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -365,17 +385,66 @@ object TypeCoercion { } /** - * Convert the value and in list expressions to the common operator type - * by looking at all the argument types and finding the closest one that - * all the arguments can be cast to. When no common operator type is found - * the original expression will be returned and an Analysis Exception will - * be raised at type checking phase. + * Handles type coercion for both IN expression with subquery and IN + * expressions without subquery. + * 1. In the first case, find the common type by comparing the left hand side + *expression types against corresponding right hand side expression derived + *from the subquery expression's plan output. Inject appropriate casts in the + *LHS and RHS side of IN expression. + * + * 2. In the second case, convert the value and in list expressions to the + *common operator type by looking at all the argument types and finding + *the closest one that all the arguments can be cast to. When no common + *operator type is found the original expression will be returned and an + *Analysis Exception will be raised at the type checking phase. */ object InConversion extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions { // Skip nodes who's children have not been resolved yet. case e if !e.childrenResolved => e + // Handle type casting required between value expression and subquery output + // in IN subquery. + case i @ In(a, Seq(ListQuery(sub, children, exprId))) if !i.resolved => +// lhs is the value expression of IN subquery. +val lhs = a match { + // Multi columns in IN clause is represented as a CreateNamedStruct. + // flatten the named struct to get the list of expressions. 
+ case cns: CreateNamedStruct => cns.valExprs + case expr => Seq(expr) +} + +// rhs is the subquery output. +val rhs = sub.output +require(lhs.length == rhs.length) + +val commonTypes = lhs.zip(rhs).flatMap { case (l, r) => + findCommonTypeForBinaryComparison(l.dataType, r.dataType) +} + +if (commonTypes.length == lhs.length) { --- End diff -- Please add a comment to explain what this condition means. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
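One way to read the condition the reviewer asks about: `flatMap` over an `Option`-returning function drops every pair for which no common type exists, so the lengths match only when every LHS/RHS pair could be coerced. The sketch below shows that reading with type names as strings; `findCommon` is a hypothetical, much simplified stand-in for `findCommonTypeForBinaryComparison`, and the inputs are assumed to have equal length (the diff enforces this with `require`).

```scala
object CommonTypeCheck {
  // Hypothetical, simplified stand-in for findCommonTypeForBinaryComparison.
  def findCommon(l: String, r: String): Option[String] = (l, r) match {
    case (a, b) if a == b => Some(a)
    case ("int", "long") | ("long", "int") => Some("long")
    case _ => None
  }

  // True only when every (lhs, rhs) pair has a common type: flatMap silently
  // discards the None results, so any failure shortens commonTypes.
  def allCoercible(lhs: Seq[String], rhs: Seq[String]): Boolean = {
    val commonTypes = lhs.zip(rhs).flatMap { case (l, r) => findCommon(l, r) }
    commonTypes.length == lhs.length
  }
}
```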
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103850385 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -365,17 +385,66 @@ object TypeCoercion { } /** - * Convert the value and in list expressions to the common operator type - * by looking at all the argument types and finding the closest one that - * all the arguments can be cast to. When no common operator type is found - * the original expression will be returned and an Analysis Exception will - * be raised at type checking phase. + * Handles type coercion for both IN expression with subquery and IN + * expressions without subquery. + * 1. In the first case, find the common type by comparing the left hand side + *expression types against corresponding right hand side expression derived + *from the subquery expression's plan output. Inject appropriate casts in the + *LHS and RHS side of IN expression. + * + * 2. In the second case, convert the value and in list expressions to the + *common operator type by looking at all the argument types and finding + *the closest one that all the arguments can be cast to. When no common + *operator type is found the original expression will be returned and an + *Analysis Exception will be raised at the type checking phase. */ object InConversion extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions { // Skip nodes who's children have not been resolved yet. case e if !e.childrenResolved => e + // Handle type casting required between value expression and subquery output + // in IN subquery. + case i @ In(a, Seq(ListQuery(sub, children, exprId))) if !i.resolved => +// lhs is the value expression of IN subquery. --- End diff -- `lhs` -> `LHS`. Please correct all the similar cases in comments. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103850171 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -365,17 +385,66 @@ object TypeCoercion { } /** - * Convert the value and in list expressions to the common operator type - * by looking at all the argument types and finding the closest one that - * all the arguments can be cast to. When no common operator type is found - * the original expression will be returned and an Analysis Exception will - * be raised at type checking phase. + * Handles type coercion for both IN expression with subquery and IN + * expressions without subquery. + * 1. In the first case, find the common type by comparing the left hand side + *expression types against corresponding right hand side expression derived --- End diff -- `left hand side` -> `left hand side (LHS)` `right hand side` -> `right hand side (RHS)` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103849859 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala --- @@ -202,14 +194,9 @@ trait CheckAnalysis extends PredicateHelper { s"filter expression '${f.condition.sql}' " + s"of type ${f.condition.dataType.simpleString} is not a boolean.") - case f @ Filter(condition, child) => -splitConjunctivePredicates(condition).foreach { - case _: PredicateSubquery | Not(_: PredicateSubquery) => - case e if PredicateSubquery.hasNullAwarePredicateWithinNot(e) => -failAnalysis(s"Null-aware predicate sub-queries cannot be used in nested" + - s" conditions: $e") - case e => -} + case Filter(condition, _) if SubExprUtils.hasNullAwarePredicateWithinNot(condition) => +failAnalysis(s"Null-aware predicate sub-queries cannot be used in nested" + + s" conditions: $condition") --- End diff -- Nit: `s"` -> `"` and move the space from the beginning of the second line to the end of the first line. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
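The nit above can be sketched as follows: the `s` interpolator is only needed on the fragment that contains `$condition`, and the separating space reads more naturally at the end of the first fragment. The function names are hypothetical and the condition is passed as a plain string for illustration.

```scala
object ErrorMessage {
  // Shape in the diff: both fragments interpolated, space leads line two.
  def before(condition: String): String =
    s"Null-aware predicate sub-queries cannot be used in nested" +
      s" conditions: $condition"

  // Shape after the nit: plain literal first, interpolator only where needed.
  def after(condition: String): String =
    "Null-aware predicate sub-queries cannot be used in nested " +
      s"conditions: $condition"
}
```

The resulting message text is identical in both versions.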
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103600773 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala --- @@ -40,19 +42,179 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression { /** * A base interface for expressions that contain a [[LogicalPlan]]. */ -abstract class SubqueryExpression extends PlanExpression[LogicalPlan] { +abstract class SubqueryExpression( +plan: LogicalPlan, +children: Seq[Expression], +exprId: ExprId) extends PlanExpression[LogicalPlan] { + + override lazy val resolved: Boolean = childrenResolved && plan.resolved + override lazy val references: AttributeSet = +if (plan.resolved) super.references -- plan.outputSet else super.references override def withNewPlan(plan: LogicalPlan): SubqueryExpression + override def semanticEquals(o: Expression): Boolean = o match { +case p: SubqueryExpression => + this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) && +children.length == p.children.length && +children.zip(p.children).forall(p => p._1.semanticEquals(p._2)) +case _ => false + } } object SubqueryExpression { + /** + * Returns true when an expression contains an IN or EXISTS subquery and false otherwise. + */ + def hasInOrExistsSubquery(e: Expression): Boolean = { +e.find { + case _: ListQuery | _: Exists => true + case _ => false +}.isDefined + } + + /** + * Returns true when an expression contains a subquery that has outer reference(s). 
The outer + * reference attributes are kept as children of subquery expression by + * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]] + */ def hasCorrelatedSubquery(e: Expression): Boolean = { e.find { - case e: SubqueryExpression if e.children.nonEmpty => true + case s: SubqueryExpression if s.children.nonEmpty => true case _ => false }.isDefined } } +object SubExprUtils extends PredicateHelper { + /** + * Returns true when an expression contains correlated predicates i.e outer references and + * returns false otherwise. + */ + def containsOuter(e: Expression): Boolean = { +e.find(_.isInstanceOf[OuterReference]).isDefined + } + + /** + * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could + * turn the null-aware predicate into not-null-aware predicate. + */ + def hasNullAwarePredicateWithinNot(e: Expression): Boolean = { +e.find{ x => + x.isInstanceOf[Not] && e.find { --- End diff -- @hvanhovell I have refactored the code a bit, and I hope this function is now a bit clearer to follow. I moved the code from its caller in checkAnalysis into this function. Please let me know what you think.
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user nsyca commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103461455 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -707,13 +709,85 @@ class Analyzer( } transformUp { case other => other transformExpressions { case a: Attribute => - attributeRewrites.get(a).getOrElse(a).withQualifier(a.qualifier) +dedupAttr(a, attributeRewrites) + case s: SubqueryExpression => +s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites)) } } newRight } } +private def dedupAttr(attr: Attribute, attrMap: AttributeMap[Attribute]): Attribute = { + attrMap.get(attr).getOrElse(attr).withQualifier(attr.qualifier) +} + +/** + * The outer plan may have been de-duplicated and the function below updates the + * outer references to refer to the de-duplicated attributes. + * + * For example (SQL): + * {{{ + * SELECT * FROM t1 + * WHERE EXISTS (SELECT 1 + * FROM t2 + * WHERE t1.c1 = t2.c1) + * INTERSECT + * SELECT * FROM t1 + * WHERE EXISTS (SELECT 1 + * FROM t2 + * WHERE t1.c1 = t2.c1) + * }}} + * Plan before resolveReference rule. --- End diff -- @dilipbiswal You probably are already on the right track that one of the subquery expression is not required. I guess either `T1 intersect T1 where exists (T2)` or a self-join scenario like `T1, T1 Tx where exists (T2 where .col = T2.col)` with `` be `T1` or `Tx` should also do. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103426702 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -1110,31 +1184,24 @@ class Analyzer( } /** - * Pull out all (outer) correlated predicates from a given subquery. This method removes the - * correlated predicates from subquery [[Filter]]s and adds the references of these predicates - * to all intermediate [[Project]] and [[Aggregate]] clauses (if they are missing) in order to - * be able to evaluate the predicates at the top level. - * - * This method returns the rewritten subquery and correlated predicates. + * Validates to make sure the outer references appearing inside the subquery + * are legal. This function also returns the list of expressions + * that contain outer references. These outer references would be kept as children + * of subquery expressions by the caller of this function. */ -private def pullOutCorrelatedPredicates(sub: LogicalPlan): (LogicalPlan, Seq[Expression]) = { - val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]] +private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = { + val outerReferences = scala.collection.mutable.ArrayBuffer.empty[Seq[Expression]] --- End diff -- @hvanhovell Thank you. I will change to use ArrayBuffer[Expression] instead. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
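As a rough sketch of the change agreed on above, a buffer of sequences has to be flattened before it can be returned as a flat list, while a buffer of elements can be appended to with `++=` and returned directly. Strings stand in for `Expression` here, and both function names are hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer

object OuterRefBuffer {
  // Shape in the diff: each batch is stored as a Seq, flattened at the end.
  def nested(batches: Seq[Seq[String]]): Seq[String] = {
    val buf = ArrayBuffer.empty[Seq[String]]
    batches.foreach(b => buf += b)
    buf.flatten.toSeq
  }

  // Shape after the change: elements are appended directly with ++=.
  def flat(batches: Seq[Seq[String]]): Seq[String] = {
    val buf = ArrayBuffer.empty[String]
    batches.foreach(b => buf ++= b)
    buf.toSeq
  }
}
```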
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103415750 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -1398,42 +1399,46 @@ class Analyzer( } } while (!current.resolved && !current.fastEquals(previous)) - // Step 2: Pull out the predicates if the plan is resolved. + // Step 2: pull the outer references and record them as children of SubqueryExpression if (current.resolved) { // Make sure the resolved query has the required number of output columns. This is only // needed for Scalar and IN subqueries. if (requiredColumns > 0 && requiredColumns != current.output.size) { failAnalysis(s"The number of columns in the subquery (${current.output.size}) " + s"does not match the required number of columns ($requiredColumns)") } -// Pullout predicates and construct a new plan. -f.tupled(rewriteSubQuery(current, plans)) +// Validate the outer reference and record the outer references as children of +// subquery expression. +f.tupled(current, checkAndGetOuterReferences(current)) --- End diff -- @hvanhovell Got it. Thank you !! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103412751

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1110,31 +1184,24 @@ class Analyzer(
 }
 /**
- * Pull out all (outer) correlated predicates from a given subquery. This method removes the
- * correlated predicates from subquery [[Filter]]s and adds the references of these predicates
- * to all intermediate [[Project]] and [[Aggregate]] clauses (if they are missing) in order to
- * be able to evaluate the predicates at the top level.
- *
- * This method returns the rewritten subquery and correlated predicates.
+ * Validates to make sure the outer references appearing inside the subquery
+ * are legal. This function also returns the list of expressions
+ * that contain outer references. These outer references would be kept as children
+ * of subquery expressions by the caller of this function.
 */
-private def pullOutCorrelatedPredicates(sub: LogicalPlan): (LogicalPlan, Seq[Expression]) = {
- val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]]
+private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = {
--- End diff --

Ok, then leave it as it is. I thought it was only doing validation.
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103411993 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala --- @@ -40,19 +42,179 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression { /** * A base interface for expressions that contain a [[LogicalPlan]]. */ -abstract class SubqueryExpression extends PlanExpression[LogicalPlan] { +abstract class SubqueryExpression( +plan: LogicalPlan, +children: Seq[Expression], +exprId: ExprId) extends PlanExpression[LogicalPlan] { + + override lazy val resolved: Boolean = childrenResolved && plan.resolved + override lazy val references: AttributeSet = +if (plan.resolved) super.references -- plan.outputSet else super.references override def withNewPlan(plan: LogicalPlan): SubqueryExpression + override def semanticEquals(o: Expression): Boolean = o match { +case p: SubqueryExpression => + this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) && +children.length == p.children.length && +children.zip(p.children).forall(p => p._1.semanticEquals(p._2)) +case _ => false + } } object SubqueryExpression { + /** + * Returns true when an expression contains an IN or EXISTS subquery and false otherwise. + */ + def hasInOrExistsSubquery(e: Expression): Boolean = { +e.find { + case _: ListQuery | _: Exists => true + case _ => false +}.isDefined + } + + /** + * Returns true when an expression contains a subquery that has outer reference(s). 
The outer + * reference attributes are kept as children of subquery expression by + * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]] + */ def hasCorrelatedSubquery(e: Expression): Boolean = { e.find { - case e: SubqueryExpression if e.children.nonEmpty => true + case s: SubqueryExpression if s.children.nonEmpty => true case _ => false }.isDefined } } +object SubExprUtils extends PredicateHelper { + /** + * Returns true when an expression contains correlated predicates i.e outer references and + * returns false otherwise. + */ + def containsOuter(e: Expression): Boolean = { +e.find(_.isInstanceOf[OuterReference]).isDefined + } + + /** + * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could + * turn the null-aware predicate into not-null-aware predicate. + */ + def hasNullAwarePredicateWithinNot(e: Expression): Boolean = { +e.find{ x => + x.isInstanceOf[Not] && e.find { --- End diff -- @hvanhovell I actually had debugged this before and had completely forgotten about it. The case you mention actually works ok because of the way we invoke this functions. Please see the caller at - [code](https://github.com/dilipbiswal/spark/blob/33844bb8c6e72c35e12a7ea9f124ad3c6b8bd43c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala#L197-L204) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103406735 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2512,3 +2522,67 @@ object ResolveCreateNamedStruct extends Rule[LogicalPlan] { CreateNamedStruct(children.toList) } } + +/** + * The aggregate expressions from subquery referencing outer query block are pushed + * down to the outer query block for evaluation. This rule below updates such outer references + * as AttributeReference referring attributes from the parent/outer query block. + * + * For example (SQL): + * {{{ + * SELECT l.a FROM l GROUP BY 1 HAVING EXISTS (SELECT 1 FROM r WHERE r.d < min(l.b)) + * }}} + * Plan before the rule. + *Project [a#226] + *+- Filter exists#245 [min(b#227)#249] + * : +- Project [1 AS 1#247] + * : +- Filter (d#238 < min(outer(b#227))) <- + * :+- SubqueryAlias r + * : +- Project [_1#234 AS c#237, _2#235 AS d#238] + * : +- LocalRelation [_1#234, _2#235] + * +- Aggregate [a#226], [a#226, min(b#227) AS min(b#227)#249] + * +- SubqueryAlias l + * +- Project [_1#223 AS a#226, _2#224 AS b#227] + *+- LocalRelation [_1#223, _2#224] + * Plan after the rule. 
+ *Project [a#226] + *+- Filter exists#245 [min(b#227)#249] + * : +- Project [1 AS 1#247] + * : +- Filter (d#238 < outer(min(b#227)#249)) <- + * :+- SubqueryAlias r + * : +- Project [_1#234 AS c#237, _2#235 AS d#238] + * : +- LocalRelation [_1#234, _2#235] + * +- Aggregate [a#226], [a#226, min(b#227) AS min(b#227)#249] + * +- SubqueryAlias l + * +- Project [_1#223 AS a#226, _2#224 AS b#227] + *+- LocalRelation [_1#223, _2#224] + */ +object UpdateOuterReferences extends Rule[LogicalPlan] { + private def stripAlias(expr: Expression): Expression = expr match { case a: Alias => a.child } + + private def updateOuterReferenceInSubquery( + plan: LogicalPlan, + refExprs: Seq[Expression]): LogicalPlan = { +plan transformAllExpressions { case e => + val outerAlias = + refExprs.find(stripAlias(_).semanticEquals(SubExprUtils.stripOuterReference(e))) + outerAlias match { +case Some(a: Alias) => OuterReference(a.toAttribute) +case _ => e + } +} + } + + def apply(plan: LogicalPlan): LogicalPlan = { +plan transform { + case f @ Filter(_, a: Aggregate) if f.resolved => --- End diff -- Ok, awesome. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103406494

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1398,42 +1399,46 @@ class Analyzer(
 }
 } while (!current.resolved && !current.fastEquals(previous))
- // Step 2: Pull out the predicates if the plan is resolved.
+ // Step 2: pull the outer references and record them as children of SubqueryExpression
 if (current.resolved) {
 // Make sure the resolved query has the required number of output columns. This is only
 // needed for Scalar and IN subqueries.
 if (requiredColumns > 0 && requiredColumns != current.output.size) {
 failAnalysis(s"The number of columns in the subquery (${current.output.size}) " +
 s"does not match the required number of columns ($requiredColumns)")
 }
-// Pullout predicates and construct a new plan.
-f.tupled(rewriteSubQuery(current, plans))
+// Validate the outer reference and record the outer references as children of
+// subquery expression.
+f.tupled(current, checkAndGetOuterReferences(current))
--- End diff --

I think we can. The reason I used `f.tupled` was that `rewriteSubQuery(..)` returned a tuple.
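The reasoning about `f.tupled` can be shown with a small sketch. It assumes nothing about Spark beyond what the diff shows: `Plan`, `Expr`, `rewrite`, and `outerRefs` are hypothetical stand-ins, and only `Function2.tupled` from the Scala standard library is used.

```scala
object TupledSketch {
  // Hypothetical stand-ins for LogicalPlan and Expression.
  final case class Plan(name: String)
  final case class Expr(name: String)

  // Stand-in for the two-argument callback `f`.
  val f: (Plan, Seq[Expr]) => String =
    (plan, refs) => s"${plan.name}[${refs.map(_.name).mkString(", ")}]"

  // Old shape: the helper returns a pair, so `f.tupled` accepts it directly.
  def rewrite(plan: Plan): (Plan, Seq[Expr]) = (plan, Seq(Expr("c1")))

  // New shape: the helper returns only the outer references.
  def outerRefs(plan: Plan): Seq[Expr] = Seq(Expr("c1"))

  val viaTupled: String = f.tupled(rewrite(Plan("sub")))
  val viaDirect: String = f(Plan("sub"), outerRefs(Plan("sub")))
}
```

Once the helper no longer returns a tuple, calling `f` with two arguments is the more direct form; both calls produce the same result here.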
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103398818

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -707,13 +709,85 @@ class Analyzer(
 } transformUp {
 case other => other transformExpressions {
 case a: Attribute =>
- attributeRewrites.get(a).getOrElse(a).withQualifier(a.qualifier)
+dedupAttr(a, attributeRewrites)
+ case s: SubqueryExpression =>
+s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites))
 }
 }
 newRight
 }
 }
+private def dedupAttr(attr: Attribute, attrMap: AttributeMap[Attribute]): Attribute = {
+ attrMap.get(attr).getOrElse(attr).withQualifier(attr.qualifier)
+}
+
+/**
+ * The outer plan may have been de-duplicated and the function below updates the
+ * outer references to refer to the de-duplicated attributes.
+ *
+ * For example (SQL):
+ * {{{
+ * SELECT * FROM t1
+ * WHERE EXISTS (SELECT 1
+ * FROM t2
+ * WHERE t1.c1 = t2.c1)
+ * INTERSECT
+ * SELECT * FROM t1
+ * WHERE EXISTS (SELECT 1
+ * FROM t2
+ * WHERE t1.c1 = t2.c1)
+ * }}}
+ * Plan before resolveReference rule.
--- End diff --

@hvanhovell I will work on a smaller example. I think we don't need the subquery expression in the left plan. That should cut down the plan size.
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103384302

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1398,42 +1399,46 @@ class Analyzer(
 }
 } while (!current.resolved && !current.fastEquals(previous))
- // Step 2: Pull out the predicates if the plan is resolved.
+ // Step 2: pull the outer references and record them as children of SubqueryExpression
 if (current.resolved) {
 // Make sure the resolved query has the required number of output columns. This is only
 // needed for Scalar and IN subqueries.
 if (requiredColumns > 0 && requiredColumns != current.output.size) {
 failAnalysis(s"The number of columns in the subquery (${current.output.size}) " +
 s"does not match the required number of columns ($requiredColumns)")
 }
-// Pullout predicates and construct a new plan.
-f.tupled(rewriteSubQuery(current, plans))
+// Validate the outer reference and record the outer references as children of
+// subquery expression.
+f.tupled(current, checkAndGetOuterReferences(current))
--- End diff --

@hvanhovell Can we remove tupled and just say f(current, ...)?
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103363226 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -707,13 +709,85 @@ class Analyzer( } transformUp { case other => other transformExpressions { case a: Attribute => - attributeRewrites.get(a).getOrElse(a).withQualifier(a.qualifier) +dedupAttr(a, attributeRewrites) + case s: SubqueryExpression => +s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites)) } } newRight } } +private def dedupAttr(attr: Attribute, attrMap: AttributeMap[Attribute]): Attribute = { + attrMap.get(attr).getOrElse(attr).withQualifier(attr.qualifier) +} + +/** + * The outer plan may have been de-duplicated and the function below updates the + * outer references to refer to the de-duplicated attributes. + * + * For example (SQL): + * {{{ + * SELECT * FROM t1 + * WHERE EXISTS (SELECT 1 + * FROM t2 + * WHERE t1.c1 = t2.c1) + * INTERSECT + * SELECT * FROM t1 + * WHERE EXISTS (SELECT 1 + * FROM t2 + * WHERE t1.c1 = t2.c1) + * }}} + * Plan before resolveReference rule. + *'Intersect + *:- 'Project [*] + *: +- Filter exists#271 [c1#250] + *: : +- Project [1 AS 1#295] + *: : +- Filter (outer(c1#250) = c1#263) + *: :+- SubqueryAlias t2 + *: : +- Relation[c1#263,c2#264] parquet + *: +- SubqueryAlias t1 + *:+- Relation[c1#250,c2#251] parquet + *+- 'Project [*] + * +- Filter exists#272 [c1#250] + * : +- Project [1 AS 1#298] + * : +- Filter (outer(c1#250) = c1#263) + * :+- SubqueryAlias t2 + * : +- Relation[c1#263,c2#264] parquet + * +- SubqueryAlias t1 + * +- Relation[c1#250,c2#251] parquet + * Plan after the resolveReference rule. 
+ *Intersect + *:- Project [c1#250, c2#251] + *: +- Filter exists#271 [c1#250] + *: : +- Project [1 AS 1#295] + *: : +- Filter (outer(c1#250) = c1#263) + *: :+- SubqueryAlias t2 + *: : +- Relation[c1#263,c2#264] parquet + *: +- SubqueryAlias t1 + *:+- Relation[c1#250,c2#251] parquet + *+- Project [c1#299, c2#300] + *+- Filter exists#272 [c1#299] --- End diff -- @hvanhovell Thanks.. will change. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103363171 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala --- @@ -40,19 +42,179 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression { /** * A base interface for expressions that contain a [[LogicalPlan]]. */ -abstract class SubqueryExpression extends PlanExpression[LogicalPlan] { +abstract class SubqueryExpression( +plan: LogicalPlan, +children: Seq[Expression], +exprId: ExprId) extends PlanExpression[LogicalPlan] { + + override lazy val resolved: Boolean = childrenResolved && plan.resolved + override lazy val references: AttributeSet = +if (plan.resolved) super.references -- plan.outputSet else super.references override def withNewPlan(plan: LogicalPlan): SubqueryExpression + override def semanticEquals(o: Expression): Boolean = o match { +case p: SubqueryExpression => + this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) && +children.length == p.children.length && +children.zip(p.children).forall(p => p._1.semanticEquals(p._2)) +case _ => false + } } object SubqueryExpression { + /** + * Returns true when an expression contains an IN or EXISTS subquery and false otherwise. + */ + def hasInOrExistsSubquery(e: Expression): Boolean = { +e.find { + case _: ListQuery | _: Exists => true + case _ => false +}.isDefined + } + + /** + * Returns true when an expression contains a subquery that has outer reference(s). 
The outer + * reference attributes are kept as children of subquery expression by + * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]] + */ def hasCorrelatedSubquery(e: Expression): Boolean = { e.find { - case e: SubqueryExpression if e.children.nonEmpty => true + case s: SubqueryExpression if s.children.nonEmpty => true case _ => false }.isDefined } } +object SubExprUtils extends PredicateHelper { + /** + * Returns true when an expression contains correlated predicates i.e outer references and + * returns false otherwise. + */ + def containsOuter(e: Expression): Boolean = { +e.find(_.isInstanceOf[OuterReference]).isDefined + } + + /** + * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could + * turn the null-aware predicate into not-null-aware predicate. + */ + def hasNullAwarePredicateWithinNot(e: Expression): Boolean = { +e.find{ x => + x.isInstanceOf[Not] && e.find { --- End diff -- @hvanhovell Actually I copied this function from existing PredicateSubquery.hasNullAwarePredicateWithinNot. I also found it a little odd that we are doing a e.find .. but was afraid to change this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
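The oddity around the inner `e.find` can be made concrete with a toy expression tree. This is only a sketch under stated assumptions: `Expr`, `Leaf`, `And`, `InSubquery`, `wholeTree`, and `underNot` are hypothetical and do not exist in Catalyst, and `find` merely mirrors the pre-order search shape of Catalyst's `TreeNode.find`. Whether the difference matters in Spark depends on how callers pass expressions in, which this sketch does not model.

```scala
object NotInSketch {
  sealed trait Expr {
    def children: Seq[Expr]
    // Pre-order search: test this node first, then the children left to right.
    def find(p: Expr => Boolean): Option[Expr] =
      if (p(this)) Some(this)
      else children.foldLeft(Option.empty[Expr])((acc, c) => acc.orElse(c.find(p)))
  }
  final case class Leaf(name: String) extends Expr { def children: Seq[Expr] = Nil }
  final case class Not(child: Expr) extends Expr { def children: Seq[Expr] = Seq(child) }
  final case class And(l: Expr, r: Expr) extends Expr { def children: Seq[Expr] = Seq(l, r) }
  // Toy marker for "value IN (subquery)".
  final case class InSubquery(value: Expr) extends Expr { def children: Seq[Expr] = Seq(value) }

  private def isIn(e: Expr): Boolean = e.isInstanceOf[InSubquery]

  // Shape quoted in the diff: the inner search starts from the root `e`.
  def wholeTree(e: Expr): Boolean =
    e.find(x => x.isInstanceOf[Not] && e.find(isIn).isDefined).isDefined

  // Alternative shape: the inner search starts from the Not node `x`.
  def underNot(e: Expr): Boolean =
    e.find(x => x.isInstanceOf[Not] && x.find(isIn).isDefined).isDefined

  // NOT and IN are siblings under AND: IN is not inside the NOT.
  val sibling: Expr = And(Not(Leaf("a")), InSubquery(Leaf("b")))
  // IN is nested directly inside the NOT.
  val nested: Expr = Not(InSubquery(Leaf("b")))
}
```

In this toy model the two variants agree on `nested` but disagree on `sibling`: searching from the root reports a match as soon as any `Not` and any IN subquery coexist in the same expression.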
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103362319

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala ---
@@ -123,19 +123,36 @@ case class Not(child: Expression)
 */
 @ExpressionDescription(
 usage = "expr1 _FUNC_(expr2, expr3, ...) - Returns true if `expr` equals to any valN.")
-case class In(value: Expression, list: Seq[Expression]) extends Predicate
-with ImplicitCastInputTypes {
+case class In(value: Expression, list: Seq[Expression]) extends Predicate {
 require(list != null, "list should not be null")
-
- override def inputTypes: Seq[AbstractDataType] = value.dataType +: list.map(_.dataType)
-
 override def checkInputDataTypes(): TypeCheckResult = {
-if (list.exists(l => l.dataType != value.dataType)) {
- TypeCheckResult.TypeCheckFailure(
-"Arguments must be same type")
-} else {
- TypeCheckResult.TypeCheckSuccess
+list match {
+ case ListQuery(sub, _, _) :: Nil =>
--- End diff --

@hvanhovell Previously, checkInputDataTypes(), called from checkAnalysis, never had to deal with **IN subquery expressions**, because they were rewritten to PredicateSubquery before that point. With the change in this PR we now see the original IN subquery expressions, so we need to make sure the types are the same between the LHS and RHS of the IN subquery expression.
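The LHS/RHS comparison described here amounts to a position-by-position type check between the left-hand expressions and the subquery's output columns. A minimal sketch, assuming a toy model: `DataType`, `Column`, and `mismatches` below are hypothetical and only imitate the shape of the check, not Catalyst's classes.

```scala
object InTypeCheckSketch {
  // Toy type universe; not Spark's DataType hierarchy.
  sealed trait DataType
  case object DecimalType extends DataType
  case object TimestampType extends DataType
  case object StringType extends DataType

  final case class Column(name: String, dataType: DataType)

  // Returns the (left, right) pairs whose types differ at the same position.
  // zip truncates to the shorter side, so a column-count check is assumed
  // to have happened separately.
  def mismatches(lhs: Seq[Column], subqueryOutput: Seq[Column]): Seq[(Column, Column)] =
    lhs.zip(subqueryOutput).filter { case (l, r) => l.dataType != r.dataType }
}
```

An empty result means the types line up; a non-empty result carries exactly the pairs one would want to list in an error message.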
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103359052

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala ---
@@ -109,6 +109,26 @@ object TypeCoercion {
 }
 /**
+ * This function determines the target type of a comparison operator when one operand
+ * is a String and the other is not. It also handles when one op is a Date and the
+ * other is a Timestamp by making the target type to be String. Currently this is used
+ * to coerce types between LHS and RHS of the IN expression.
+ */
+ val findCommonTypeForBinaryComparison: (DataType, DataType) => Option[DataType] = {
+case (StringType, DateType) => Some(StringType)
+case (DateType, StringType) => Some(StringType)
+case (StringType, TimestampType) => Some(StringType)
+case (TimestampType, StringType) => Some(StringType)
+case (TimestampType, DateType) => Some(StringType)
--- End diff --

@hvanhovell Here is where I saw the code that handles the promotion between date and timestamp types: https://github.com/dilipbiswal/spark/blob/33844bb8c6e72c35e12a7ea9f124ad3c6b8bd43c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala#L347:L360 Please let me know if I missed something here.
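The pattern used by `findCommonTypeForBinaryComparison`, a function value defined by a case block over a pair of types, can be sketched in isolation. The quoted diff is cut off after the `(TimestampType, DateType)` case, so everything past that point below is an assumption made for illustration: the symmetric `(DateType, TimestampType)` case, the same-type case, and the `None` fallback are not taken from the PR. The type objects are toy stand-ins, not Spark's.

```scala
object CommonTypeSketch {
  // Toy type universe; not Spark's DataType hierarchy.
  sealed trait DataType
  case object StringType extends DataType
  case object DateType extends DataType
  case object TimestampType extends DataType
  case object IntegerType extends DataType

  // A two-argument function value written as a case block, as in the diff.
  val findCommonType: (DataType, DataType) => Option[DataType] = {
    case (StringType, DateType) | (DateType, StringType) => Some(StringType)
    case (StringType, TimestampType) | (TimestampType, StringType) => Some(StringType)
    // Assumption: treated symmetrically, following the doc comment's wording.
    case (TimestampType, DateType) | (DateType, TimestampType) => Some(StringType)
    // Assumption: identical types need no coercion.
    case (l, r) if l == r => Some(l)
    // Assumption: anything else has no common type in this sketch.
    case _ => None
  }
}
```

Returning `Option[DataType]` lets the caller decide what to do when no common type exists, for example reporting a type mismatch instead of inserting casts.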
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103354299 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala --- @@ -40,19 +42,179 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression { /** * A base interface for expressions that contain a [[LogicalPlan]]. */ -abstract class SubqueryExpression extends PlanExpression[LogicalPlan] { +abstract class SubqueryExpression( +plan: LogicalPlan, +children: Seq[Expression], +exprId: ExprId) extends PlanExpression[LogicalPlan] { + + override lazy val resolved: Boolean = childrenResolved && plan.resolved + override lazy val references: AttributeSet = +if (plan.resolved) super.references -- plan.outputSet else super.references override def withNewPlan(plan: LogicalPlan): SubqueryExpression + override def semanticEquals(o: Expression): Boolean = o match { +case p: SubqueryExpression => + this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) && +children.length == p.children.length && +children.zip(p.children).forall(p => p._1.semanticEquals(p._2)) +case _ => false + } } object SubqueryExpression { + /** + * Returns true when an expression contains an IN or EXISTS subquery and false otherwise. + */ + def hasInOrExistsSubquery(e: Expression): Boolean = { +e.find { + case _: ListQuery | _: Exists => true + case _ => false +}.isDefined + } + + /** + * Returns true when an expression contains a subquery that has outer reference(s). 
The outer + * reference attributes are kept as children of subquery expression by + * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]] + */ def hasCorrelatedSubquery(e: Expression): Boolean = { e.find { - case e: SubqueryExpression if e.children.nonEmpty => true + case s: SubqueryExpression if s.children.nonEmpty => true case _ => false }.isDefined } } +object SubExprUtils extends PredicateHelper { + /** + * Returns true when an expression contains correlated predicates i.e outer references and + * returns false otherwise. + */ + def containsOuter(e: Expression): Boolean = { +e.find(_.isInstanceOf[OuterReference]).isDefined + } + + /** + * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could + * turn the null-aware predicate into not-null-aware predicate. + */ + def hasNullAwarePredicateWithinNot(e: Expression): Boolean = { +e.find{ x => + x.isInstanceOf[Not] && e.find { +case In(_, Seq(_: ListQuery)) => true +case _ => false + }.isDefined +}.isDefined + } + + /** + * Returns an expression after removing the OuterReference shell. + */ + def stripOuterReference(e: Expression): Expression = { +e.transform { + case OuterReference(r) => r +} + } + + /** + * Returns the list of expressions after removing the OuterReference shell from each of + * the expression. + */ + def stripOuterReferences(e: Seq[Expression]): Seq[Expression] = e.map(stripOuterReference) + + /** + * Returns the logical plan after removing the OuterReference shell from all the expressions + * of the input logical plan. + */ + def stripOuterReferences(p: LogicalPlan): LogicalPlan = { +p.transformAllExpressions { + case OuterReference(a) => a +} + } + + /** + * Given a list of expressions, returns the expressions which have outer references. Aggregate + * expressions are treated in a special way. If the children of aggregate expression contains an + * outer reference, then the entire aggregate expression is marked as an outer reference. 
+ * Example (SQL): + * {{{ + * SELECT a FROM l GROUP by 1 HAVING EXISTS (SELECT 1 FROM r WHERE d < min(b)) + * }}} + * In the above case, we want to mark the entire min(b) as an outer reference + * OuterReference(min(b)) instead of min(OuterReference(b)). + * TODO: Currently we don't allow deep correlation. Also, we don't allow mixing of + * outer references and local references under an aggregate expression. + * For example (SQL): + * {{{ + * SELECT .. FROM p1 + * WHERE EXISTS (SELECT ... + * FROM p2 + * WHERE EXISTS (SELECT ... + * FROM sq + * WHERE min(p1.a + p2.b) = sq.c)) + * + *
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103353840 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2512,3 +2522,67 @@ object ResolveCreateNamedStruct extends Rule[LogicalPlan] { CreateNamedStruct(children.toList) } } + +/** + * The aggregate expressions from subquery referencing outer query block are pushed + * down to the outer query block for evaluation. This rule below updates such outer references + * as AttributeReference referring attributes from the parent/outer query block. + * + * For example (SQL): + * {{{ + * SELECT l.a FROM l GROUP BY 1 HAVING EXISTS (SELECT 1 FROM r WHERE r.d < min(l.b)) + * }}} + * Plan before the rule. + *Project [a#226] + *+- Filter exists#245 [min(b#227)#249] + * : +- Project [1 AS 1#247] + * : +- Filter (d#238 < min(outer(b#227))) <- + * :+- SubqueryAlias r + * : +- Project [_1#234 AS c#237, _2#235 AS d#238] + * : +- LocalRelation [_1#234, _2#235] + * +- Aggregate [a#226], [a#226, min(b#227) AS min(b#227)#249] + * +- SubqueryAlias l + * +- Project [_1#223 AS a#226, _2#224 AS b#227] + *+- LocalRelation [_1#223, _2#224] + * Plan after the rule. 
+ *Project [a#226] + *+- Filter exists#245 [min(b#227)#249] + * : +- Project [1 AS 1#247] + * : +- Filter (d#238 < outer(min(b#227)#249)) <- + * :+- SubqueryAlias r + * : +- Project [_1#234 AS c#237, _2#235 AS d#238] + * : +- LocalRelation [_1#234, _2#235] + * +- Aggregate [a#226], [a#226, min(b#227) AS min(b#227)#249] + * +- SubqueryAlias l + * +- Project [_1#223 AS a#226, _2#224 AS b#227] + *+- LocalRelation [_1#223, _2#224] + */ +object UpdateOuterReferences extends Rule[LogicalPlan] { + private def stripAlias(expr: Expression): Expression = expr match { case a: Alias => a.child } + + private def updateOuterReferenceInSubquery( + plan: LogicalPlan, + refExprs: Seq[Expression]): LogicalPlan = { +plan transformAllExpressions { case e => + val outerAlias = + refExprs.find(stripAlias(_).semanticEquals(SubExprUtils.stripOuterReference(e))) + outerAlias match { +case Some(a: Alias) => OuterReference(a.toAttribute) +case _ => e + } +} + } + + def apply(plan: LogicalPlan): LogicalPlan = { +plan transform { + case f @ Filter(_, a: Aggregate) if f.resolved => --- End diff -- @hvanhovell At the time of executing this rule, the aggregate expressions from the subquery plan are already pushed down to the Aggregate operator through ResolveAggregateFunctions. Here we are just updating the outer references in the subquery plan. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103347540

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -2332,6 +2337,11 @@ class Analyzer(
 override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveExpressions {
 case e: TimeZoneAwareExpression if e.timeZoneId.isEmpty =>
 e.withTimeZone(conf.sessionLocalTimeZone)
+ // Casts could be added in the subquery plan through the rule TypeCoercion while coercing
+ // the types between the value expression and list query expression of IN expression.
+ // We need to subject the subquery plan through ResolveTimeZone again to setup timezone
+ // information for time zone aware expressions.
+ case e: ListQuery => e.withNewPlan(ResolveTimeZone.apply(e.plan))
--- End diff --

@hvanhovell Thank you. I will change it.
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103346762

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1110,31 +1184,24 @@ class Analyzer(
 }
 /**
- * Pull out all (outer) correlated predicates from a given subquery. This method removes the
- * correlated predicates from subquery [[Filter]]s and adds the references of these predicates
- * to all intermediate [[Project]] and [[Aggregate]] clauses (if they are missing) in order to
- * be able to evaluate the predicates at the top level.
- *
- * This method returns the rewritten subquery and correlated predicates.
+ * Validates to make sure the outer references appearing inside the subquery
+ * are legal. This function also returns the list of expressions
+ * that contain outer references. These outer references would be kept as children
+ * of subquery expressions by the caller of this function.
 */
-private def pullOutCorrelatedPredicates(sub: LogicalPlan): (LogicalPlan, Seq[Expression]) = {
- val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]]
+private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = {
--- End diff --

@hvanhovell The code here validates the correlated references and also collects them, so that the outer plan can be rewritten to record the outer references. Are you suggesting moving the "check" portion to checkAnalysis? I will change it to use foreachUp. Thanks.
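Validating and collecting in a single bottom-up pass, as described in this comment, could look roughly like the sketch below. It is a toy model under stated assumptions: `Node`, its `allowsOuter` flag, and `validateAndCollect` are hypothetical, the traversal only imitates the spirit of Catalyst's `foreachUp`, and the legality rule is a placeholder rather than Spark's actual set of checks.

```scala
import scala.collection.mutable.ArrayBuffer

object OuterRefSketch {
  // Hypothetical toy plan node: `outerRefs` are the outer references used at
  // this node, `allowsOuter` says whether this operator may contain them.
  final case class Node(
      name: String,
      outerRefs: Seq[String],
      allowsOuter: Boolean,
      children: Seq[Node] = Nil) {
    // Bottom-up traversal: children first, then this node.
    def foreachUp(f: Node => Unit): Unit = {
      children.foreach(_.foreachUp(f))
      f(this)
    }
  }

  // Validates and collects in one pass; Left carries the first error found.
  def validateAndCollect(plan: Node): Either[String, Seq[String]] = {
    val refs = ArrayBuffer.empty[String]
    var error: Option[String] = None
    plan.foreachUp { n =>
      if (n.outerRefs.nonEmpty && !n.allowsOuter && error.isEmpty) {
        error = Some(s"outer references are not allowed in ${n.name}")
      }
      refs ++= n.outerRefs
    }
    error.toLeft(refs.toSeq)
  }
}
```

Since the traversal is used only for its side effects (recording references and noting an error), a `foreach`-style walk fits better than a transform that rebuilds the tree.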
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103340921 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala --- @@ -40,19 +42,179 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression { /** * A base interface for expressions that contain a [[LogicalPlan]]. */ -abstract class SubqueryExpression extends PlanExpression[LogicalPlan] { +abstract class SubqueryExpression( +plan: LogicalPlan, +children: Seq[Expression], +exprId: ExprId) extends PlanExpression[LogicalPlan] { + + override lazy val resolved: Boolean = childrenResolved && plan.resolved + override lazy val references: AttributeSet = +if (plan.resolved) super.references -- plan.outputSet else super.references override def withNewPlan(plan: LogicalPlan): SubqueryExpression + override def semanticEquals(o: Expression): Boolean = o match { +case p: SubqueryExpression => + this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) && +children.length == p.children.length && +children.zip(p.children).forall(p => p._1.semanticEquals(p._2)) +case _ => false + } } object SubqueryExpression { + /** + * Returns true when an expression contains an IN or EXISTS subquery and false otherwise. + */ + def hasInOrExistsSubquery(e: Expression): Boolean = { +e.find { + case _: ListQuery | _: Exists => true + case _ => false +}.isDefined + } + + /** + * Returns true when an expression contains a subquery that has outer reference(s). 
The outer + * reference attributes are kept as children of subquery expression by + * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]] + */ def hasCorrelatedSubquery(e: Expression): Boolean = { e.find { - case e: SubqueryExpression if e.children.nonEmpty => true + case s: SubqueryExpression if s.children.nonEmpty => true case _ => false }.isDefined } } +object SubExprUtils extends PredicateHelper { + /** + * Returns true when an expression contains correlated predicates i.e outer references and + * returns false otherwise. + */ + def containsOuter(e: Expression): Boolean = { +e.find(_.isInstanceOf[OuterReference]).isDefined + } + + /** + * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could + * turn the null-aware predicate into not-null-aware predicate. + */ + def hasNullAwarePredicateWithinNot(e: Expression): Boolean = { +e.find{ x => + x.isInstanceOf[Not] && e.find { +case In(_, Seq(_: ListQuery)) => true +case _ => false + }.isDefined +}.isDefined + } + + /** + * Returns an expression after removing the OuterReference shell. + */ + def stripOuterReference(e: Expression): Expression = { +e.transform { + case OuterReference(r) => r +} + } + + /** + * Returns the list of expressions after removing the OuterReference shell from each of + * the expression. + */ + def stripOuterReferences(e: Seq[Expression]): Seq[Expression] = e.map(stripOuterReference) + + /** + * Returns the logical plan after removing the OuterReference shell from all the expressions + * of the input logical plan. + */ + def stripOuterReferences(p: LogicalPlan): LogicalPlan = { +p.transformAllExpressions { + case OuterReference(a) => a +} + } + + /** + * Given a list of expressions, returns the expressions which have outer references. Aggregate + * expressions are treated in a special way. If the children of aggregate expression contains an + * outer reference, then the entire aggregate expression is marked as an outer reference. 
+ * Example (SQL): + * {{{ + * SELECT a FROM l GROUP by 1 HAVING EXISTS (SELECT 1 FROM r WHERE d < min(b)) + * }}} + * In the above case, we want to mark the entire min(b) as an outer reference + * OuterReference(min(b)) instead of min(OuterReference(b)). + * TODO: Currently we don't allow deep correlation. Also, we don't allow mixing of + * outer references and local references under an aggregate expression. + * For example (SQL): + * {{{ + * SELECT .. FROM p1 + * WHERE EXISTS (SELECT ... + * FROM p2 + * WHERE EXISTS (SELECT ... + * FROM sq + * WHERE min(p1.a + p2.b) = sq.c)) + * + *
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103337724 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2512,3 +2522,67 @@ object ResolveCreateNamedStruct extends Rule[LogicalPlan] { CreateNamedStruct(children.toList) } } + +/** + * The aggregate expressions from subquery referencing outer query block are pushed + * down to the outer query block for evaluation. This rule below updates such outer references + * as AttributeReference referring attributes from the parent/outer query block. + * + * For example (SQL): + * {{{ + * SELECT l.a FROM l GROUP BY 1 HAVING EXISTS (SELECT 1 FROM r WHERE r.d < min(l.b)) + * }}} + * Plan before the rule. + *Project [a#226] + *+- Filter exists#245 [min(b#227)#249] + * : +- Project [1 AS 1#247] + * : +- Filter (d#238 < min(outer(b#227))) <- + * :+- SubqueryAlias r + * : +- Project [_1#234 AS c#237, _2#235 AS d#238] + * : +- LocalRelation [_1#234, _2#235] + * +- Aggregate [a#226], [a#226, min(b#227) AS min(b#227)#249] + * +- SubqueryAlias l + * +- Project [_1#223 AS a#226, _2#224 AS b#227] + *+- LocalRelation [_1#223, _2#224] + * Plan after the rule. 
+ *Project [a#226] + *+- Filter exists#245 [min(b#227)#249] + * : +- Project [1 AS 1#247] + * : +- Filter (d#238 < outer(min(b#227)#249)) <- + * :+- SubqueryAlias r + * : +- Project [_1#234 AS c#237, _2#235 AS d#238] + * : +- LocalRelation [_1#234, _2#235] + * +- Aggregate [a#226], [a#226, min(b#227) AS min(b#227)#249] + * +- SubqueryAlias l + * +- Project [_1#223 AS a#226, _2#224 AS b#227] + *+- LocalRelation [_1#223, _2#224] + */ +object UpdateOuterReferences extends Rule[LogicalPlan] { + private def stripAlias(expr: Expression): Expression = expr match { case a: Alias => a.child } + + private def updateOuterReferenceInSubquery( + plan: LogicalPlan, + refExprs: Seq[Expression]): LogicalPlan = { +plan transformAllExpressions { case e => + val outerAlias = + refExprs.find(stripAlias(_).semanticEquals(SubExprUtils.stripOuterReference(e))) + outerAlias match { +case Some(a: Alias) => OuterReference(a.toAttribute) +case _ => e + } +} + } + + def apply(plan: LogicalPlan): LogicalPlan = { +plan transform { + case f @ Filter(_, a: Aggregate) if f.resolved => --- End diff -- This only works with aggregates that are already in the `Aggregate` operator; this seems like a regression. What does Hive do?
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r102165726 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -707,13 +709,85 @@ class Analyzer( } transformUp { case other => other transformExpressions { case a: Attribute => - attributeRewrites.get(a).getOrElse(a).withQualifier(a.qualifier) +dedupAttr(a, attributeRewrites) + case s: SubqueryExpression => +s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites)) } } newRight } } +private def dedupAttr(attr: Attribute, attrMap: AttributeMap[Attribute]): Attribute = { + attrMap.get(attr).getOrElse(attr).withQualifier(attr.qualifier) +} + +/** + * The outer plan may have been de-duplicated and the function below updates the + * outer references to refer to the de-duplicated attributes. + * + * For example (SQL): + * {{{ + * SELECT * FROM t1 + * WHERE EXISTS (SELECT 1 + * FROM t2 + * WHERE t1.c1 = t2.c1) + * INTERSECT + * SELECT * FROM t1 + * WHERE EXISTS (SELECT 1 + * FROM t2 + * WHERE t1.c1 = t2.c1) + * }}} + * Plan before resolveReference rule. + *'Intersect + *:- 'Project [*] + *: +- Filter exists#271 [c1#250] + *: : +- Project [1 AS 1#295] + *: : +- Filter (outer(c1#250) = c1#263) + *: :+- SubqueryAlias t2 + *: : +- Relation[c1#263,c2#264] parquet + *: +- SubqueryAlias t1 + *:+- Relation[c1#250,c2#251] parquet + *+- 'Project [*] + * +- Filter exists#272 [c1#250] + * : +- Project [1 AS 1#298] + * : +- Filter (outer(c1#250) = c1#263) + * :+- SubqueryAlias t2 + * : +- Relation[c1#263,c2#264] parquet + * +- SubqueryAlias t1 + * +- Relation[c1#250,c2#251] parquet + * Plan after the resolveReference rule. 
+ *Intersect + *:- Project [c1#250, c2#251] + *: +- Filter exists#271 [c1#250] + *: : +- Project [1 AS 1#295] + *: : +- Filter (outer(c1#250) = c1#263) + *: :+- SubqueryAlias t2 + *: : +- Relation[c1#263,c2#264] parquet + *: +- SubqueryAlias t1 + *:+- Relation[c1#250,c2#251] parquet + *+- Project [c1#299, c2#300] + *+- Filter exists#272 [c1#299] --- End diff -- Nit: spacing is off here
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r102256790 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2332,6 +2337,11 @@ class Analyzer( override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveExpressions { case e: TimeZoneAwareExpression if e.timeZoneId.isEmpty => e.withTimeZone(conf.sessionLocalTimeZone) + // Casts could be added in the subquery plan through the rule TypeCoercion while coercing + // the types between the value expression and list query expression of IN expression. + // We need to subject the subquery plan through ResolveTimeZone again to setup timezone + // information for time zone aware expressions. + case e: ListQuery => e.withNewPlan(ResolveTimeZone.apply(e.plan)) --- End diff -- Nit: just use `apply`.
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103340692 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala --- @@ -40,19 +42,179 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression { /** * A base interface for expressions that contain a [[LogicalPlan]]. */ -abstract class SubqueryExpression extends PlanExpression[LogicalPlan] { +abstract class SubqueryExpression( +plan: LogicalPlan, +children: Seq[Expression], +exprId: ExprId) extends PlanExpression[LogicalPlan] { + + override lazy val resolved: Boolean = childrenResolved && plan.resolved + override lazy val references: AttributeSet = +if (plan.resolved) super.references -- plan.outputSet else super.references override def withNewPlan(plan: LogicalPlan): SubqueryExpression + override def semanticEquals(o: Expression): Boolean = o match { +case p: SubqueryExpression => + this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) && +children.length == p.children.length && +children.zip(p.children).forall(p => p._1.semanticEquals(p._2)) +case _ => false + } } object SubqueryExpression { + /** + * Returns true when an expression contains an IN or EXISTS subquery and false otherwise. + */ + def hasInOrExistsSubquery(e: Expression): Boolean = { +e.find { + case _: ListQuery | _: Exists => true + case _ => false +}.isDefined + } + + /** + * Returns true when an expression contains a subquery that has outer reference(s). 
The outer + * reference attributes are kept as children of subquery expression by + * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]] + */ def hasCorrelatedSubquery(e: Expression): Boolean = { e.find { - case e: SubqueryExpression if e.children.nonEmpty => true + case s: SubqueryExpression if s.children.nonEmpty => true case _ => false }.isDefined } } +object SubExprUtils extends PredicateHelper { + /** + * Returns true when an expression contains correlated predicates i.e outer references and + * returns false otherwise. + */ + def containsOuter(e: Expression): Boolean = { +e.find(_.isInstanceOf[OuterReference]).isDefined + } + + /** + * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could + * turn the null-aware predicate into not-null-aware predicate. + */ + def hasNullAwarePredicateWithinNot(e: Expression): Boolean = { +e.find{ x => + x.isInstanceOf[Not] && e.find { --- End diff -- I am pretty sure this is wrong. It will also return true for the following expression, where the IN subquery is not inside the NOT: `NOT(a) AND b IN(SELECT q FROM t)`
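The concern above can be reproduced on a toy expression tree. The sketch below is only an illustration: `Expr`, `Attr`, `InSubquery` and both search functions are hypothetical stand-ins, not Catalyst classes, and the "candidate fix" (searching from the matched `Not` node `x` rather than from the root `e`) is an assumption about what was intended, not something confirmed in the thread.

```scala
object NotInCheck {
  // Toy expression tree; illustrative stand-ins, not Catalyst classes.
  sealed trait Expr {
    def children: Seq[Expr]
    // Pre-order search that stops at the first match.
    def find(p: Expr => Boolean): Option[Expr] =
      if (p(this)) Some(this)
      else children.foldLeft(Option.empty[Expr])((acc, c) => acc.orElse(c.find(p)))
  }
  case class Attr(name: String) extends Expr { val children: Seq[Expr] = Nil }
  case class Not(child: Expr) extends Expr { val children: Seq[Expr] = Seq(child) }
  case class And(left: Expr, right: Expr) extends Expr {
    val children: Seq[Expr] = Seq(left, right)
  }
  case class InSubquery(value: Expr) extends Expr { val children: Seq[Expr] = Seq(value) }

  private def isIn(e: Expr): Boolean = e.isInstanceOf[InSubquery]

  // Shape of the check in the diff: the inner search starts from the root `e`.
  def searchFromRoot(e: Expr): Boolean =
    e.find(x => x.isInstanceOf[Not] && e.find(isIn).isDefined).isDefined

  // Candidate fix (assumption): the inner search starts from the matched Not node `x`.
  def searchFromNot(e: Expr): Boolean =
    e.find(x => x.isInstanceOf[Not] && x.find(isIn).isDefined).isDefined

  // NOT(a) AND b IN (subquery): the IN is a sibling of the NOT, not nested in it.
  val sibling: Expr = And(Not(Attr("a")), InSubquery(Attr("b")))
  // NOT(b IN (subquery)): the IN really is nested inside the NOT.
  val nested: Expr = Not(InSubquery(Attr("b")))
}
```

On `sibling`, `searchFromRoot` reports a match even though the IN is outside the NOT, while `searchFromNot` does not; both agree on `nested`.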
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103340272 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala --- @@ -123,19 +123,36 @@ case class Not(child: Expression) */ @ExpressionDescription( usage = "expr1 _FUNC_(expr2, expr3, ...) - Returns true if `expr` equals to any valN.") -case class In(value: Expression, list: Seq[Expression]) extends Predicate -with ImplicitCastInputTypes { +case class In(value: Expression, list: Seq[Expression]) extends Predicate { require(list != null, "list should not be null") - - override def inputTypes: Seq[AbstractDataType] = value.dataType +: list.map(_.dataType) - override def checkInputDataTypes(): TypeCheckResult = { -if (list.exists(l => l.dataType != value.dataType)) { - TypeCheckResult.TypeCheckFailure( -"Arguments must be same type") -} else { - TypeCheckResult.TypeCheckSuccess +list match { + case ListQuery(sub, _, _) :: Nil => --- End diff -- IIUC this is all done to get a better error message, right?
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103339031 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -109,6 +109,26 @@ object TypeCoercion { } /** + * This function determines the target type of a comparison operator when one operand + * is a String and the other is not. It also handles when one op is a Date and the + * other is a Timestamp by making the target type to be String. Currently this is used + * to coerce types between LHS and RHS of the IN expression. + */ + val findCommonTypeForBinaryComparison: (DataType, DataType) => Option[DataType] = { +case (StringType, DateType) => Some(StringType) +case (DateType, StringType) => Some(StringType) +case (StringType, TimestampType) => Some(StringType) +case (TimestampType, StringType) => Some(StringType) +case (TimestampType, DateType) => Some(StringType) --- End diff -- This seems weird. Is this also the current behavior?
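To make the case being questioned concrete, here is a minimal sketch of the same lookup over a toy type hierarchy. The `DataType` objects below are hypothetical stand-ins rather than Spark's types, only the cases visible in the diff are reproduced, and the catch-all `None` is an assumption added so the function is total.

```scala
object CommonTypeSketch {
  // Toy type objects; stand-ins for Spark's DataType hierarchy.
  sealed trait DataType
  case object StringType extends DataType
  case object DateType extends DataType
  case object TimestampType extends DataType
  case object IntegerType extends DataType

  // Mirrors only the cases visible in the diff above.
  val findCommonType: (DataType, DataType) => Option[DataType] = {
    case (StringType, DateType) | (DateType, StringType) => Some(StringType)
    case (StringType, TimestampType) | (TimestampType, StringType) => Some(StringType)
    // The case the reviewer asks about: both sides are temporal, yet the target is String.
    case (TimestampType, DateType) => Some(StringType)
    // Assumption: unlisted pairs have no common type in this sketch.
    case _ => None
  }
}
```

In this sketch `findCommonType(TimestampType, DateType)` yields `Some(StringType)`, which is the behavior the question is about.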
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r102167746 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -1110,31 +1184,24 @@ class Analyzer( } /** - * Pull out all (outer) correlated predicates from a given subquery. This method removes the - * correlated predicates from subquery [[Filter]]s and adds the references of these predicates - * to all intermediate [[Project]] and [[Aggregate]] clauses (if they are missing) in order to - * be able to evaluate the predicates at the top level. - * - * This method returns the rewritten subquery and correlated predicates. + * Validates to make sure the outer references appearing inside the subquery + * are legal. This function also returns the list of expressions + * that contain outer references. These outer references would be kept as children + * of subquery expressions by the caller of this function. */ -private def pullOutCorrelatedPredicates(sub: LogicalPlan): (LogicalPlan, Seq[Expression]) = { - val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]] +private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = { --- End diff -- Also use `foreachUp` instead of `transformUp` for the tree traversal in this method.
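The difference between the two traversals can be sketched on a toy tree. `Node` and its two methods below are hypothetical and only imitate the bottom-up order of the Catalyst methods of the same names; the point is that a validation pass which only inspects nodes does not need to rebuild the tree.

```scala
object TraversalSketch {
  import scala.collection.mutable.ArrayBuffer

  // Toy plan node; a stand-in for a logical plan operator.
  case class Node(name: String, children: Seq[Node] = Nil) {
    // Visit children first, then this node; nothing is rebuilt.
    def foreachUp(f: Node => Unit): Unit = {
      children.foreach(_.foreachUp(f))
      f(this)
    }
    // Rebuild the tree bottom-up, applying `rule` wherever it is defined.
    def transformUp(rule: PartialFunction[Node, Node]): Node = {
      val rebuilt = copy(children = children.map(_.transformUp(rule)))
      rule.applyOrElse(rebuilt, identity[Node])
    }
  }

  val plan: Node = Node("Project", Seq(Node("Filter", Seq(Node("Relation")))))

  // A check-only pass: records the visit order and returns no new tree.
  def visitOrder(root: Node): Seq[String] = {
    val seen = ArrayBuffer.empty[String]
    root.foreachUp(n => seen += n.name)
    seen.toSeq
  }
}
```

Both methods walk the tree leaves-first, but `transformUp` copies every node along the way, which is wasted work when the result is discarded.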
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r103336411 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -1398,42 +1399,46 @@ class Analyzer( } } while (!current.resolved && !current.fastEquals(previous)) - // Step 2: Pull out the predicates if the plan is resolved. + // Step 2: pull the outer references and record them as children of SubqueryExpression if (current.resolved) { // Make sure the resolved query has the required number of output columns. This is only // needed for Scalar and IN subqueries. if (requiredColumns > 0 && requiredColumns != current.output.size) { failAnalysis(s"The number of columns in the subquery (${current.output.size}) " + s"does not match the required number of columns ($requiredColumns)") } -// Pullout predicates and construct a new plan. -f.tupled(rewriteSubQuery(current, plans)) +// Validate the outer reference and record the outer references as children of +// subquery expression. +f.tupled(current, checkAndGetOuterReferences(current)) --- End diff -- NIT: Do we still need `f.tupled`?
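For context on the nit: `tupled` is useful when the arguments arrive already packed in a tuple, as they did when `rewriteSubQuery` returned a pair. A minimal sketch, using a hypothetical two-argument function `f` over strings in place of the real constructor function:

```scala
object TupledSketch {
  // Hypothetical stand-in for the two-argument function passed to the resolver.
  val f: (String, Seq[String]) => String =
    (plan, outerRefs) => s"$plan[${outerRefs.mkString(",")}]"

  // Needed when both arguments arrive packed in a single Tuple2.
  def viaTupled(pair: (String, Seq[String])): String = f.tupled(pair)

  // Sufficient when the arguments are available separately.
  def direct(plan: String, outerRefs: Seq[String]): String = f(plan, outerRefs)
}
```

Once the two arguments are computed separately, as in the new code, a plain call `f(a, b)` gives the same result without the detour through a tuple.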
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r102168672 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -1110,31 +1184,24 @@ class Analyzer( } /** - * Pull out all (outer) correlated predicates from a given subquery. This method removes the - * correlated predicates from subquery [[Filter]]s and adds the references of these predicates - * to all intermediate [[Project]] and [[Aggregate]] clauses (if they are missing) in order to - * be able to evaluate the predicates at the top level. - * - * This method returns the rewritten subquery and correlated predicates. + * Validates to make sure the outer references appearing inside the subquery + * are legal. This function also returns the list of expressions + * that contain outer references. These outer references would be kept as children + * of subquery expressions by the caller of this function. */ -private def pullOutCorrelatedPredicates(sub: LogicalPlan): (LogicalPlan, Seq[Expression]) = { - val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]] +private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = { + val outerReferences = scala.collection.mutable.ArrayBuffer.empty[Seq[Expression]] --- End diff -- Nit: `ArrayBuffer.empty[Seq[Expression]]` should also work. Why is it useful to collect sequences of expressions instead of just expressions?
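The two collection strategies the question contrasts can be sketched as follows. The strings stand in for expressions and `perOperator` is hypothetical sample data; the sketch only shows that a flat buffer filled with `++=` yields the same sequence as a buffer of sequences that is flattened afterwards.

```scala
object CollectSketch {
  import scala.collection.mutable.ArrayBuffer

  // Hypothetical outer references found at each operator, as plain strings.
  val perOperator: Seq[Seq[String]] = Seq(Seq("a", "b"), Seq.empty, Seq("c"))

  // Collect one Seq per operator, then flatten at the end.
  def nested: Seq[String] = {
    val buf = ArrayBuffer.empty[Seq[String]]
    perOperator.foreach(refs => buf += refs)
    buf.flatten.toSeq
  }

  // Collect the expressions directly into a flat buffer.
  def flat: Seq[String] = {
    val buf = ArrayBuffer.empty[String]
    perOperator.foreach(refs => buf ++= refs)
    buf.toSeq
  }
}
```

Unless the per-operator grouping is needed later, the flat variant avoids the extra flattening step.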
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r102168299 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -707,13 +709,85 @@ class Analyzer( } transformUp { case other => other transformExpressions { case a: Attribute => - attributeRewrites.get(a).getOrElse(a).withQualifier(a.qualifier) +dedupAttr(a, attributeRewrites) + case s: SubqueryExpression => +s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites)) } } newRight } } +private def dedupAttr(attr: Attribute, attrMap: AttributeMap[Attribute]): Attribute = { + attrMap.get(attr).getOrElse(attr).withQualifier(attr.qualifier) +} + +/** + * The outer plan may have been de-duplicated and the function below updates the + * outer references to refer to the de-duplicated attributes. + * + * For example (SQL): + * {{{ + * SELECT * FROM t1 + * WHERE EXISTS (SELECT 1 + * FROM t2 + * WHERE t1.c1 = t2.c1) + * INTERSECT + * SELECT * FROM t1 + * WHERE EXISTS (SELECT 1 + * FROM t2 + * WHERE t1.c1 = t2.c1) + * }}} + * Plan before resolveReference rule. --- End diff -- So I am all for decent documentation, but do you think we can find a somewhat smaller example?
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r102167233 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -1110,31 +1184,24 @@ class Analyzer( } /** - * Pull out all (outer) correlated predicates from a given subquery. This method removes the - * correlated predicates from subquery [[Filter]]s and adds the references of these predicates - * to all intermediate [[Project]] and [[Aggregate]] clauses (if they are missing) in order to - * be able to evaluate the predicates at the top level. - * - * This method returns the rewritten subquery and correlated predicates. + * Validates to make sure the outer references appearing inside the subquery + * are legal. This function also returns the list of expressions + * that contain outer references. These outer references would be kept as children + * of subquery expressions by the caller of this function. */ -private def pullOutCorrelatedPredicates(sub: LogicalPlan): (LogicalPlan, Seq[Expression]) = { - val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]] +private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = { --- End diff -- Should we move this into `CheckAnalysis`?
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r102168200 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -1110,31 +1184,24 @@ class Analyzer( } /** - * Pull out all (outer) correlated predicates from a given subquery. This method removes the - * correlated predicates from subquery [[Filter]]s and adds the references of these predicates - * to all intermediate [[Project]] and [[Aggregate]] clauses (if they are missing) in order to - * be able to evaluate the predicates at the top level. - * - * This method returns the rewritten subquery and correlated predicates. + * Validates to make sure the outer references appearing inside the subquery + * are legal. This function also returns the list of expressions + * that contain outer references. These outer references would be kept as children + * of subquery expressions by the caller of this function. */ -private def pullOutCorrelatedPredicates(sub: LogicalPlan): (LogicalPlan, Seq[Expression]) = { - val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]] +private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = { + val outerReferences = scala.collection.mutable.ArrayBuffer.empty[Seq[Expression]] // Make sure a plan's subtree does not contain outer references def failOnOuterReferenceInSubTree(p: LogicalPlan): Unit = { -if (p.collectFirst(predicateMap).nonEmpty) { +if (p.find(SubExprUtils.getOuterReferences(_).nonEmpty).nonEmpty) { --- End diff -- This might be somewhat expensive. We could also use something that terminates early.
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r101618763 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala --- @@ -158,17 +160,7 @@ trait CheckAnalysis extends PredicateHelper { // For projects, do the necessary mapping and skip to its child. def cleanQuery(p: LogicalPlan): LogicalPlan = p match { case s: SubqueryAlias => cleanQuery(s.child) -case p: Project => - // SPARK-18814: Map any aliases to their AttributeReference children - // for the checking in the Aggregate operators below this Project. - subqueryColumns = subqueryColumns.map { -xs => p.projectList.collectFirst { - case e @ Alias(child : AttributeReference, _) if e.exprId == xs.exprId => -child -}.getOrElse(xs) - } - - cleanQuery(p.child) +case p: Project => cleanQuery(p.child) --- End diff -- The de-duplication of the subplan now happens in the optimizer, when we actually pull up the correlated predicates, so the Project case is simplified.
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user nsyca commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r101594561 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala --- @@ -40,19 +42,179 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression { /** * A base interface for expressions that contain a [[LogicalPlan]]. */ -abstract class SubqueryExpression extends PlanExpression[LogicalPlan] { +abstract class SubqueryExpression( +plan: LogicalPlan, +children: Seq[Expression], +exprId: ExprId) extends PlanExpression[LogicalPlan] { + + override lazy val resolved: Boolean = childrenResolved && plan.resolved + override lazy val references: AttributeSet = +if (plan.resolved) super.references -- plan.outputSet else super.references override def withNewPlan(plan: LogicalPlan): SubqueryExpression + override def semanticEquals(o: Expression): Boolean = o match { +case p: SubqueryExpression => + this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) && +children.length == p.children.length && +children.zip(p.children).forall(p => p._1.semanticEquals(p._2)) +case _ => false + } } object SubqueryExpression { + /** + * Returns true when an expression contains an IN or EXISTS subquery and false otherwise. + */ + def hasInOrExistsSubquery(e: Expression): Boolean = { +e.find { + case _: ListQuery | _: Exists => true + case _ => false +}.isDefined + } + + /** + * Returns true when an expression contains a subquery that has outer reference(s). 
The outer + * reference attributes are kept as children of subquery expression by + * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]] + */ def hasCorrelatedSubquery(e: Expression): Boolean = { e.find { - case e: SubqueryExpression if e.children.nonEmpty => true + case s: SubqueryExpression if s.children.nonEmpty => true case _ => false }.isDefined } } +object SubExprUtils extends PredicateHelper { + /** + * Returns true when an expression contains correlated predicates i.e outer references and + * returns false otherwise. + */ + def containsOuter(e: Expression): Boolean = { +e.find(_.isInstanceOf[OuterReference]).isDefined + } + + /** + * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could + * turn the null-aware predicate into not-null-aware predicate. + */ + def hasNullAwarePredicateWithinNot(e: Expression): Boolean = { +e.find{ x => + x.isInstanceOf[Not] && e.find { +case In(_, Seq(_: ListQuery)) => true +case _ => false + }.isDefined +}.isDefined + } + + /** + * Returns an expression after removing the OuterReference shell. + */ --- End diff -- The following three definitions of `stripOuterReference(s)` are taken from a segment of code embedded in `ResolveSubquery.rewriteSubQuery` (`Analyzer.scala`).
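As a rough illustration of what "removing the OuterReference shell" means, here is a minimal sketch on a toy expression tree. `Expr`, `Attr`, `OuterReference`, `EqualTo` and `StripOuterSketch` below are hypothetical stand-ins defined only for this example; they are not the Catalyst classes, and the plan-level variant of the helper is left out.

```scala
// Toy expression tree; hypothetical stand-ins, NOT the real Catalyst classes.
sealed trait Expr
final case class Attr(name: String) extends Expr
final case class OuterReference(e: Expr) extends Expr
final case class EqualTo(left: Expr, right: Expr) extends Expr

object StripOuterSketch {
  /** Returns the expression with every OuterReference wrapper removed. */
  def stripOuterReference(e: Expr): Expr = e match {
    case OuterReference(inner) => stripOuterReference(inner) // unwrap the shell
    case EqualTo(l, r) => EqualTo(stripOuterReference(l), stripOuterReference(r))
    case a: Attr => a // leaves are returned unchanged
  }

  /** Sequence variant: strips the wrapper from each expression in turn. */
  def stripOuterReferences(es: Seq[Expr]): Seq[Expr] = es.map(stripOuterReference)
}
```

For example, `StripOuterSketch.stripOuterReference(EqualTo(OuterReference(Attr("a")), Attr("b")))` yields `EqualTo(Attr("a"), Attr("b"))`.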
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user nsyca commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r101596895 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala --- @@ -83,29 +95,150 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { } /** - * Given a predicate expression and an input plan, it rewrites - * any embedded existential sub-query into an existential join. - * It returns the rewritten expression together with the updated plan. - * Currently, it does not support null-aware joins. Embedded NOT IN predicates - * are blocked in the Analyzer. + * Given a predicate expression and an input plan, it rewrites any embedded existential sub-query + * into an existential join. It returns the rewritten expression together with the updated plan. + * Currently, it does not support NOT IN nested inside a NOT expression. This case is blocked in + * the Analyzer. */ private def rewriteExistentialExpr( exprs: Seq[Expression], plan: LogicalPlan): (Option[Expression], LogicalPlan) = { var newPlan = plan val newExprs = exprs.map { e => e transformUp { -case PredicateSubquery(sub, conditions, nullAware, _) => - // TODO: support null-aware join +case Exists(sub, conditions, exprId) => val exists = AttributeReference("exists", BooleanType, nullable = false)() - newPlan = Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And)) + newPlan = Join(newPlan, sub, +ExistenceJoin(exists), conditions.reduceLeftOption(And)) exists -} +case In(e, Seq(l@ ListQuery(sub, conditions, exprId))) => + val exists = AttributeReference("exists", BooleanType, nullable = false)() + val inConditions = getValueExpression(e).zip(sub.output).map(EqualTo.tupled) + newPlan = Join(newPlan, sub, +ExistenceJoin(exists), (inConditions ++ conditions).reduceLeftOption(And)) + exists + } } (newExprs.reduceOption(And), newPlan) } } + /** + * Pull out all (outer) correlated predicates from a given subquery. 
This method removes the + * correlated predicates from subquery [[Filter]]s and adds the references of these predicates + * to all intermediate [[Project]] and [[Aggregate]] clauses (if they are missing) in order to + * be able to evaluate the predicates at the top level. + * + * TODO: Look to merge this rule with RewritePredicateSubquery. + */ +object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper { + /** +* Returns the correlated predicates and a updated plan that removes the outer references. +*/ + private def pullOutCorrelatedPredicates( + sub: LogicalPlan, + outer: Seq[LogicalPlan]): (LogicalPlan, Seq[Expression]) = { +val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]] + +/** Determine which correlated predicate references are missing from this plan. */ --- End diff -- The following definition is moved unchanged from a nested definition of the same name in `ResolveSubquery.pullOutCorrelatedPredicates` (`Analyzer.scala`).
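The `In(e, Seq(ListQuery(...)))` case in the diff above builds its join condition by pairing each left-hand value with the matching subquery output column and AND-ing the result with the correlated conditions. A minimal sketch of just that step, using toy types (`Expr`, `Col`, `EqualTo`, `And` and `InToJoinCondition` are hypothetical names for this example, not the Catalyst classes):

```scala
// Toy stand-ins for Catalyst expressions; NOT the real Spark classes.
sealed trait Expr
final case class Col(name: String) extends Expr
final case class EqualTo(left: Expr, right: Expr) extends Expr
final case class And(left: Expr, right: Expr) extends Expr

object InToJoinCondition {
  /** Builds the join condition for `values IN (subquery)`. */
  def joinCondition(
      values: Seq[Expr],
      subqueryOutput: Seq[Expr],
      correlated: Seq[Expr]): Option[Expr] = {
    // Pair each left-hand value with the matching subquery output column.
    val inConditions = values.zip(subqueryOutput).map { case (l, r) => EqualTo(l, r) }
    // AND everything together; None means there is no condition at all.
    (inConditions ++ correlated).reduceLeftOption[Expr](And(_, _))
  }
}
```

With two value columns and one correlated predicate, the result is a left-nested `And` of the two equalities followed by the correlated predicate; with no inputs the result is `None`, mirroring `reduceLeftOption(And)` in the diff.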
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user nsyca commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r101590145 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala --- @@ -40,19 +42,179 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression { /** * A base interface for expressions that contain a [[LogicalPlan]]. */ -abstract class SubqueryExpression extends PlanExpression[LogicalPlan] { +abstract class SubqueryExpression( +plan: LogicalPlan, +children: Seq[Expression], +exprId: ExprId) extends PlanExpression[LogicalPlan] { + + override lazy val resolved: Boolean = childrenResolved && plan.resolved + override lazy val references: AttributeSet = +if (plan.resolved) super.references -- plan.outputSet else super.references override def withNewPlan(plan: LogicalPlan): SubqueryExpression + override def semanticEquals(o: Expression): Boolean = o match { +case p: SubqueryExpression => + this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) && +children.length == p.children.length && +children.zip(p.children).forall(p => p._1.semanticEquals(p._2)) +case _ => false + } } object SubqueryExpression { + /** + * Returns true when an expression contains an IN or EXISTS subquery and false otherwise. + */ --- End diff -- The following definition is moved from `PredicateSubquery.hasPredicateSubquery`, with `PredicateSubquery` removed from the first `case` pattern.
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user nsyca commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r101590658 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala --- @@ -40,19 +42,179 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression { /** * A base interface for expressions that contain a [[LogicalPlan]]. */ -abstract class SubqueryExpression extends PlanExpression[LogicalPlan] { +abstract class SubqueryExpression( +plan: LogicalPlan, +children: Seq[Expression], +exprId: ExprId) extends PlanExpression[LogicalPlan] { + + override lazy val resolved: Boolean = childrenResolved && plan.resolved + override lazy val references: AttributeSet = +if (plan.resolved) super.references -- plan.outputSet else super.references override def withNewPlan(plan: LogicalPlan): SubqueryExpression + override def semanticEquals(o: Expression): Boolean = o match { +case p: SubqueryExpression => + this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) && +children.length == p.children.length && +children.zip(p.children).forall(p => p._1.semanticEquals(p._2)) +case _ => false + } } object SubqueryExpression { + /** + * Returns true when an expression contains an IN or EXISTS subquery and false otherwise. + */ + def hasInOrExistsSubquery(e: Expression): Boolean = { +e.find { + case _: ListQuery | _: Exists => true + case _ => false +}.isDefined + } + + /** + * Returns true when an expression contains a subquery that has outer reference(s). 
The outer + * reference attributes are kept as children of subquery expression by + * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]] + */ def hasCorrelatedSubquery(e: Expression): Boolean = { e.find { - case e: SubqueryExpression if e.children.nonEmpty => true + case s: SubqueryExpression if s.children.nonEmpty => true case _ => false }.isDefined } } +object SubExprUtils extends PredicateHelper { + /** + * Returns true when an expression contains correlated predicates i.e outer references and + * returns false otherwise. + */ + def containsOuter(e: Expression): Boolean = { +e.find(_.isInstanceOf[OuterReference]).isDefined + } + + /** + * Returns whether there are any null-aware predicate subqueries inside Not. If not, we could + * turn the null-aware predicate into not-null-aware predicate. + */ --- End diff -- The following definition is moved from `PredicateSubquery.hasNullAwarePredicateWithinNot`, with a code change to reflect the new representation of the IN subquery.
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user nsyca commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r101593305 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala --- @@ -40,19 +42,179 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression { /** * A base interface for expressions that contain a [[LogicalPlan]]. */ -abstract class SubqueryExpression extends PlanExpression[LogicalPlan] { +abstract class SubqueryExpression( +plan: LogicalPlan, +children: Seq[Expression], +exprId: ExprId) extends PlanExpression[LogicalPlan] { + + override lazy val resolved: Boolean = childrenResolved && plan.resolved + override lazy val references: AttributeSet = +if (plan.resolved) super.references -- plan.outputSet else super.references override def withNewPlan(plan: LogicalPlan): SubqueryExpression + override def semanticEquals(o: Expression): Boolean = o match { +case p: SubqueryExpression => + this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) && +children.length == p.children.length && +children.zip(p.children).forall(p => p._1.semanticEquals(p._2)) +case _ => false + } } object SubqueryExpression { + /** + * Returns true when an expression contains an IN or EXISTS subquery and false otherwise. + */ + def hasInOrExistsSubquery(e: Expression): Boolean = { +e.find { + case _: ListQuery | _: Exists => true + case _ => false +}.isDefined + } + + /** + * Returns true when an expression contains a subquery that has outer reference(s). 
The outer + * reference attributes are kept as children of subquery expression by + * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]] + */ def hasCorrelatedSubquery(e: Expression): Boolean = { e.find { - case e: SubqueryExpression if e.children.nonEmpty => true + case s: SubqueryExpression if s.children.nonEmpty => true case _ => false }.isDefined } } +object SubExprUtils extends PredicateHelper { + /** + * Returns true when an expression contains correlated predicates i.e outer references and + * returns false otherwise. + */ --- End diff -- The following definition is moved from a nested definition of the same name in `ResolveSubquery.pullOutCorrelatedPredicates` in `Analyzer.scala`.
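To make the `containsOuter` check concrete, here is a small sketch of a pre-order `find` over a toy expression tree, followed by the same `isInstanceOf[OuterReference]` test used in the diff. The types and the `find` helper below are hypothetical and defined only for this example; Catalyst's own `Expression.find` is not reproduced here.

```scala
// Toy expression tree; hypothetical stand-ins, NOT the real Catalyst classes.
sealed trait Expr { def children: Seq[Expr] }
final case class Attr(name: String) extends Expr {
  def children: Seq[Expr] = Nil
}
final case class OuterReference(e: Expr) extends Expr {
  def children: Seq[Expr] = Seq(e)
}
final case class EqualTo(left: Expr, right: Expr) extends Expr {
  def children: Seq[Expr] = Seq(left, right)
}

object ContainsOuterSketch {
  /** Pre-order search: returns the first node (self first) that satisfies `p`. */
  def find(e: Expr)(p: Expr => Boolean): Option[Expr] =
    if (p(e)) Some(e)
    else e.children.iterator.map(c => find(c)(p)).collectFirst { case Some(hit) => hit }

  /** True when the expression tree contains at least one OuterReference. */
  def containsOuter(e: Expr): Boolean =
    find(e)(_.isInstanceOf[OuterReference]).isDefined
}
```

Under these definitions, `containsOuter(EqualTo(OuterReference(Attr("a")), Attr("b")))` is true and `containsOuter(EqualTo(Attr("a"), Attr("b")))` is false.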
[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/16954#discussion_r101598818 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -365,17 +385,66 @@ object TypeCoercion { } /** - * Convert the value and in list expressions to the common operator type - * by looking at all the argument types and finding the closest one that - * all the arguments can be cast to. When no common operator type is found - * the original expression will be returned and an Analysis Exception will - * be raised at type checking phase. + * Handles type coercion for both IN expression with subquery and IN + * expressions without subquery. + * 1. In the first case, find the common type by comparing the left hand side + *expression types against corresponding right hand side expression derived + *from the subquery expression's plan output. Inject appropriate casts in the + *LHS and RHS side of IN expression. + * + * 2. In the second case, convert the value and in list expressions to the + *common operator type by looking at all the argument types and finding + *the closest one that all the arguments can be cast to. When no common + *operator type is found the original expression will be returned and an + *Analysis Exception will be raised at the type checking phase. */ object InConversion extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions { // Skip nodes who's children have not been resolved yet. case e if !e.childrenResolved => e + // Handle type casting required between value expression and subquery output + // in IN subquery. + case i @ In(a, Seq(ListQuery(sub, children, exprId))) if !i.resolved => --- End diff -- A plan example is available in section 3 of [Code Changes](https://docs.google.com/document/d/18mqjhL9V1An-tNta7aVE13HkALRZ5GZ24AATA-Vqqf0/edit#).
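The first case described in the doc comment above (pairwise common type between the left-hand side and the subquery output, then casts on both sides) can be sketched as follows. This is only an illustration under simplifying assumptions: the data types, the `numericPrecedence` widening order, and the names `Attr`, `Cast`, `InSubqueryCoercion`, `commonType` and `coerce` are all made up for this example and do not reproduce Spark's actual coercion rules.

```scala
// Toy types; hypothetical stand-ins, NOT Spark's DataType or Expression classes.
sealed trait DataType
case object IntType extends DataType
case object LongType extends DataType
case object DoubleType extends DataType
case object StringType extends DataType

sealed trait Expr { def dataType: DataType }
final case class Attr(name: String, dataType: DataType) extends Expr
final case class Cast(child: Expr, dataType: DataType) extends Expr

object InSubqueryCoercion {
  // Assumed widening order for this sketch only.
  private val numericPrecedence: Seq[DataType] = Seq(IntType, LongType, DoubleType)

  /** Common type of two types: equal types, or the wider of two numeric types. */
  def commonType(a: DataType, b: DataType): Option[DataType] =
    if (a == b) Some(a)
    else {
      val ia = numericPrecedence.indexOf(a)
      val ib = numericPrecedence.indexOf(b)
      if (ia >= 0 && ib >= 0) Some(numericPrecedence(math.max(ia, ib))) else None
    }

  private def castIfNeeded(e: Expr, t: DataType): Expr =
    if (e.dataType == t) e else Cast(e, t)

  /**
   * Casts both sides to the pairwise common types. Returns None when the
   * lengths differ or some pair has no common type, so the caller can leave
   * the expression untouched and let a later type check report the mismatch.
   */
  def coerce(lhs: Seq[Expr], rhs: Seq[Expr]): Option[(Seq[Expr], Seq[Expr])] =
    if (lhs.length != rhs.length) None
    else {
      val types = lhs.zip(rhs).map { case (l, r) => commonType(l.dataType, r.dataType) }
      if (types.exists(_.isEmpty)) None
      else {
        val ts = types.flatten
        Some((
          lhs.zip(ts).map { case (e, t) => castIfNeeded(e, t) },
          rhs.zip(ts).map { case (e, t) => castIfNeeded(e, t) }))
      }
    }
}
```

Returning `None` rather than throwing keeps the sketch in line with the doc comment's second case, where an expression with no common type is returned as-is and the error is raised at the type checking phase.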