asfgit closed pull request #23390: [SPARK-26459][SQL] replace 
UpdateNullabilityInAttributeReferences with FixNullability
URL: https://github.com/apache/spark/pull/23390
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 2aa0f2117364c..a84bb7653c527 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -197,8 +197,8 @@ class Analyzer(
       PullOutNondeterministic),
     Batch("UDF", Once,
       HandleNullInputsForUDF),
-    Batch("FixNullability", Once,
-      FixNullability),
+    Batch("UpdateNullability", Once,
+      UpdateAttributeNullability),
     Batch("Subquery", Once,
       UpdateOuterReferences),
     Batch("Cleanup", fixedPoint,
@@ -1821,40 +1821,6 @@ class Analyzer(
     }
   }
 
-  /**
-   * Fixes nullability of Attributes in a resolved LogicalPlan by using the 
nullability of
-   * corresponding Attributes of its children output Attributes. This step is 
needed because
-   * users can use a resolved AttributeReference in the Dataset API and outer 
joins
-   * can change the nullability of an AttribtueReference. Without the fix, a 
nullable column's
-   * nullable field can be actually set as non-nullable, which cause illegal 
optimization
-   * (e.g., NULL propagation) and wrong answers.
-   * See SPARK-13484 and SPARK-13801 for the concrete queries of this case.
-   */
-  object FixNullability extends Rule[LogicalPlan] {
-
-    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
-      case p if !p.resolved => p // Skip unresolved nodes.
-      case p: LogicalPlan if p.resolved =>
-        val childrenOutput = p.children.flatMap(c => 
c.output).groupBy(_.exprId).flatMap {
-          case (exprId, attributes) =>
-            // If there are multiple Attributes having the same ExprId, we 
need to resolve
-            // the conflict of nullable field. We do not really expect this 
happen.
-            val nullable = attributes.exists(_.nullable)
-            attributes.map(attr => attr.withNullability(nullable))
-        }.toSeq
-        // At here, we create an AttributeMap that only compare the exprId for 
the lookup
-        // operation. So, we can find the corresponding input attribute's 
nullability.
-        val attributeMap = AttributeMap[Attribute](childrenOutput.map(attr => 
attr -> attr))
-        // For an Attribute used by the current LogicalPlan, if it is from its 
children,
-        // we fix the nullable field by using the nullability setting of the 
corresponding
-        // output Attribute from the children.
-        p.transformExpressions {
-          case attr: Attribute if attributeMap.contains(attr) =>
-            attr.withNullability(attributeMap(attr).nullable)
-        }
-    }
-  }
-
   /**
    * Extracts [[WindowExpression]]s from the projectList of a [[Project]] 
operator and
    * aggregateExpressions of an [[Aggregate]] operator and creates individual 
[[Window]]
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UpdateAttributeNullability.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UpdateAttributeNullability.scala
new file mode 100644
index 0000000000000..8655decdcf278
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UpdateAttributeNullability.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+/**
+ * Updates nullability of Attributes in a resolved LogicalPlan by using the nullability of
+ * corresponding Attributes of its children output Attributes. This step is needed because
+ * users can use a resolved AttributeReference in the Dataset API and outer joins
+ * can change the nullability of an AttributeReference. Without this rule, a nullable column's
+ * nullable field can be actually set as non-nullable, which causes illegal optimization
+ * (e.g., NULL propagation) and wrong answers.
+ * See SPARK-13484 and SPARK-13801 for the concrete queries of this case.
+ *
+ * This rule should be executed again at the end of optimization phase, as optimizer may change
+ * some expressions and their nullabilities as well. See SPARK-21351 for more details.
+ */
+object UpdateAttributeNullability extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
+    // Skip unresolved nodes.
+    case p if !p.resolved => p
+    // Skip leaf node, as it has no child and no need to update nullability.
+    case p: LeafNode => p
+    case p: LogicalPlan =>
+      val nullabilities = p.children.flatMap(c => 
c.output).groupBy(_.exprId).map {
+        // If there are multiple Attributes having the same ExprId, we need to resolve
+        // the conflict of nullable field. We do not really expect this to happen.
+        case (exprId, attributes) => exprId -> attributes.exists(_.nullable)
+      }
+      // For an Attribute used by the current LogicalPlan, if it is from its children,
+      // we fix the nullable field by using the nullability setting of the corresponding
+      // output Attribute from the children.
+      p.transformExpressions {
+        case attr: Attribute if nullabilities.contains(attr.exprId) =>
+          attr.withNullability(nullabilities(attr.exprId))
+      }
+  }
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d51dc6663d434..d92f7f860b1b8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -179,8 +179,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
       ColumnPruning,
       CollapseProject,
       RemoveNoopOperators) :+
