peter-toth opened a new pull request, #40016:
URL: https://github.com/apache/spark/pull/40016
### What changes were proposed in this pull request?
This PR improves `TreeNode.multiTransform()` to generate the alternative
sequences only if needed and fully dynamically. Consider the following
simplified example:
```
(a + b).multiTransform {
case a => Stream(1, 2)
case b => Stream(10, 20)
}
```
the result is the cartesian product: `Stream(1 + 10, 2 + 10, 1 + 20, 2 +
20)`.
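To make the ordering of the cartesian product concrete, here is a minimal, self-contained sketch on a toy expression tree. It is not Spark's `TreeNode` implementation: `Expr`, `Attr`, `Lit`, `Add` and `alternatives` are hypothetical names introduced only for illustration, and the sketch uses `Stream` as the description does (it is deprecated in favor of `LazyList` from Scala 2.13 on).

```scala
object CartesianSketch {
  // Toy expression tree; these types are assumptions, not Spark's Expression classes.
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class Lit(value: Int) extends Expr
  case class Add(left: Expr, right: Expr) extends Expr

  // Lazily enumerate every combination of per-node alternatives.
  // The right child drives the "outer" loop and the left child the "inner" one,
  // which reproduces the ordering of the example above.
  def alternatives(e: Expr)(rule: PartialFunction[Expr, Stream[Expr]]): Stream[Expr] =
    if (rule.isDefinedAt(e)) rule(e)
    else e match {
      case Add(l, r) =>
        for {
          r2 <- alternatives(r)(rule)
          l2 <- alternatives(l)(rule)
        } yield Add(l2, r2)
      case leaf => Stream(leaf) // no alternatives: keep the node as it is
    }

  val result: List[Expr] = alternatives(Add(Attr("a"), Attr("b"))) {
    case Attr("a") => Stream(Lit(1), Lit(2))
    case Attr("b") => Stream(Lit(10), Lit(20))
  }.toList
  // result: List(Add(Lit(1),Lit(10)), Add(Lit(2),Lit(10)), Add(Lit(1),Lit(20)), Add(Lit(2),Lit(20)))
}
```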
Currently `multiTransform` calculates the 2 alternative streams for `a` and
`b` **before** it starts building the cartesian product stream using `+`.
In effect it caches the "inner" `Stream(1, 2)` up front and reuses that cache
when the "outer" stream (`Stream(10, 20)`) iterates from `10` to `20`.
Although this caching is sometimes useful, it has 2 drawbacks:
- If the "outer" (`b` alternatives) stream returns `Seq.empty` (to indicate
pruning), the alternatives for `a` are calculated unnecessarily and then
discarded.
- The "inner" stream transformation can't depend on the current "outer"
stream alternative.
E.g. take the above `a + b` example, but now we want to transform both
`a` and `b` to `1` and `2`, and keep only those alternatives where the two
are transformed to the same value (`Stream(1 + 1, 2 + 2)`). This is currently
not possible with a single `multiTransform` call because the inner stream
alternatives are calculated in advance and cached.
But if `multiTransform` were dynamic, and the "inner" alternatives stream
were recalculated whenever the "outer" alternatives stream iterates, then
this would be possible:
```
// Cache
var a_or_b: Option[Seq[Expression]] = None
(a + b).multiTransform {
  case a | b =>
    // Return alternatives from cache if this is not the first encounter
    a_or_b.getOrElse(
      // Besides returning the alternatives for the first encounter, also set up a
      // mechanism to update the cache when the new alternatives are requested.
      Stream(Literal(1), Literal(2)).map { x =>
        a_or_b = Some(Seq(x))
        x
      }.append {
        a_or_b = None
        Seq.empty
      })
}
```
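The same cache trick can be tried outside Spark. The following is a rough sketch on a toy tree, assuming a traversal that re-evaluates the "inner" alternatives for every "outer" alternative; `Expr`, `Attr`, `Lit`, `Add`, `alternatives` and `aOrB` are hypothetical names, not Spark APIs. It relies on `Stream` evaluating each head eagerly when the element is reached, so the cache is updated before the "inner" rule is consulted.

```scala
object DynamicSketch {
  // Toy expression tree; these types are assumptions, not Spark's Expression classes.
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class Lit(value: Int) extends Expr
  case class Add(left: Expr, right: Expr) extends Expr

  def alternatives(e: Expr)(rule: PartialFunction[Expr, Stream[Expr]]): Stream[Expr] =
    if (rule.isDefinedAt(e)) rule(e)
    else e match {
      case Add(l, r) =>
        for {
          r2 <- alternatives(r)(rule) // "outer" alternatives
          l2 <- alternatives(l)(rule) // "inner" alternatives, recomputed for each r2
        } yield Add(l2, r2)
      case leaf => Stream(leaf)
    }

  // Cache holding the alternative currently selected by the "outer" stream.
  var aOrB: Option[Stream[Expr]] = None

  val result: List[Expr] = alternatives(Add(Attr("a"), Attr("b"))) {
    case Attr(_) =>
      // Serve the cached alternative unless this is the first encounter.
      aOrB.getOrElse(
        Stream[Expr](Lit(1), Lit(2)).map { x =>
          aOrB = Some(Stream(x)) // remember which alternative is active
          x
        }.append {
          aOrB = None // reset once the alternatives are exhausted
          Stream.empty[Expr]
        })
  }.toList
  // result: List(Add(Lit(1),Lit(1)), Add(Lit(2),Lit(2)))
}
```

If the "inner" alternatives were computed once up front, the cache would still be empty at that point and all four combinations would be produced instead of two.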
Please note:
- this is a simplified example and we could have run 2 simple `transform`s
to get the expected 2 expressions, but `multiTransform` can do other
orthogonal transformations in the same run (e.g. `c` -> `Seq(100, 200)`),
and `multiTransform` also has the advantage of returning the results lazily
as a stream.
- the original behaviour of caching "inner" alternative streams is still
doable, and our current use cases in `AliasAwareOutputExpression` and in
`BroadcastHashJoinExec` still do it: they store the alternatives in advance
in maps, and the `multiTransform` call just gets the alternatives from those
maps when needed.
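The "store the alternatives in advance in a map" pattern from the second note can be sketched as follows. This only illustrates the idea on a toy tree and is not the code in `AliasAwareOutputExpression` or `BroadcastHashJoinExec`; all names (`Expr`, `Attr`, `Lit`, `Add`, `alternatives`, `precomputed`) are hypothetical.

```scala
object PrecomputedSketch {
  // Toy expression tree; these types are assumptions, not Spark's Expression classes.
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class Lit(value: Int) extends Expr
  case class Add(left: Expr, right: Expr) extends Expr

  def alternatives(e: Expr)(rule: PartialFunction[Expr, Stream[Expr]]): Stream[Expr] =
    if (rule.isDefinedAt(e)) rule(e)
    else e match {
      case Add(l, r) =>
        for {
          r2 <- alternatives(r)(rule)
          l2 <- alternatives(l)(rule)
        } yield Add(l2, r2)
      case leaf => Stream(leaf)
    }

  // Alternatives are built once, before the traversal starts.
  // A Scala Map is also a PartialFunction, so it can be passed directly as the rule.
  val precomputed: Map[Expr, Stream[Expr]] = Map(
    Attr("a") -> Stream(Lit(1), Lit(2)),
    Attr("b") -> Stream(Lit(10), Lit(20)))

  val all: List[Expr] = alternatives(Add(Attr("a"), Attr("b")))(precomputed).toList

  // Nodes without an entry in the map are left untouched; only the first
  // result is requested here, the rest of the stream is never forced.
  val first: Expr = alternatives(Add(Attr("a"), Attr("c")))(precomputed).head
  // first: Add(Lit(1),Attr(c))
}
```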
### Why are the changes needed?
Improvement to make `multiTransform` more versatile.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added new UTs.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]