This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6955638  [SPARK-26459][SQL] replace 
UpdateNullabilityInAttributeReferences with FixNullability
6955638 is described below

commit 6955638eae99cbe0a890a50e0c61c17641e7269f
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Thu Jan 10 20:15:25 2019 +0900

    [SPARK-26459][SQL] replace UpdateNullabilityInAttributeReferences with 
FixNullability
    
    ## What changes were proposed in this pull request?
    
    This is a followup of https://github.com/apache/spark/pull/18576
    
    The newly added rule `UpdateNullabilityInAttributeReferences` does the same 
thing the `FixNullability` does, we only need to keep one of them.
    
    This PR removes `UpdateNullabilityInAttributeReferences`, and use 
`FixNullability` to replace it. Also rename it to `UpdateAttributeNullability`
    
    ## How was this patch tested?
    
    existing tests
    
    Closes #23390 from cloud-fan/nullable.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Takeshi Yamamuro <yamam...@apache.org>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 38 +--------------
 .../analysis/UpdateAttributeNullability.scala      | 57 ++++++++++++++++++++++
 .../spark/sql/catalyst/optimizer/Optimizer.scala   | 18 +------
 ...dateAttributeNullabilityInOptimizerSuite.scala} |  9 ++--
 4 files changed, 65 insertions(+), 57 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 2aa0f21..a84bb76 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -197,8 +197,8 @@ class Analyzer(
       PullOutNondeterministic),
     Batch("UDF", Once,
       HandleNullInputsForUDF),
-    Batch("FixNullability", Once,
-      FixNullability),
+    Batch("UpdateNullability", Once,
+      UpdateAttributeNullability),
     Batch("Subquery", Once,
       UpdateOuterReferences),
     Batch("Cleanup", fixedPoint,
@@ -1822,40 +1822,6 @@ class Analyzer(
   }
 
   /**
-   * Fixes nullability of Attributes in a resolved LogicalPlan by using the 
nullability of
-   * corresponding Attributes of its children output Attributes. This step is 
needed because
-   * users can use a resolved AttributeReference in the Dataset API and outer 
joins
-   * can change the nullability of an AttribtueReference. Without the fix, a 
nullable column's
-   * nullable field can be actually set as non-nullable, which cause illegal 
optimization
-   * (e.g., NULL propagation) and wrong answers.
-   * See SPARK-13484 and SPARK-13801 for the concrete queries of this case.
-   */
-  object FixNullability extends Rule[LogicalPlan] {
-
-    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
-      case p if !p.resolved => p // Skip unresolved nodes.
-      case p: LogicalPlan if p.resolved =>
-        val childrenOutput = p.children.flatMap(c => 
c.output).groupBy(_.exprId).flatMap {
-          case (exprId, attributes) =>
-            // If there are multiple Attributes having the same ExprId, we 
need to resolve
-            // the conflict of nullable field. We do not really expect this 
happen.
-            val nullable = attributes.exists(_.nullable)
-            attributes.map(attr => attr.withNullability(nullable))
-        }.toSeq
-        // At here, we create an AttributeMap that only compare the exprId for 
the lookup
-        // operation. So, we can find the corresponding input attribute's 
nullability.
-        val attributeMap = AttributeMap[Attribute](childrenOutput.map(attr => 
attr -> attr))
-        // For an Attribute used by the current LogicalPlan, if it is from its 
children,
-        // we fix the nullable field by using the nullability setting of the 
corresponding
-        // output Attribute from the children.
-        p.transformExpressions {
-          case attr: Attribute if attributeMap.contains(attr) =>
-            attr.withNullability(attributeMap(attr).nullable)
-        }
-    }
-  }
-
-  /**
    * Extracts [[WindowExpression]]s from the projectList of a [[Project]] 
operator and
    * aggregateExpressions of an [[Aggregate]] operator and creates individual 
[[Window]]
    * operators for every distinct [[WindowSpecDefinition]].
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UpdateAttributeNullability.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UpdateAttributeNullability.scala
new file mode 100644
index 0000000..8655dec
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UpdateAttributeNullability.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+/**
+ * Updates nullability of Attributes in a resolved LogicalPlan by using the 
nullability of
+ * corresponding Attributes of its children output Attributes. This step is 
needed because
+ * users can use a resolved AttributeReference in the Dataset API and outer 
joins
+ * can change the nullability of an AttribtueReference. Without this rule, a 
nullable column's
+ * nullable field can be actually set as non-nullable, which cause illegal 
optimization
+ * (e.g., NULL propagation) and wrong answers.
+ * See SPARK-13484 and SPARK-13801 for the concrete queries of this case.
+ *
+ * This rule should be executed again at the end of optimization phase, as 
optimizer may change
+ * some expressions and their nullabilities as well. See SPARK-21351 for more 
details.
+ */
+object UpdateAttributeNullability extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
+    // Skip unresolved nodes.
+    case p if !p.resolved => p
+    // Skip leaf node, as it has no child and no need to update nullability.
+    case p: LeafNode => p
+    case p: LogicalPlan =>
+      val nullabilities = p.children.flatMap(c => 
c.output).groupBy(_.exprId).map {
+        // If there are multiple Attributes having the same ExprId, we need to 
resolve
+        // the conflict of nullable field. We do not really expect this to 
happen.
+        case (exprId, attributes) => exprId -> attributes.exists(_.nullable)
+      }
+      // For an Attribute used by the current LogicalPlan, if it is from its 
children,
+      // we fix the nullable field by using the nullability setting of the 
corresponding
+      // output Attribute from the children.
+      p.transformExpressions {
+        case attr: Attribute if nullabilities.contains(attr.exprId) =>
+          attr.withNullability(nullabilities(attr.exprId))
+      }
+  }
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d51dc66..d92f7f8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -179,8 +179,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
       ColumnPruning,
       CollapseProject,
       RemoveNoopOperators) :+