-    Batch("UpdateAttributeReferences", Once,
-      UpdateNullabilityInAttributeReferences) :+
+    Batch("UpdateNullability", Once, UpdateAttributeNullability) :+
     // This batch must be executed after the `RewriteSubquery` batch, which 
creates joins.
     Batch("NormalizeFloatingNumbers", Once, NormalizeFloatingNumbers)
   }
@@ -1647,18 +1646,3 @@ object RemoveRepetitionFromGroupExpressions extends 
Rule[LogicalPlan] {
       }
   }
 }
-
-/**
- * Updates nullability in [[AttributeReference]]s if nullability is different 
between
- * non-leaf plan's expressions and the children output.
- */
-object UpdateNullabilityInAttributeReferences extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-    case p if !p.isInstanceOf[LeafNode] =>
-      val nullabilityMap = AttributeMap(p.children.flatMap(_.output).map { x 
=> x -> x.nullable })
-      p transformExpressions {
-        case ar: AttributeReference if nullabilityMap.contains(ar) =>
-          ar.withNullability(nullabilityMap(ar))
-      }
-  }
-}
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateNullabilityInAttributeReferencesSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateAttributeNullabilityInOptimizerSuite.scala
similarity index 89%
rename from 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateNullabilityInAttributeReferencesSuite.scala
rename to 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateAttributeNullabilityInOptimizerSuite.scala
index 09b11f5aba2a0..6d6f799b830f3 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateNullabilityInAttributeReferencesSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateAttributeNullabilityInOptimizerSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import org.apache.spark.sql.catalyst.analysis.UpdateAttributeNullability
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions.{CreateArray, GetArrayItem}
@@ -25,7 +26,7 @@ import 
org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 
 
-class UpdateNullabilityInAttributeReferencesSuite extends PlanTest {
+class UpdateAttributeNullabilityInOptimizerSuite extends PlanTest {
 
   object Optimizer extends RuleExecutor[LogicalPlan] {
     val batches =
@@ -36,8 +37,8 @@ class UpdateNullabilityInAttributeReferencesSuite extends 
PlanTest {
           SimplifyConditionals,
           SimplifyBinaryComparison,
           SimplifyExtractValueOps) ::
-      Batch("UpdateAttributeReferences", Once,
-        UpdateNullabilityInAttributeReferences) :: Nil
+      Batch("UpdateNullability", Once,
+        UpdateAttributeNullability) :: Nil
   }
 
   test("update nullability in AttributeReference")  {
@@ -46,7 +47,7 @@ class UpdateNullabilityInAttributeReferencesSuite extends 
PlanTest {
     // nullable AttributeReference to `b`, because both array indexing and map 
lookup are
     // nullable expressions. After optimization, the same attribute is now 
non-nullable,
     // but the AttributeReference is not updated to reflect this. So, we need 
to update nullability
-    // by the `UpdateNullabilityInAttributeReferences` rule.
+    // by the `UpdateAttributeNullability` rule.
     val original = rel
       .select(GetArrayItem(CreateArray(Seq('a, 'a + 1L)), 0) as "b")
       .groupBy($"b")("1")


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to