peter-toth commented on a change in pull request #28885:
URL: https://github.com/apache/spark/pull/28885#discussion_r443198465



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
##########
@@ -336,12 +336,13 @@ object QueryPlan extends PredicateHelper {
    * `Attribute`, and replace it with `BoundReference` will cause error.
    */
   def normalizeExpressions[T <: Expression](e: T, input: AttributeSeq): T = {
+    type T2 = QueryPlan[_]
     e.transformUp {
-      case s: PlanExpression[QueryPlan[_] @unchecked] =>
+      case s: PlanExpression[T2 @unchecked] =>
         // Normalize the outer references in the subquery plan.
         val normalizedPlan = s.plan.transformAllExpressions {
           case OuterReference(r) => 
OuterReference(QueryPlan.normalizeExpressions(r, input))
-        }
+        }.canonicalized.asInstanceOf[T2]

Review comment:
       As we traverse bottom-up manner across the whole plan, we might run into 
a case when there is a reuse opportunity contains a subquery. If we don't 
canonicalize that subquery then we miss it.
   
   An example is "join key with multiple references on the filtering plan" test 
in `DynamicPartitionPruningSuiteBase where without this line of change only a 
"smaller" subquery reuse happens:
   ```
   == Physical Plan ==
   *(2) Project [id#2729L, a#2730, b#2731]
   +- *(2) BroadcastHashJoin 
[knownfloatingpointnormalized(normalizenanandzero((cast(b#2731 as double) + 
cast(a#2730 as double))))], 
[knownfloatingpointnormalized(normalizenanandzero((cast(y#2734 as double) + 
cast(z#2735 as double))))], Inner, BuildRight
      :- *(2) ColumnarToRow
      :  +- FileScan parquet default.fact[id#2729L,a#2730,b#2731] Batched: 
true, DataFilters: [], Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar...,
 PartitionFilters: [isnotnull(b#2731), isnotnull(a#2730), 
dynamicpruningexpression(knownfloatingpointnormalized(norm..., PushedFilters: 
[], ReadSchema: struct<id:bigint>
      :        +- SubqueryBroadcast dynamicpruning#2742, 0, 
[knownfloatingpointnormalized(normalizenanandzero((cast(y#2734 as double) + 
cast(z#2735 as double))))], [id=#274]
      :           +- BroadcastExchange 
HashedRelationBroadcastMode(List(knownfloatingpointnormalized(normalizenanandzero((cast(input[0,
 string, true] as double) + cast(input[1, string, true] as double)))))), 
[id=#273]
      :              +- *(1) Project [y#2734, z#2735]
      :                 +- *(1) Filter (((isnotnull(x#2733) AND (cast(x#2733 as 
double) = Subquery scalar-subquery#2728, [id=#262])) AND isnotnull(y#2734)) AND 
isnotnull(z#2735))
      :                    :  +- Subquery scalar-subquery#2728, [id=#262]
      :                    :     +- *(2) HashAggregate(keys=[], 
functions=[avg(cast(w#2736 as double))])
      :                    :        +- Exchange SinglePartition, true, [id=#258]
      :                    :           +- *(1) HashAggregate(keys=[], 
functions=[partial_avg(cast(w#2736 as double))])
      :                    :              +- *(1) ColumnarToRow
      :                    :                 +- FileScan parquet 
default.dim[w#2736] Batched: true, DataFilters: [], Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar...,
 PartitionFilters: [], PushedFilters: [], ReadSchema: struct<w:string>
      :                    +- *(1) ColumnarToRow
      :                       +- FileScan parquet 
default.dim[x#2733,y#2734,z#2735] Batched: true, DataFilters: 
[isnotnull(x#2733), isnotnull(y#2734), isnotnull(z#2735)], Format: Parquet, 
Location: 
InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar...,
 PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y), 
IsNotNull(z)], ReadSchema: struct<x:string,y:string,z:string>
      +- BroadcastExchange 
HashedRelationBroadcastMode(List(knownfloatingpointnormalized(normalizenanandzero((cast(input[0,
 string, true] as double) + cast(input[1, string, true] as double)))))), 
[id=#359]
         +- *(1) Project [y#2734, z#2735]
            +- *(1) Filter (((isnotnull(x#2733) AND (cast(x#2733 as double) = 
ReusedSubquery Subquery scalar-subquery#2728, [id=#262])) AND 
isnotnull(y#2734)) AND isnotnull(z#2735))
               :  +- ReusedSubquery Subquery scalar-subquery#2728, [id=#262]
               +- *(1) ColumnarToRow
                  +- FileScan parquet default.dim[x#2733,y#2734,z#2735] 
Batched: true, DataFilters: [isnotnull(x#2733), isnotnull(y#2734), 
isnotnull(z#2735)], Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar...,
 PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y), 
IsNotNull(z)], ReadSchema: struct<x:string,y:string,z:string>
   ```
   but with this line of change we get a "bigger" exchange reuse:
   ```
   *(2) Project [id#2729L, a#2730, b#2731]
   +- *(2) BroadcastHashJoin 
[knownfloatingpointnormalized(normalizenanandzero((cast(b#2731 as double) + 
cast(a#2730 as double))))], 
[knownfloatingpointnormalized(normalizenanandzero((cast(y#2734 as double) + 
cast(z#2735 as double))))], Inner, BuildRight
      :- *(2) ColumnarToRow
      :  +- FileScan parquet default.fact[id#2729L,a#2730,b#2731] Batched: 
true, DataFilters: [], Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar...,
 PartitionFilters: [isnotnull(b#2731), isnotnull(a#2730), 
dynamicpruningexpression(knownfloatingpointnormalized(norm..., PushedFilters: 
[], ReadSchema: struct<id:bigint>
      :        +- SubqueryBroadcast dynamicpruning#2742, 0, 
[knownfloatingpointnormalized(normalizenanandzero((cast(y#2734 as double) + 
cast(z#2735 as double))))], [id=#274]
      :           +- BroadcastExchange 
HashedRelationBroadcastMode(List(knownfloatingpointnormalized(normalizenanandzero((cast(input[0,
 string, true] as double) + cast(input[1, string, true] as double)))))), 
[id=#273]
      :              +- *(1) Project [y#2734, z#2735]
      :                 +- *(1) Filter (((isnotnull(x#2733) AND (cast(x#2733 as 
double) = Subquery scalar-subquery#2728, [id=#262])) AND isnotnull(y#2734)) AND 
isnotnull(z#2735))
      :                    :  +- Subquery scalar-subquery#2728, [id=#262]
      :                    :     +- *(2) HashAggregate(keys=[], 
functions=[avg(cast(w#2736 as double))])
      :                    :        +- Exchange SinglePartition, true, [id=#258]
      :                    :           +- *(1) HashAggregate(keys=[], 
functions=[partial_avg(cast(w#2736 as double))])
      :                    :              +- *(1) ColumnarToRow
      :                    :                 +- FileScan parquet 
default.dim[w#2736] Batched: true, DataFilters: [], Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar...,
 PartitionFilters: [], PushedFilters: [], ReadSchema: struct<w:string>
      :                    +- *(1) ColumnarToRow
      :                       +- FileScan parquet 
default.dim[x#2733,y#2734,z#2735] Batched: true, DataFilters: 
[isnotnull(x#2733), isnotnull(y#2734), isnotnull(z#2735)], Format: Parquet, 
Location: 
InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar...,
 PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y), 
IsNotNull(z)], ReadSchema: struct<x:string,y:string,z:string>
      +- ReusedExchange [y#2734, z#2735], BroadcastExchange 
HashedRelationBroadcastMode(List(knownfloatingpointnormalized(normalizenanandzero((cast(input[0,
 string, true] as double) + cast(input[1, string, true] as double)))))), 
[id=#273]
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to