peter-toth commented on PR #38034: URL: https://github.com/apache/spark/pull/38034#issuecomment-1367218779
@sigmod,

> do you have more concrete examples in mind for multiTransform?

I gave 3 concrete examples where `multiTransform` could help. All of these PRs contain 2 commits: the first commit is identical to this PR, the second contains the suggested change.

- In https://github.com/apache/spark/pull/37525 I showed that `AliasAwareOutputPartitioning` is currently incorrect. When it builds the `outputPartitioning` it takes only the last alias into account. E.g. when we have a `SELECT a AS a1, a AS a2 ...` node, its `outputPartitioning` contains the child's partitioning where `a` is replaced with `a2` only. But in this case the correct partitioning should be a `PartitioningCollection` where the child's partitioning expression is duplicated and `a` is replaced with `a1` and `a2` respectively.
- In https://github.com/apache/spark/pull/38038 we have a very similar issue. When we build `BroadcastHashJoinExec.outputPartitioning`, the `expandOutputPartitioning()` method takes the mapping from stream-side to build-side keys and calculates the final `PartitioningCollection`. This PR is only a refactor to show that the old `generateExprCombinations` can be simplified to a `multiTransform` call.
- In https://github.com/apache/spark/pull/38035 I use `multiTransform` to speed up the current constraint generation logic. The problem with the current code is that it takes the aliases one by one and applies the mapping on all the current elements of the `allConstraints` buffer (https://github.com/apache/spark/pull/38035/files#diff-fbc6da30b8372e4f9aeb35ccf0d39eb796715d192c7eaeab109376584de0790eL187-L192). Consider a project node with `n` attribute aliases (`a1 AS a1a, a2 AS a2a, ..., an AS ana`) and a child constraint `a1 = 1 || a2 = 2 || ... || an = n`: we end up with 2^n elements in the `allConstraints` buffer (`a1 = 1 || a2 = 2 || ... || an = n`, `a1a = 1 || a2 = 2 || ... || an = n`, `a1 = 1 || a2a = 2 || ... || an = n`, `a1a = 1 || a2a = 2 || ... || an = n`, ...) and later on 2^n - 1 of these are simply dropped because their attributes are not in the output set of the project node. The new `multiTransform` generates the only valid constraint: `a1a = 1 || a2a = 2 || ... || ana = n`.

> It seems that we can easily run into exponential alternatives?

That's correct, but that's why `multiTransform` returns a lazy `Stream`: it lets the caller decide how many results they need, without calculating the alternatives in advance. In the first example the number of elements in the partitioning collection is limited by a new `spark.sql.aliasAwareOutputPartitioning.expandLimit` config. The 2nd example uses the already introduced `spark.sql.execution.broadcastHashJoin.outputPartitioningExpandLimit` config. The 3rd example doesn't limit the number of constraints, as the original code doesn't do that either. (But we could easily introduce a finer grained limit instead of just being able to enable/disable `spark.sql.constraintPropagation.enabled`.)

> I assume you eventually will only choose one from those alternatives?

No, the main point in the above examples is that we need a set of results. But this new method also gives the opportunity to limit the number of alternatives returned by time. E.g. we could spend a certain max number of seconds generating and evaluating alternative trees to find the best one...
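To make the mechanics concrete, here is a minimal, self-contained sketch of the `multiTransform` idea on a toy expression tree. This is not Spark's actual `TreeNode` API: the `Expr`/`Attr`/`Eq`/`Or` types and this `multiTransform` helper are illustrative stand-ins under the assumption that the rule maps a matched node to a `Seq` of alternatives and unmatched nodes are rebuilt from the cartesian product of their children's alternatives.

```scala
// Toy expression tree; these types are illustrative, not Spark's.
sealed trait Expr
case class Attr(name: String) extends Expr
case class Eq(attr: Expr, value: Int) extends Expr  // attr = value
case class Or(left: Expr, right: Expr) extends Expr // left || right

object MultiTransformSketch {
  // Lazily enumerate every alternative tree: `rule` maps a matched node to
  // its alternatives; unmatched nodes are rebuilt from all combinations of
  // their children's alternatives.
  def multiTransform(e: Expr)(rule: PartialFunction[Expr, Seq[Expr]]): LazyList[Expr] =
    if (rule.isDefinedAt(e)) rule(e).to(LazyList)
    else e match {
      case Or(l, r) =>
        for {
          l2 <- multiTransform(l)(rule)
          r2 <- multiTransform(r)(rule)
        } yield Or(l2, r2)
      case Eq(a, v) => multiTransform(a)(rule).map(Eq(_, v))
      case leaf     => LazyList(leaf)
    }

  def main(args: Array[String]): Unit = {
    // `SELECT a AS a1, a AS a2`: one source attribute, two alternatives,
    // so two alternative trees come back (a1 = 1 and a2 = 1).
    val partitionings = multiTransform(Eq(Attr("a"), 1)) {
      case Attr("a") => Seq(Attr("a1"), Attr("a2"))
    }
    println(partitionings.toList)

    // Constraint rewrite: every alias maps to exactly one output attribute,
    // so only the single valid constraint is produced.
    val constraints = multiTransform(Or(Eq(Attr("a1"), 1), Eq(Attr("a2"), 2))) {
      case Attr("a1") => Seq(Attr("a1a"))
      case Attr("a2") => Seq(Attr("a2a"))
    }
    println(constraints.toList)
  }
}
```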
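For contrast, the one-by-one alias substitution described in the third example can be sketched as follows. This is a toy string-based model, not the actual constraint-propagation code: each alias pass doubles the buffer, so `n` aliases over one constraint yield 2^n entries, of which all but one are later dropped.

```scala
object OneByOneSketch {
  // Substitute aliases one at a time, appending each rewritten constraint
  // back to the buffer: the buffer doubles on every pass.
  def substituteOneByOne(constraint: String, aliases: Seq[(String, String)]): Seq[String] = {
    var buffer = Vector(constraint)
    for ((attr, alias) <- aliases)
      buffer = buffer ++ buffer.map(_.replace(attr, alias))
    buffer
  }

  def main(args: Array[String]): Unit = {
    // Two aliases over one constraint: 2^2 = 4 buffer entries, although only
    // "a1a = 1 || a2a = 2" would survive the project node's output-set check.
    val result = substituteOneByOne("a1 = 1 || a2 = 2", Seq("a1" -> "a1a", "a2" -> "a2a"))
    println(result.size) // 4
  }
}
```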
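Finally, a sketch of why the lazy `Stream` return type matters (shown here with `LazyList`, its Scala 2.13 successor): with 30 slots of 2 alternatives each there are 2^30 combinations, yet taking a few results under an expand limit, in the spirit of the `expandLimit` configs above, forces only a tiny prefix.

```scala
object LazyLimitSketch {
  // 2^n combinations, built lazily: nothing is computed until demanded.
  def combos(n: Int): LazyList[List[Int]] =
    if (n == 0) LazyList(Nil)
    else
      for {
        head <- LazyList(0, 1)
        tail <- combos(n - 1)
      } yield head :: tail

  def main(args: Array[String]): Unit = {
    // An eager Seq version would materialize ~10^9 lists; taking a small
    // prefix of the lazy enumeration returns immediately.
    val limited = combos(30).take(4).toList
    println(limited.size) // 4
  }
}
```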
