averyqi-db commented on PR #48127:
URL: https://github.com/apache/spark/pull/48127#issuecomment-2359584452

   > > I don't quite agree with this. The underlying catalog/data source may be 
case-sensitive and It's important to keep the query schema(column names) 
unchanged
   > 
   > Yeah, I think that's a fair point. Don't have the full context here, I 
assume the intention of this PR is to avoid certain "false-alarms" raised by 
this validation, but if `AggregatePushdownThroughJoins` changes the column from 
`ppmonth` to `Ppmonth` with `spark.sql.caseSensitive` set to true, then we'd 
have a true alarm.
   > 
   > Maybe I'm missing something, maybe it'd be better to just fix 
`AggregatePushdownThroughJoins` to avoid changing the column names regardless 
of the setting of `spark.sql.caseSensitive`. cc @averyqi-db
   
   Sorry I should state more clearly about the context.
   
   So here's the context, users have this query running under 
`spark.sql.caseSensitive` = false configuration.
   ```
   SELECT * FROM (
             |    Select a.ppmonth,
             |    a.ppweek,
             |    case when a.retsubcategoryderived <= 1 then 'XXXXXXXXXXXXX'
             |    else
             |    'XXXXXX'
             |    end as mappedflag,
             |    b.name as subcategory_name,
             |    sum(a.totalvalue) as RDOLLARS
             |    from a, b
             |    where a.retsubcategoryderived = b.retsubcategoryderived
             |    group by a.Ppmonth,a.ppweek,a.retsubcategoryderived,b.name, 
mappedflag)
   ```
   Though they have stated ppmonth in two different ways, ppmonth and Ppmonth, 
as they are running under case insensitive mode, they expect this query to run 
successfully.
   
   However, they got error message saying that: 
`org.apache.spark.SparkException: [PLAN_VALIDATION_FAILED_RULE_IN_BATCH] Rule 
com.databricks.sql.optimizer.AggregatePushdownThroughJoins in batch 
AggregatePushdownThroughJoins generated an invalid plan: The plan output schema 
has changed`
   
   And this error message is because AggregatePushDownThroughJoins are trying 
to push down aggregation through join operator and change the plan from:
   ```
   Aggregate [Ppmonth#3L, ppweek#4L, retsubcategoryderived#7L, name#13, 
_groupingexpression#29], [ppmonth#3L, ppweek#4L, _groupingexpression#29 AS 
mappedflag#0, name#13 AS subcategory_name#1, sum(totalvalue#9L) AS RDOLLARS#2L]
   +- Project [ppmonth#3L, ppweek#4L, retsubcategoryderived#7L, totalvalue#9L, 
name#13, CASE WHEN (retsubcategoryderived#7L <= 1) THEN XXXXXXXXXXXXX ELSE 
XXXXXX END AS _groupingexpression#29]
      +- Join Inner, (retsubcategoryderived#7L = retsubcategoryderived#10L)
         :- Project [ppmonth#3L, ppweek#4L, retsubcategoryderived#7L, 
totalvalue#9L]
         :  +- Filter isnotnull(retsubcategoryderived#7L)
         :     +- Relation 
spark_catalog.default.a[ppmonth#3L,ppweek#4L,retcategorygroupderived#5L,rethidsubcategoryderived#6L,retsubcategoryderived#7L,retsupercategoryderived#8L,totalvalue#9L]
 parquet
         +- Project [retsubcategoryderived#10L, name#13]
            +- Filter isnotnull(retsubcategoryderived#10L)
               +- Relation 
spark_catalog.default.b[retsubcategoryderived#10L,description#11,displayorder#12L,name#13,shortname#14,startrange#15,endrange#16,retcategoryderived#17L,retcategorygroupderived#18L,retsupercategoryderived#19L,altbusiness#20L]
 parquet
   ```
   To
   ```
   Project [Ppmonth#3L, ppweek#4L, _groupingexpression#29 AS mappedflag#0, 
name#13 AS subcategory_name#1, sum(totalvalue#9L)#23L AS RDOLLARS#2L]
   +- AggregatePart [Ppmonth#3L, ppweek#4L, retsubcategoryderived#7L, name#13, 
_groupingexpression#29], [finalmerge_sum(merge sum#31L) AS 
sum(totalvalue#9L)#23L], true
      +- AggregatePart [Ppmonth#3L, ppweek#4L, retsubcategoryderived#7L, 
name#13, _groupingexpression#29], [merge_sum(merge sum#31L) AS sum#31L], false
         +- Project [Ppmonth#3L, ppweek#4L, retsubcategoryderived#7L, name#13, 
_groupingexpression#29, sum#31L]
            +- Join Inner, (retsubcategoryderived#7L = 
retsubcategoryderived#10L)
               :- AggregatePart [Ppmonth#3L, ppweek#4L, 
retsubcategoryderived#7L, CASE WHEN (retsubcategoryderived#7L <= 1) THEN 
XXXXXXXXXXXXX ELSE XXXXXX END AS _groupingexpression#29], 
[partial_sum(totalvalue#9L) AS sum#31L], false
               :  +- Project [ppmonth#3L, ppweek#4L, retsubcategoryderived#7L, 
totalvalue#9L]
               :     +- Filter isnotnull(retsubcategoryderived#7L)
               :        +- Relation 
spark_catalog.default.a[ppmonth#3L,ppweek#4L,retcategorygroupderived#5L,rethidsubcategoryderived#6L,retsubcategoryderived#7L,retsupercategoryderived#8L,totalvalue#9L]
 parquet
               +- Project [retsubcategoryderived#10L, name#13]
                  +- Filter isnotnull(retsubcategoryderived#10L)
                     +- Relation 
spark_catalog.default.b[retsubcategoryderived#10L,description#11,displayorder#12L,name#13,shortname#14,startrange#15,endrange#16,retcategoryderived#17L,retcategorygroupderived#18L,retsupercategoryderived#19L,altbusiness#20L]
 parquet
   ```
   And there's nothing wrong with `AggregatePushdownThroughJoins` cause all 
optimization is based on the assumption that the child output schema should 
equal to the input schema of the parent operator. And it is hard to distinguish 
between where Ppmonth or ppmonth appear in the optimization rule.
   
   The error appear because we're checking the schema in case sensitive mode 
even the caseSensitivity flag is set to false. And DataType actually provide 
caseSensitiveCheck and caseInsensitiveCheck, we just need to use corresponding 
checking method according to the config in validateSchemaOutput.
   
   I understand there might be risk introducing rules which actually change the 
case of schema, but as it can only pass the check under case insensitive mode, 
it should not raise much concerns as under case insensitive mode, it is 
acceptable IIUC.
   
   Please correct me if my understanding is wrong. For example, there might be 
high risks introduced by this new schema check.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to