-    Batch("UpdateAttributeReferences", Once,
-      UpdateNullabilityInAttributeReferences) :+
+    Batch("UpdateNullability", Once, UpdateAttributeNullability) :+
     // This batch must be executed after the `RewriteSubquery` batch, which 
creates joins.
     Batch("NormalizeFloatingNumbers", Once, NormalizeFloatingNumbers)
   }
@@ -1647,18 +1646,3 @@ object RemoveRepetitionFromGroupExpressions extends 
Rule[LogicalPlan] {
       }
   }
 }
-
-/**
- * Updates nullability in [[AttributeReference]]s if nullability is different 
between
- * non-leaf plan's expressions and the children output.
- */
-object UpdateNullabilityInAttributeReferences extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-    case p if !p.isInstanceOf[LeafNode] =>
-      val nullabilityMap = AttributeMap(p.children.flatMap(_.output).map { x 
=> x -> x.nullable })
-      p transformExpressions {
-        case ar: AttributeReference if nullabilityMap.contains(ar) =>
-          ar.withNullability(nullabilityMap(ar))
-      }
-  }
-}
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateNullabilityInAttributeReferencesSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateAttributeNullabilityInOptimizerSuite.scala
similarity index 89%
rename from 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateNullabilityInAttributeReferencesSuite.scala
rename to 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateAttributeNullabilityInOptimizerSuite.scala
index 09b11f5..6d6f799 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateNullabilityInAttributeReferencesSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateAttributeNullabilityInOptimizerSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import org.apache.spark.sql.catalyst.analysis.UpdateAttributeNullability
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions.{CreateArray, GetArrayItem}
@@ -25,7 +26,7 @@ import 
org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 
 
-class UpdateNullabilityInAttributeReferencesSuite extends PlanTest {
+class UpdateAttributeNullabilityInOptimizerSuite extends PlanTest {
 
   object Optimizer extends RuleExecutor[LogicalPlan] {
     val batches =
@@ -36,8 +37,8 @@ class UpdateNullabilityInAttributeReferencesSuite extends 
PlanTest {
           SimplifyConditionals,
           SimplifyBinaryComparison,
           SimplifyExtractValueOps) ::
-      Batch("UpdateAttributeReferences", Once,
-        UpdateNullabilityInAttributeReferences) :: Nil
+      Batch("UpdateNullability", Once,
+        UpdateAttributeNullability) :: Nil
   }
 
   test("update nullability in AttributeReference")  {
@@ -46,7 +47,7 @@ class UpdateNullabilityInAttributeReferencesSuite extends 
PlanTest {
     // nullable AttributeReference to `b`, because both array indexing and map 
lookup are
     // nullable expressions. After optimization, the same attribute is now 
non-nullable,
     // but the AttributeReference is not updated to reflect this. So, we need 
to update nullability
-    // by the `UpdateNullabilityInAttributeReferences` rule.
+    // by the `UpdateAttributeNullability` rule.
     val original = rel
       .select(GetArrayItem(CreateArray(Seq('a, 'a + 1L)), 0) as "b")
       .groupBy($"b")("1")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to