bersprockets opened a new pull request, #44806:
URL: https://github.com/apache/spark/pull/44806

   ### What changes were proposed in this pull request?
   
   When canonicalizing `output` in `InMemoryRelation`, use `output` itself as 
the schema for determining the ordinals, rather than `cachedPlan.output`.
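   As a rough illustration in plain Scala (a self-contained sketch with a hypothetical `Attr` type standing in for Spark's `Attribute`/`ExprId`, not the actual Catalyst code): canonicalization replaces each attribute's exprId with its ordinal in a reference schema. If the ordinals are looked up in `cachedPlan.output`, whose exprIds may differ from those in `output`, the lookup misses and the attributes are left un-normalized:
   ```scala
   case class Attr(name: String, exprId: Long)

   // Canonicalize by replacing each exprId with its ordinal in `schema`.
   // If `schema` uses different exprIds than `output`, the lookup misses
   // and the attribute keeps its original exprId (incomplete canonicalization).
   def canonicalize(output: Seq[Attr], schema: Seq[Attr]): Seq[Attr] = {
     val ordinal = schema.map(_.exprId).zipWithIndex.toMap
     output.map { a =>
       ordinal.get(a.exprId) match {
         case Some(i) => a.copy(name = "none", exprId = i.toLong) // fully canonical
         case None    => a                                        // left as-is
       }
     }
   }

   val output    = Seq(Attr("c1", 78), Attr("c2", 79)) // exprIds borrowed from the lookup key
   val cachedOut = Seq(Attr("c1", 10), Attr("c2", 11)) // exprIds of cachedPlan.output

   // Before the fix: ordinals looked up in cachedPlan.output -> nothing matches,
   // so the exprIds 78/79 survive canonicalization.
   canonicalize(output, cachedOut)

   // After the fix: ordinals looked up in output itself -> fully canonical.
   canonicalize(output, output)
   ```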
   
   ### Why are the changes needed?
   
   `InMemoryRelation.output` and `InMemoryRelation.cachedPlan.output` don't 
necessarily use the same exprIds. Because of this, `InMemoryRelation` will 
sometimes fail to fully canonicalize, resulting in cases where two semantically 
equivalent `InMemoryRelation` instances appear to be semantically nonequivalent.
   
   Example:
   ```
   create or replace temp view data(c1, c2) as values
   (1, 2),
   (1, 3),
   (3, 7),
   (4, 5);
   
   cache table data;
   
   select c1, (select count(*) from data d1 where d1.c1 = d2.c1), count(c2) 
from data d2 group by all;
   ```
   If plan change validation checking is on (i.e., 
`spark.sql.planChangeValidation=true`), the failure is:
   ```
   [PLAN_VALIDATION_FAILED_RULE_EXECUTOR] The input plan of 
org.apache.spark.sql.internal.BaseSessionStateBuilder$$anon$2 is invalid: 
Aggregate: Aggregate [c1#78, scalar-subquery#77 [c1#78]], [c1#78, 
scalar-subquery#77 [c1#78] AS scalarsubquery(c1)#90L, count(c2#79) AS 
count(c2)#83L]
   ...
   is not a valid aggregate expression: 
[SCALAR_SUBQUERY_IS_IN_GROUP_BY_OR_AGGREGATE_FUNCTION] The correlated scalar 
subquery '"scalarsubquery(c1)"' is neither present in GROUP BY, nor in an 
aggregate function.
   ```
   If plan change validation checking is off, the failure is more mysterious:
   ```
   [INTERNAL_ERROR] Couldn't find count(1)#163L in 
[c1#78,_groupingexpression#149L,count(1)#82L] SQLSTATE: XX000
   org.apache.spark.SparkException: [INTERNAL_ERROR] Couldn't find 
count(1)#163L in [c1#78,_groupingexpression#149L,count(1)#82L] SQLSTATE: XX000
   ```
   If you remove the cache command, the query succeeds.
   
   The above failures happen because the subquery in the aggregate expressions 
and the subquery in the grouping expressions appear semantically nonequivalent: 
the `InMemoryRelation` in one of the subquery plans failed to fully 
canonicalize.
   
   In `CacheManager#useCachedData`, two lookups for the same cached plan may 
create `InMemoryRelation` instances that have different exprIds in `output`. 
That's because the plan fragments used as lookup keys may have been 
deduplicated by `DeduplicateRelations`, and thus have different exprIds in 
their respective output schemas. When `CacheManager#useCachedData` creates an 
`InMemoryRelation` instance, it borrows the output schema of the plan fragment 
used as the lookup key.
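   The mechanism can be sketched as follows (plain Scala with hypothetical, simplified shapes; the real logic lives in `org.apache.spark.sql.execution.CacheManager`):
   ```scala
   case class Attr(name: String, exprId: Long)
   case class InMemoryRelation(output: Seq[Attr])

   // The cached entry's own output uses its own exprIds.
   val cachedOutput = Seq(Attr("c1", 10), Attr("c2", 11))

   // DeduplicateRelations gave the two references to `data` distinct exprIds,
   // so the two lookup keys carry different output schemas.
   val key1 = Seq(Attr("c1", 78), Attr("c2", 79)) // first reference (d1)
   val key2 = Seq(Attr("c1", 90), Attr("c2", 91)) // second reference (d2)

   // useCachedData borrows each key's output schema, so the two
   // InMemoryRelation instances end up with different exprIds even
   // though they wrap the same cache entry.
   val imr1 = InMemoryRelation(output = key1)
   val imr2 = InMemoryRelation(output = key2)
   ```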
   
   The failure to fully canonicalize has other effects. For example, this query 
fails to reuse the exchange:
   ```
   create or replace temp view data(c1, c2) as values
   (1, 2),
   (1, 3),
   (2, 4),
   (3, 7),
   (7, 22);
   
   cache table data;
   
   set spark.sql.autoBroadcastJoinThreshold=-1;
   set spark.sql.adaptive.enabled=false;
   
   select *
   from data l
   join data r
   on l.c1 = r.c1;
   ```
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   New tests.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

