GitHub user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/7535#discussion_r35230482
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
---
@@ -38,99 +38,105 @@ trait CheckAnalysis {
throw new AnalysisException(msg)
}
- def containsMultipleGenerators(exprs: Seq[Expression]): Boolean = {
+ protected def containsMultipleGenerators(exprs: Seq[Expression]):
Boolean = {
exprs.flatMap(_.collect {
- case e: Generator => true
- }).nonEmpty
+ case e: Generator => e
+ }).length > 1
}
def checkAnalysis(plan: LogicalPlan): Unit = {
// We transform up and order the rules so as to catch the first
possible failure instead
// of the result of cascading resolution failures.
- plan.foreachUp {
-
- case operator: LogicalPlan =>
- operator transformExpressionsUp {
- case a: Attribute if !a.resolved =>
- val from = operator.inputSet.map(_.name).mkString(", ")
- a.failAnalysis(s"cannot resolve '${a.prettyString}' given
input columns $from")
-
- case e: Expression if e.checkInputDataTypes().isFailure =>
- e.checkInputDataTypes() match {
- case TypeCheckResult.TypeCheckFailure(message) =>
- e.failAnalysis(
- s"cannot resolve '${e.prettyString}' due to data type
mismatch: $message")
- }
-
- case c: Cast if !c.resolved =>
- failAnalysis(
- s"invalid cast from ${c.child.dataType.simpleString} to
${c.dataType.simpleString}")
-
- case WindowExpression(UnresolvedWindowFunction(name, _), _) =>
- failAnalysis(
- s"Could not resolve window function '$name'. " +
- "Note that, using window functions currently requires a
HiveContext")
-
- case w @ WindowExpression(windowFunction, windowSpec) if
windowSpec.validate.nonEmpty =>
- // The window spec is not valid.
- val reason = windowSpec.validate.get
- failAnalysis(s"Window specification $windowSpec is not valid
because $reason")
- }
-
- operator match {
- case f: Filter if f.condition.dataType != BooleanType =>
- failAnalysis(
- s"filter expression '${f.condition.prettyString}' " +
- s"of type ${f.condition.dataType.simpleString} is not a
boolean.")
-
- case Aggregate(groupingExprs, aggregateExprs, child) =>
- def checkValidAggregateExpression(expr: Expression): Unit =
expr match {
- case _: AggregateExpression => // OK
- case e: Attribute if
!groupingExprs.exists(_.semanticEquals(e)) =>
- failAnalysis(
- s"expression '${e.prettyString}' is neither present in
the group by, " +
- s"nor is it an aggregate function. " +
- "Add to group by or wrap in first() if you don't care
which value you get.")
- case e if groupingExprs.exists(_.semanticEquals(e)) => // OK
- case e if e.references.isEmpty => // OK
- case e => e.children.foreach(checkValidAggregateExpression)
- }
-
- aggregateExprs.foreach(checkValidAggregateExpression)
-
- case _ => // Fallbacks to the following checks
- }
-
- operator match {
- case o if o.children.nonEmpty && o.missingInput.nonEmpty =>
- val missingAttributes = o.missingInput.mkString(",")
- val input = o.inputSet.mkString(",")
-
- failAnalysis(
- s"resolved attribute(s) $missingAttributes missing from
$input " +
- s"in operator ${operator.simpleString}")
-
- case p @ Project(exprs, _) if containsMultipleGenerators(exprs)
=>
- failAnalysis(
- s"""Only a single table generating function is allowed in a
SELECT clause, found:
- |
${exprs.map(_.prettyString).mkString(",")}""".stripMargin)
-
- // Special handling for cases when self-join introduce duplicate
expression ids.
- case j @ Join(left, right, _, _) if
left.outputSet.intersect(right.outputSet).nonEmpty =>
- val conflictingAttributes =
left.outputSet.intersect(right.outputSet)
- failAnalysis(
- s"""
- |Failure when resolving conflicting references in Join:
- |$plan
- |Conflicting attributes:
${conflictingAttributes.mkString(",")}
- |""".stripMargin)
-
- case o if !o.resolved =>
- failAnalysis(
- s"unresolved operator ${operator.simpleString}")
-
- case _ => // Analysis successful!
- }
+ plan.foreachUp { operator =>
+ operator transformExpressionsUp {
+ case a: Attribute if !a.resolved =>
+ val from = operator.inputSet.map(_.name).mkString(", ")
+ a.failAnalysis(s"cannot resolve '${a.prettyString}' given input
columns $from")
+
+ case e: Expression if e.checkInputDataTypes().isFailure =>
+ e.checkInputDataTypes() match {
+ case TypeCheckResult.TypeCheckFailure(message) =>
+ e.failAnalysis(
+ s"cannot resolve '${e.prettyString}' due to data type
mismatch: $message")
+ }
+
+ case c: Cast if !c.resolved =>
+ failAnalysis(
+ s"invalid cast from ${c.child.dataType.simpleString} to
${c.dataType.simpleString}")
+
+ case WindowExpression(UnresolvedWindowFunction(name, _), _) =>
+ failAnalysis(
+ s"Could not resolve window function '$name'. " +
+ "Note that, using window functions currently requires a
HiveContext")
+
+ case w @ WindowExpression(windowFunction, windowSpec) if
windowSpec.validate.nonEmpty =>
+ // The window spec is not valid.
+ val reason = windowSpec.validate.get
+ failAnalysis(s"Window specification $windowSpec is not valid
because $reason")
+ }
+
+ operator match {
+ case f: Filter if f.condition.dataType != BooleanType =>
+ failAnalysis(
+ s"filter expression '${f.condition.prettyString}' " +
+ s"of type ${f.condition.dataType.simpleString} is not a
boolean.")
+
+ case Aggregate(groupingExprs, aggregateExprs, child) =>
+ def checkValidAggregateExpression(expr: Expression): Unit = expr
match {
+ case _: AggregateExpression => // OK
+ case e: Attribute if
!groupingExprs.exists(_.semanticEquals(e)) =>
+ failAnalysis(
+ s"expression '${e.prettyString}' is neither present in the
group by, " +
+ s"nor is it an aggregate function. " +
+ "Add to group by or wrap in first() if you don't care
which value you get.")
+ case e if groupingExprs.exists(_.semanticEquals(e)) => // OK
+ case e if e.references.isEmpty => // OK
+ case e => e.children.foreach(checkValidAggregateExpression)
+ }
+
+ aggregateExprs.foreach(checkValidAggregateExpression)
+
+ case _ => // Fallbacks to the following checks
+ }
+
+ operator match {
+ case o if o.children.nonEmpty && o.missingInput.nonEmpty =>
+ val missingAttributes = o.missingInput.mkString(",")
+ val input = o.inputSet.mkString(",")
+
+ failAnalysis(
+ s"resolved attribute(s) $missingAttributes missing from $input
" +
+ s"in operator ${operator.simpleString}")
+
+ case p @ Project(exprs, _) if containsMultipleGenerators(exprs) =>
+ failAnalysis(
+ s"""Only a single table generating function is allowed in a
SELECT clause, found:
+ | ${exprs.map(_.prettyString).mkString(",")}""".stripMargin)
+
+ // Special handling for cases when self-join introduce duplicate
expression ids.
+ case j @ Join(left, right, _, _) if
left.outputSet.intersect(right.outputSet).nonEmpty =>
+ val conflictingAttributes =
left.outputSet.intersect(right.outputSet)
+ failAnalysis(
+ s"""
+ |Failure when resolving conflicting references in Join:
+ |$plan
+ |Conflicting attributes:
${conflictingAttributes.mkString(",")}
+ |""".stripMargin)
+
+ case o if !o.resolved =>
+ failAnalysis(
+ s"unresolved operator ${operator.simpleString}")
+
+ case o if o.expressions.exists(!_.deterministic) &&
--- End diff --
Here is the newly added check rule for nondeterministic expressions.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it enabled, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]