asfgit closed pull request #23390: [SPARK-26459][SQL] replace UpdateNullabilityInAttributeReferences with FixNullability
URL: https://github.com/apache/spark/pull/23390
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 2aa0f2117364c..a84bb7653c527 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -197,8 +197,8 @@ class Analyzer(
       PullOutNondeterministic),
     Batch("UDF", Once,
       HandleNullInputsForUDF),
-    Batch("FixNullability", Once,
-      FixNullability),
+    Batch("UpdateNullability", Once,
+      UpdateAttributeNullability),
     Batch("Subquery", Once,
       UpdateOuterReferences),
     Batch("Cleanup", fixedPoint,
@@ -1821,40 +1821,6 @@ class Analyzer(
     }
   }
-  /**
-   * Fixes nullability of Attributes in a resolved LogicalPlan by using the nullability of
-   * corresponding Attributes of its children output Attributes. This step is needed because
-   * users can use a resolved AttributeReference in the Dataset API, and outer joins
-   * can change the nullability of an AttributeReference. Without the fix, a nullable column's
-   * nullable field can actually be set as non-nullable, which causes illegal optimizations
-   * (e.g., NULL propagation) and wrong answers.
-   * See SPARK-13484 and SPARK-13801 for the concrete queries of this case.
-   */
-  object FixNullability extends Rule[LogicalPlan] {
-
-    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
-      case p if !p.resolved => p // Skip unresolved nodes.
-      case p: LogicalPlan if p.resolved =>
-        val childrenOutput = p.children.flatMap(c => c.output).groupBy(_.exprId).flatMap {
-          case (exprId, attributes) =>
-            // If there are multiple Attributes having the same ExprId, we need to resolve
-            // the conflict of the nullable field. We do not really expect this to happen.
-            val nullable = attributes.exists(_.nullable)
-            attributes.map(attr => attr.withNullability(nullable))
-        }.toSeq
-        // Here, we create an AttributeMap that only compares the exprId for the lookup
-        // operation, so we can find the corresponding input attribute's nullability.
-        val attributeMap = AttributeMap[Attribute](childrenOutput.map(attr => attr -> attr))
-        // For an Attribute used by the current LogicalPlan, if it is from its children,
-        // we fix the nullable field by using the nullability setting of the corresponding
-        // output Attribute from the children.
-        p.transformExpressions {
-          case attr: Attribute if attributeMap.contains(attr) =>
-            attr.withNullability(attributeMap(attr).nullable)
-        }
-    }
-  }
-
   /**
    * Extracts [[WindowExpression]]s from the projectList of a [[Project]] operator and
    * aggregateExpressions of an [[Aggregate]] operator and creates individual [[Window]]
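
The outer-join hazard that the removed scaladoc above describes (SPARK-13484, SPARK-13801) is easy to reproduce at the Dataset API level. Below is a minimal sketch; the session setup, object name, and column names are illustrative and not taken from this PR:

import org.apache.spark.sql.SparkSession

object OuterJoinNullabilityDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("nullability-demo").getOrCreate()
    import spark.implicits._

    val left = Seq((1, "a"), (2, "b")).toDF("id", "l")
    val right = Seq((1, "x")).toDF("id", "r")

    // `r` is non-nullable in `right`, but a left outer join fills unmatched
    // rows with NULL, so the joined schema must report it as nullable.
    val joined = left.join(right, Seq("id"), "left_outer")
    joined.schema.foreach(f => println(s"${f.name} nullable=${f.nullable}"))

    // A resolved AttributeReference to `r` captured before the join would
    // still claim non-nullability; the rule re-derives the flag from the
    // children's output so that rewrites such as NULL propagation stay sound.
    spark.stop()
  }
}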
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UpdateAttributeNullability.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UpdateAttributeNullability.scala
new file mode 100644
index 0000000000000..8655decdcf278
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UpdateAttributeNullability.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+/**
+ * Updates nullability of Attributes in a resolved LogicalPlan by using the nullability of
+ * corresponding Attributes of its children output Attributes. This step is needed because
+ * users can use a resolved AttributeReference in the Dataset API, and outer joins
+ * can change the nullability of an AttributeReference. Without this rule, a nullable column's
+ * nullable field can actually be set as non-nullable, which causes illegal optimizations
+ * (e.g., NULL propagation) and wrong answers.
+ * See SPARK-13484 and SPARK-13801 for the concrete queries of this case.
+ *
+ * This rule should be executed again at the end of the optimization phase, as the optimizer
+ * may change some expressions and their nullabilities as well. See SPARK-21351 for more details.
+ */
+object UpdateAttributeNullability extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
+    // Skip unresolved nodes.
+    case p if !p.resolved => p
+    // Skip leaf node, as it has no child and no need to update nullability.
+    case p: LeafNode => p
+    case p: LogicalPlan =>
+      val nullabilities = p.children.flatMap(c => c.output).groupBy(_.exprId).map {
+        // If there are multiple Attributes having the same ExprId, we need to resolve
+        // the conflict of the nullable field. We do not really expect this to happen.
+        case (exprId, attributes) => exprId -> attributes.exists(_.nullable)
+      }
+      // For an Attribute used by the current LogicalPlan, if it is from its children,
+      // we fix the nullable field by using the nullability setting of the corresponding
+      // output Attribute from the children.
+      p.transformExpressions {
+        case attr: Attribute if nullabilities.contains(attr.exprId) =>
+          attr.withNullability(nullabilities(attr.exprId))
+      }
+  }
+}
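
The scaladoc's note about re-running the rule after optimization (SPARK-21351) can also be observed from the public API. A rough sketch, with illustrative names, under the caveat that exact analyzed/optimized nullability can vary across Spark versions:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, col}

object NullabilityAfterSimplificationDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()
    import spark.implicits._

    val df = Seq(1L, 2L).toDF("a")
    // Array indexing is resolved as nullable (the index could be out of range),
    // even though the optimizer can reduce array(a, a + 1)[0] to just `a`.
    val out = df.select(array(col("a"), col("a") + 1L)(0).as("b"))
    println(out.schema("b").nullable) // true at analysis time
    // After SimplifyExtractValueOps rewrites the projection, re-running
    // UpdateAttributeNullability should let the optimized plan report `b`
    // as non-nullable again.
    println(out.queryExecution.optimizedPlan.schema("b").nullable)
    spark.stop()
  }
}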
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d51dc6663d434..d92f7f860b1b8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -179,8 +179,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
       ColumnPruning,
       CollapseProject,
       RemoveNoopOperators) :+
-    Batch("UpdateAttributeReferences", Once,
-      UpdateNullabilityInAttributeReferences) :+
+    Batch("UpdateNullability", Once, UpdateAttributeNullability) :+
     // This batch must be executed after the `RewriteSubquery` batch, which creates joins.
     Batch("NormalizeFloatingNumbers", Once, NormalizeFloatingNumbers)
   }
@@ -1647,18 +1646,3 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
     }
   }
 }
-
-/**
- * Updates nullability in [[AttributeReference]]s if nullability is different between
- * non-leaf plan's expressions and the children output.
- */
-object UpdateNullabilityInAttributeReferences extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-    case p if !p.isInstanceOf[LeafNode] =>
-      val nullabilityMap = AttributeMap(p.children.flatMap(_.output).map { x => x -> x.nullable })
-      p transformExpressions {
-        case ar: AttributeReference if nullabilityMap.contains(ar) =>
-          ar.withNullability(nullabilityMap(ar))
-      }
-  }
-}
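
One subtle behavioral difference is worth noting: the removed rule looked nullability up per attribute through an AttributeMap (so with duplicate ExprIds one entry silently wins), while UpdateAttributeNullability groups the children's output by ExprId and ORs the flags. A tiny self-contained sketch of that merge, using string stand-ins for ExprIds:

// Illustrative only: two child attributes sharing an ExprId but disagreeing
// on nullability; the merged flag is the OR of the two, so nullable wins.
val childFlags = Seq(("id#1", false), ("id#1", true), ("name#2", false))
val merged = childFlags.groupBy(_._1).map { case (exprId, attrs) => exprId -> attrs.exists(_._2) }
assert(merged("id#1") && !merged("name#2"))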
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateNullabilityInAttributeReferencesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateAttributeNullabilityInOptimizerSuite.scala
similarity index 89%
rename from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateNullabilityInAttributeReferencesSuite.scala
rename to sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateAttributeNullabilityInOptimizerSuite.scala
index 09b11f5aba2a0..6d6f799b830f3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateNullabilityInAttributeReferencesSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateAttributeNullabilityInOptimizerSuite.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.catalyst.optimizer

+import org.apache.spark.sql.catalyst.analysis.UpdateAttributeNullability
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions.{CreateArray, GetArrayItem}
@@ -25,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor

-class UpdateNullabilityInAttributeReferencesSuite extends PlanTest {
+class UpdateAttributeNullabilityInOptimizerSuite extends PlanTest {

   object Optimizer extends RuleExecutor[LogicalPlan] {
     val batches =
@@ -36,8 +37,8 @@ class UpdateNullabilityInAttributeReferencesSuite extends PlanTest {
         SimplifyConditionals,
         SimplifyBinaryComparison,
         SimplifyExtractValueOps) ::
-      Batch("UpdateAttributeReferences", Once,
-        UpdateNullabilityInAttributeReferences) :: Nil
+      Batch("UpdateNullability", Once,
+        UpdateAttributeNullability) :: Nil
   }
test("update nullability in AttributeReference") {
@@ -46,7 +47,7 @@ class UpdateNullabilityInAttributeReferencesSuite extends
PlanTest {
// nullable AttributeReference to `b`, because both array indexing and map
lookup are
// nullable expressions. After optimization, the same attribute is now
non-nullable,
// but the AttributeReference is not updated to reflect this. So, we need
to update nullability
- // by the `UpdateNullabilityInAttributeReferences` rule.
+ // by the `UpdateAttributeNullability` rule.
val original = rel
.select(GetArrayItem(CreateArray(Seq('a, 'a + 1L)), 0) as "b")
.groupBy($"b")("1")
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]