peter-toth opened a new pull request, #40016:
URL: https://github.com/apache/spark/pull/40016

   ### What changes were proposed in this pull request?
   
   This PR improves `TreeNode.multiTransform()` to generate the alternative 
sequences only if needed and fully dynamically. Consider the following 
simplified example:
   ```
   (a + b).multiTransform {
     case a => Stream(1, 2)
     case b => Stream(10, 20)
   }
   ```
   the result is the cartesian product: `Stream(1 + 10, 2 + 10, 1 + 20, 2 + 
20)`.
   Currently `multiTransform` calculates the 2 alternative streams for `a` and 
`b` **before** start building building the cartesian product stream using `+`. 
So kind of caches the "inner" `Stream(1, 2)` in the beginning and when the 
"outer" stream (`Stream(10, 20)`) iterates from `10` to `20` reuses the cache. 
Although this caching is sometimes useful it has 2 drawbacks:
   - If the "outer" (`b` alternatives) stream returns `Seq.emtpy` (to indicate 
pruning) the alternatives for the `a` are unecessary calculated and will be 
discarded.
   - The "inner" stream transformation can't depend on the current "outer" 
stream alternative.
      E.g. let's see the above `a + b` example but we want to transform both 
`a` and `b` to `1` and `2`, and we want to have only those alternatives where 
these 2 are transformed equal (`Stream(1 + 1, 2 + 2)`). This is currently it is 
not possible with a single `multiTransform` call due to the inner stream 
alternatives are calculated in advance and cached. 
   But, if `multiTransform` would be dynamic and the "inner" alternatives 
stream would be recalculated when the "outer" alternatives stream iterates then 
this would be possible:
     ```
     // Cache
     var a_or_b = None
     (a + b).multiTransform {
       case a | b =>
         // Return alternatives from cache if this is not the first encounter
         a_or_b.getOrElse(
           // Besides returning the alternatives for the first encounter, also 
set up a mechanism to
           // update the cache when the new alternatives are requested.
           Stream(Literal(1), Literal(2)).map { x =>
             a_or_b = Some(Seq(x))
             x
           }.append {
             a_or_b = None
             Seq.empty
           })
     }
     ```
   Please note:
   - that this is a simplified example and we could have run 2 simple 
`transforms` to get the exprected 2 expressions, but `multiTransform` can do 
other orthogonal transformations in the same run (e.g. `c` -> `Seq(100, 200)`) 
and `multiTransform` also has the advantage of returning the results lazlily as 
a stream.
   - the original behaviour of caching "inner" alternative streams is still 
doable and actually our current usecases in `AliasAwareOutputExpression` and in 
`BroadcastHashJoinExec` still do it as they store the alternatives in advance 
in maps and the `multiTransform` call just gets the alternatives from those 
maps when needed. 
   
   ### Why are the changes needed?
   Improvement to make `multiTransform` more versatile.
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   ### How was this patch tested?
   Added new UTs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to