dtenedor commented on code in PR #39592:
URL: https://github.com/apache/spark/pull/39592#discussion_r1072600184
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
##########
@@ -275,8 +276,37 @@ object LogicalPlanIntegrity {
* Some plan transformers (e.g., `RemoveNoopOperators`) rewrite logical
* plans based on this assumption.
*/
- def checkIfExprIdsAreGloballyUnique(plan: LogicalPlan): Boolean = {
- checkIfSameExprIdNotReused(plan) && hasUniqueExprIdsForOutput(plan)
+ def validateExprIdUniqueness(plan: LogicalPlan): Unit = {
Review Comment:
what do you think about giving each of these validations a name? Then we
could report the name in the error message. One way to easily implement it
could be to push the name onto some global stack that we can reference in
the event of an exception; then we don't have to do any weird exception
wrangling.
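
A minimal plain-Scala sketch of the name-stack idea (all names here are illustrative stand-ins, not Spark APIs):

```scala
// Hypothetical sketch: each named validation records its name before running,
// so a failure handler can report which invariant was being checked without
// any exception wrangling.
object ValidationContext {
  private var names: List[String] = Nil

  // Push `name` for the duration of `body`, popping it afterwards.
  def withName[T](name: String)(body: => T): T = {
    names = name :: names
    try body finally names = names.tail
  }

  // The validation currently in progress, if any.
  def current: Option[String] = names.headOption
}

// A validation helper that names the invariant it enforces.
def validate(ok: Boolean, name: String): Unit =
  ValidationContext.withName(name) {
    if (!ok) {
      throw new IllegalStateException(s"Invariant '$name' was violated")
    }
  }
```

A caller catching the exception (or inspecting `ValidationContext.current`) can then say which named check failed.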
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
##########
@@ -275,8 +276,37 @@ object LogicalPlanIntegrity {
* Some plan transformers (e.g., `RemoveNoopOperators`) rewrite logical
* plans based on this assumption.
*/
- def checkIfExprIdsAreGloballyUnique(plan: LogicalPlan): Boolean = {
- checkIfSameExprIdNotReused(plan) && hasUniqueExprIdsForOutput(plan)
+ def validateExprIdUniqueness(plan: LogicalPlan): Unit = {
+ if (!LogicalPlanIntegrity.checkIfSameExprIdNotReused(plan)) {
+ throw SparkException.internalError("Cannot reuse the exprId in Alias")
+ }
+ if (!LogicalPlanIntegrity.hasUniqueExprIdsForOutput(plan)) {
+ throw SparkException.internalError(
+ "Some Attributes have the same exprId but different data types")
+ }
+ }
+
+ /**
+ * Validate the structural integrity of an optimized plan.
+ * Currently we check after the execution of each rule if a plan:
+ * - is still resolved
+ * - only host special expressions in supported operators
+ * - has globally-unique attribute IDs
+ * - optimized plan have same schema with previous plan.
+ */
+ def validateOptimizedPlan(
+ previousPlan: LogicalPlan,
+ currentPlan: LogicalPlan): Unit = {
+ if (!currentPlan.resolved) {
+ throw SparkException.internalError("The plan becomes unresolved")
+ }
+ if (currentPlan.exists(PlanHelper.specialExpressionsInUnsupportedOperator(_).nonEmpty)) {
+ throw SparkException.internalError("Special expressions are placed in the wrong plan")
Review Comment:
can we put the expression and operator in the error message as well?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
##########
@@ -275,8 +276,37 @@ object LogicalPlanIntegrity {
* Some plan transformers (e.g., `RemoveNoopOperators`) rewrite logical
* plans based on this assumption.
*/
- def checkIfExprIdsAreGloballyUnique(plan: LogicalPlan): Boolean = {
- checkIfSameExprIdNotReused(plan) && hasUniqueExprIdsForOutput(plan)
+ def validateExprIdUniqueness(plan: LogicalPlan): Unit = {
+ if (!LogicalPlanIntegrity.checkIfSameExprIdNotReused(plan)) {
+ throw SparkException.internalError("Cannot reuse the exprId in Alias")
+ }
+ if (!LogicalPlanIntegrity.hasUniqueExprIdsForOutput(plan)) {
+ throw SparkException.internalError(
+ "Some Attributes have the same exprId but different data types")
+ }
+ }
+
+ /**
+ * Validate the structural integrity of an optimized plan.
+ * Currently we check after the execution of each rule if a plan:
+ * - is still resolved
+ * - only host special expressions in supported operators
+ * - has globally-unique attribute IDs
+ * - optimized plan have same schema with previous plan.
+ */
+ def validateOptimizedPlan(
+ previousPlan: LogicalPlan,
+ currentPlan: LogicalPlan): Unit = {
+ if (!currentPlan.resolved) {
+ throw SparkException.internalError("The plan becomes unresolved")
Review Comment:
optional: should we put a debug string of the plan in here? (not sure if we
should or not, it could be quite large)
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala:
##########
@@ -191,10 +194,14 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
val tracker: Option[QueryPlanningTracker] = QueryPlanningTracker.get
val beforeMetrics = RuleExecutor.getCurrentMetrics()
- // Run the structural integrity checker against the initial input
- if (!isPlanIntegral(plan, plan)) {
- throw QueryExecutionErrors.structuralIntegrityOfInputPlanIsBrokenInClassError(
- this.getClass.getName.stripSuffix("$"))
+ // Validate the initial input
+ try {
+ validate(plan, plan)
+ } catch {
+ case e: SparkException if SparkThrowableHelper.isInternalError(e.getErrorClass) =>
+ val ruleExecutorName = this.getClass.getName.stripSuffix("$")
+ throw SparkException.internalError(
+ "The structural integrity of the input plan is broken in " + ruleExecutorName, e)
Review Comment:
optional: maybe instead of reporting that the structural integrity is
broken, we could just mention that "rule XXXXXX generated an invalid plan
because the invariant YYYYYY was violated". This can make it easier to debug,
since we know which rule had the bug and (hopefully) have some indication of
what the bug was before we start digging into a reproduction. Same for L240 below.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
##########
@@ -275,8 +276,37 @@ object LogicalPlanIntegrity {
* Some plan transformers (e.g., `RemoveNoopOperators`) rewrite logical
* plans based on this assumption.
*/
- def checkIfExprIdsAreGloballyUnique(plan: LogicalPlan): Boolean = {
- checkIfSameExprIdNotReused(plan) && hasUniqueExprIdsForOutput(plan)
+ def validateExprIdUniqueness(plan: LogicalPlan): Unit = {
+ if (!LogicalPlanIntegrity.checkIfSameExprIdNotReused(plan)) {
+ throw SparkException.internalError("Cannot reuse the exprId in Alias")
+ }
+ if (!LogicalPlanIntegrity.hasUniqueExprIdsForOutput(plan)) {
+ throw SparkException.internalError(
+ "Some Attributes have the same exprId but different data types")
Review Comment:
```suggestion
      "Multiple attributes have the same expression ID but different data types")
```
also, can this mention the names, data types, and expression IDs of the
attributes to help with future potential debugging?
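
A plain-Scala sketch of what such a message could include (the `Attr` case class here is an illustrative stand-in for Spark's attribute type, not a real API): group the output attributes by expression ID and report any group that carries more than one data type.

```scala
// Hypothetical stand-in for an output attribute: name, expression ID, type.
case class Attr(name: String, exprId: Long, dataType: String)

// Describe every expression ID that is shared by attributes of different
// data types, listing the conflicting names and types for debugging.
def describeConflicts(output: Seq[Attr]): Seq[String] =
  output.groupBy(_.exprId).toSeq.collect {
    case (id, attrs) if attrs.map(_.dataType).distinct.size > 1 =>
      val details = attrs.map(a => s"${a.name}: ${a.dataType}").mkString(", ")
      s"exprId=$id reused with different data types: $details"
  }
```

The resulting strings could then be appended to the internal-error message instead of the generic wording.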
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
##########
@@ -275,8 +276,37 @@ object LogicalPlanIntegrity {
* Some plan transformers (e.g., `RemoveNoopOperators`) rewrite logical
* plans based on this assumption.
*/
- def checkIfExprIdsAreGloballyUnique(plan: LogicalPlan): Boolean = {
- checkIfSameExprIdNotReused(plan) && hasUniqueExprIdsForOutput(plan)
+ def validateExprIdUniqueness(plan: LogicalPlan): Unit = {
+ if (!LogicalPlanIntegrity.checkIfSameExprIdNotReused(plan)) {
+ throw SparkException.internalError("Cannot reuse the exprId in Alias")
+ }
+ if (!LogicalPlanIntegrity.hasUniqueExprIdsForOutput(plan)) {
+ throw SparkException.internalError(
+ "Some Attributes have the same exprId but different data types")
+ }
+ }
+
+ /**
+ * Validate the structural integrity of an optimized plan.
+ * Currently we check after the execution of each rule if a plan:
Review Comment:
```suggestion
   * For example, we can check after the execution of each rule that each plan:
```
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala:
##########
@@ -151,12 +152,14 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
protected val excludedOnceBatches: Set[String] = Set.empty
/**
- * Defines a check function that checks for structural integrity of the plan after the execution
- * of each rule. For example, we can check whether a plan is still resolved after each rule in
- * `Optimizer`, so we can catch rules that return invalid plans. The check function returns
- * `false` if the given plan doesn't pass the structural integrity check.
+ * Defines a validate function that validates the plan after the execution of each rule, to make
+ * sure these rule still keep the structural integrity of the plan. For example, we can check
+ * whether a plan is still resolved after each rule in `Optimizer`, so we can catch rules that
+ * return invalid plans.
Review Comment:
```suggestion
   * return invalid plans. As another example, we can check that no attribute references are dangling in the plan.
```
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
##########
@@ -275,8 +276,37 @@ object LogicalPlanIntegrity {
* Some plan transformers (e.g., `RemoveNoopOperators`) rewrite logical
* plans based on this assumption.
*/
- def checkIfExprIdsAreGloballyUnique(plan: LogicalPlan): Boolean = {
- checkIfSameExprIdNotReused(plan) && hasUniqueExprIdsForOutput(plan)
+ def validateExprIdUniqueness(plan: LogicalPlan): Unit = {
+ if (!LogicalPlanIntegrity.checkIfSameExprIdNotReused(plan)) {
+ throw SparkException.internalError("Cannot reuse the exprId in Alias")
Review Comment:
```suggestion
      throw SparkException.internalError("One or more aliases reuse the same expression ID as previously present in attribute references, which is invalid")
```
also, can this mention the names, data types, and expression IDs of the
aliases and attributes to help with future potential debugging?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
##########
@@ -275,8 +276,37 @@ object LogicalPlanIntegrity {
* Some plan transformers (e.g., `RemoveNoopOperators`) rewrite logical
* plans based on this assumption.
*/
- def checkIfExprIdsAreGloballyUnique(plan: LogicalPlan): Boolean = {
- checkIfSameExprIdNotReused(plan) && hasUniqueExprIdsForOutput(plan)
+ def validateExprIdUniqueness(plan: LogicalPlan): Unit = {
+ if (!LogicalPlanIntegrity.checkIfSameExprIdNotReused(plan)) {
+ throw SparkException.internalError("Cannot reuse the exprId in Alias")
+ }
+ if (!LogicalPlanIntegrity.hasUniqueExprIdsForOutput(plan)) {
+ throw SparkException.internalError(
+ "Some Attributes have the same exprId but different data types")
+ }
+ }
+
+ /**
+ * Validate the structural integrity of an optimized plan.
+ * Currently we check after the execution of each rule if a plan:
+ * - is still resolved
+ * - only host special expressions in supported operators
+ * - has globally-unique attribute IDs
+ * - optimized plan have same schema with previous plan.
+ */
Review Comment:
```
* - has no dangling attribute references
```
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
##########
@@ -275,8 +276,37 @@ object LogicalPlanIntegrity {
* Some plan transformers (e.g., `RemoveNoopOperators`) rewrite logical
* plans based on this assumption.
*/
- def checkIfExprIdsAreGloballyUnique(plan: LogicalPlan): Boolean = {
- checkIfSameExprIdNotReused(plan) && hasUniqueExprIdsForOutput(plan)
+ def validateExprIdUniqueness(plan: LogicalPlan): Unit = {
+ if (!LogicalPlanIntegrity.checkIfSameExprIdNotReused(plan)) {
+ throw SparkException.internalError("Cannot reuse the exprId in Alias")
+ }
+ if (!LogicalPlanIntegrity.hasUniqueExprIdsForOutput(plan)) {
+ throw SparkException.internalError(
+ "Some Attributes have the same exprId but different data types")
+ }
+ }
+
+ /**
+ * Validate the structural integrity of an optimized plan.
+ * Currently we check after the execution of each rule if a plan:
+ * - is still resolved
+ * - only host special expressions in supported operators
+ * - has globally-unique attribute IDs
+ * - optimized plan have same schema with previous plan.
+ */
+ def validateOptimizedPlan(
+ previousPlan: LogicalPlan,
+ currentPlan: LogicalPlan): Unit = {
+ if (!currentPlan.resolved) {
+ throw SparkException.internalError("The plan becomes unresolved")
+ }
+ if (currentPlan.exists(PlanHelper.specialExpressionsInUnsupportedOperator(_).nonEmpty)) {
+ throw SparkException.internalError("Special expressions are placed in the wrong plan")
+ }
+ LogicalPlanIntegrity.validateExprIdUniqueness(currentPlan)
+ if (!DataType.equalsIgnoreNullability(previousPlan.schema, currentPlan.schema)) {
+ throw SparkException.internalError("The plan output schema has changed")
Review Comment:
same, can we put the previous and new schema in the error message?
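
A hedged sketch of what including both schemas might look like (plain Scala; `schemaString` arguments stand in for however the real code would render a schema, e.g. a tree string):

```scala
// Hypothetical message builder: show the schema before and after the rule
// ran, so the failing rule's effect is visible without a reproduction.
def schemaChangedError(previousSchema: String, currentSchema: String): String =
  s"""The plan output schema has changed.
     |Previous schema: $previousSchema
     |Current schema:  $currentSchema""".stripMargin
```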
##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala:
##########
@@ -71,43 +71,45 @@ class RuleExecutorSuite extends SparkFunSuite {
assert(message.contains("Max iterations (10) reached for batch fixedPoint"))
}
- test("structural integrity checker - verify initial input") {
+ test("structural integrity validation - verify initial input") {
object WithSIChecker extends RuleExecutor[Expression] {
- override protected def isPlanIntegral(
+ override protected def validate(
previousPlan: Expression,
- currentPlan: Expression): Boolean = currentPlan match {
- case IntegerLiteral(_) => true
- case _ => false
+ currentPlan: Expression): Unit = currentPlan match {
+ case IntegerLiteral(_) =>
+ case _ => throw SparkException.internalError("not integer")
}
val batches = Batch("once", FixedPoint(1), DecrementLiterals) :: Nil
}
assert(WithSIChecker.execute(Literal(10)) === Literal(9))
- val message = intercept[RuntimeException] {
+ val e = intercept[SparkException] {
// The input is already invalid as determined by WithSIChecker.isPlanIntegral
WithSIChecker.execute(Literal(10.1))
- }.getMessage
- assert(message.contains("The structural integrity of the input plan is broken"))
+ }
+ assert(e.getMessage.contains("The structural integrity of the input plan is broken"))
Review Comment:
I guess there is no reason why we cannot make an error class for this :)
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -304,6 +304,14 @@ object SQLConf {
.stringConf
.createOptional
+ val PLAN_CHANGE_VALIDATION = buildConf("spark.sql.planChangeValidation")
+ .internal()
+ .doc("If true, Spark will validate all the plan changes made by analyzer/optimizer and other " +
+ "catalyst rules, to make sure every rule returns a valid plan")
+ .version("3.4.0")
+ .booleanConf
+ .createWithDefault(false)
Review Comment:
I like the Utils.isTesting coverage we have. Should we continue to use that
and enable the checks in that case? Also we can use this config to allow
explicit opt-in too.
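
A tiny sketch of the suggested gating (plain Scala; the parameters stand in for `Utils.isTesting` and the `PLAN_CHANGE_VALIDATION` conf value):

```scala
// Hypothetical gate: validate plan changes when running under tests,
// OR when the user has explicitly opted in via the config.
def shouldValidate(isTesting: Boolean, confEnabled: Boolean): Boolean =
  isTesting || confEnabled
```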
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]