peter-toth commented on PR #38034:
URL: https://github.com/apache/spark/pull/38034#issuecomment-1367218779

   @sigmod,
   
   > do you have more concrete examples in mind for multiTransform?
   
   I gave 3 concrete examples where `multiTransform` could help; a minimal sketch of the idea follows the examples. Each of these PRs contains 2 commits: the first commit is identical to this PR, the second contains the suggested change.
   - In https://github.com/apache/spark/pull/37525 I showed that `AliasAwareOutputPartitioning` is currently incorrect. When it builds the `outputPartitioning` it takes only the last alias into account. E.g. when we have a `SELECT a AS a1, a AS a2 ...` node, its `outputPartitioning` contains the child's partitioning where `a` is replaced with `a2` only. But in this case the correct partitioning should be a `PartitioningCollection` where the child's partitioning expression is duplicated and `a` is replaced with `a1` and `a2` respectively.
   - In https://github.com/apache/spark/pull/38038 we have a very similar issue. When we build `BroadcastHashJoinExec.outputPartitioning`, the `expandOutputPartitioning()` method takes the mapping from stream-side to build-side keys and calculates the final `PartitioningCollection`. This PR is only a refactoring to show that the old `generateExprCombinations` simplifies to a `multiTransform` call.
   - In https://github.com/apache/spark/pull/38035 I use `multiTransform` to speed up the current constraint generation logic. The problem with the current code is that it takes the aliases one by one and applies the mapping to all the current elements of the `allConstraints` buffer (https://github.com/apache/spark/pull/38035/files#diff-fbc6da30b8372e4f9aeb35ccf0d39eb796715d192c7eaeab109376584de0790eL187-L192). Consider a project node with `n` attribute aliases (`a1 AS a1a, a2 AS a2a, ..., an AS ana`) and a child constraint `a1 = 1 || a2 = 2 || ... || an = n`: we end up with 2^n elements in the `allConstraints` buffer (`a1 = 1 || a2 = 2 || ... || an = n`, `a1a = 1 || a2 = 2 || ... || an = n`, `a1 = 1 || a2a = 2 || ... || an = n`, `a1a = 1 || a2a = 2 || ... || an = n`, ...), and later on 2^n - 1 of these are simply dropped because their attributes are not in the output set of the project node.
   The new `multiTransform` generates the only valid constraint: `a1a = 1 || 
a2a = 2 || ... || ana = n`.
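   To make the idea concrete, here is a minimal standalone sketch of what `multiTransform` does, using a toy `Expr` tree and Scala 2.13's `LazyList` in place of the PR's `Stream`; this is not Spark's actual implementation:

   ```scala
   sealed trait Expr
   case class Attr(name: String) extends Expr
   case class Lit(value: Int) extends Expr
   case class Eq(left: Expr, right: Expr) extends Expr

   // The rule maps a node to a Seq of alternatives; the traversal lazily
   // enumerates every combination of the children's alternatives.
   def multiTransform(e: Expr)(rule: PartialFunction[Expr, Seq[Expr]]): LazyList[Expr] =
     rule.lift(e) match {
       case Some(alternatives) => LazyList.from(alternatives)
       case None => e match {
         case Eq(l, r) =>
           for {
             la <- multiTransform(l)(rule)
             ra <- multiTransform(r)(rule)
           } yield Eq(la, ra)
         case leaf => LazyList(leaf)
       }
     }

   // `a` is aliased to both `a1` and `a2`, so `a = 1` expands to two
   // alternatives, each produced only when the caller demands it:
   val alternatives = multiTransform(Eq(Attr("a"), Lit(1))) {
     case Attr("a") => Seq(Attr("a1"), Attr("a2"))
   }
   alternatives.foreach(println) // Eq(Attr(a1),Lit(1)), Eq(Attr(a2),Lit(1))
   ```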
   
   > It seems that we can easily run into exponential alternatives?
   
   That's correct, but that's why `multiTransform` returns a lazy `Stream`: to let the caller decide how many results they need, rather than calculating all the alternatives in advance.
   In the first example the number of elements in the partitioning collection is limited by a new `spark.sql.aliasAwareOutputPartitioning.expandLimit` config. The 2nd example uses the already introduced `spark.sql.execution.broadcastHashJoin.outputPartitioningExpandLimit` config. The 3rd example doesn't limit the number of constraints, as the original code doesn't do that either. (But we could easily introduce a finer-grained limit instead of just being able to enable/disable `spark.sql.constraintPropagation.enabled`.) As the snippet below shows, such a limit maps directly onto the lazy stream.
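   Continuing the toy sketch above, applying such a limit (the value here is hypothetical) is just a `take` on the lazy result, so alternatives beyond the limit are never even computed:

   ```scala
   val expandLimit = 8 // hypothetical stand-in for the expandLimit configs above
   val limited = multiTransform(Eq(Attr("a"), Lit(1))) {
     case Attr("a") => Seq(Attr("a1"), Attr("a2"))
   }.take(expandLimit).toList
   ```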
   
   > I assume you eventually will only choose one from those alternatives?
   
   No, the main point in the above examples is that we need a set of results.
   
   But this new method also gives us the opportunity to limit the number of alternatives by time: e.g. we could spend a certain maximum number of seconds generating and evaluating alternative trees to find the best one...
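   For illustration only (none of the PRs implement this), a time budget on the same lazy stream could look like:

   ```scala
   // Hypothetical: stop demanding further alternatives once the budget is spent.
   val budgetMs = 100L
   val deadline = System.currentTimeMillis() + budgetMs
   val producedInTime = multiTransform(Eq(Attr("a"), Lit(1))) {
     case Attr("a") => Seq(Attr("a1"), Attr("a2"))
   }.takeWhile(_ => System.currentTimeMillis() < deadline).toList
   ```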

