cloud-fan commented on issue #25024: [SPARK-27296][SQL] User Defined 
Aggregators that do not ser/de on each input row
URL: https://github.com/apache/spark/pull/25024#issuecomment-537768473
 
 
   What I want to see is a single UDAF API that works for all the use cases, if 
it's possible. It's really confusing to end-users if there are a lot of UDAF 
APIs in Spark and they don't know which one to use.
   
   Since ser/de is inevitable (`InternalRow` <-> `Row`), it's always preferred 
to do the ser/de only on partition boundaries. AFAIK other requirements are:
   1. can operate on multiple columns
   2. can be deterministic or not
   3. can cast the input columns to desired types
   4. can be registered as a SQL function
   5. can operate on Dataset
   6. ... (please add more if I missed something)
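
   To make the partition-boundary point concrete, here is a minimal plain-Scala
   sketch with no Spark dependency. `Agg`, `CountingCodec`, and `LongSum` are
   hypothetical stand-ins (they are not Spark classes): `Agg` mirrors the four
   core methods of `Aggregator`, and the codec plays the role of the
   `InternalRow` <-> `Row` conversion while counting how often it runs.

   ```scala
   object SerDeBoundaryDemo {
     // Stand-in for Spark's Aggregator[IN, BUF, OUT]; not the real class.
     trait Agg[IN, BUF, OUT] {
       def zero: BUF
       def reduce(buf: BUF, in: IN): BUF
       def merge(b1: BUF, b2: BUF): BUF
       def finish(buf: BUF): OUT
     }

     // Hypothetical codec standing in for the InternalRow <-> Row conversion.
     // It counts calls so the two strategies can be compared.
     final class CountingCodec {
       var conversions = 0
       def serialize(buf: Long): String = { conversions += 1; buf.toString }
       def deserialize(s: String): Long = { conversions += 1; s.toLong }
     }

     object LongSum extends Agg[Long, Long, Long] {
       def zero: Long = 0L
       def reduce(buf: Long, in: Long): Long = buf + in
       def merge(b1: Long, b2: Long): Long = b1 + b2
       def finish(buf: Long): Long = buf
     }

     // Strategy A: the buffer is kept in serialized form, so every input row
     // pays one deserialize and one serialize.
     def perRow(partitions: Seq[Seq[Long]], codec: CountingCodec): Long = {
       val partials = partitions.map { rows =>
         rows.foldLeft(codec.serialize(LongSum.zero)) { (ser, row) =>
           codec.serialize(LongSum.reduce(codec.deserialize(ser), row))
         }
       }
       LongSum.finish(
         partials.map(s => codec.deserialize(s)).reduce((a, b) => LongSum.merge(a, b)))
     }

     // Strategy B: the buffer stays a plain object inside a partition and is
     // serialized once, when the partial result leaves the partition.
     def perPartition(partitions: Seq[Seq[Long]], codec: CountingCodec): Long = {
       val partials = partitions.map { rows =>
         codec.serialize(rows.foldLeft(LongSum.zero)((buf, row) => LongSum.reduce(buf, row)))
       }
       LongSum.finish(
         partials.map(s => codec.deserialize(s)).reduce((a, b) => LongSum.merge(a, b)))
     }
   }
   ```

   In this toy model both strategies return the same sum, but the conversion
   count of `perRow` grows with the number of rows while that of `perPartition`
   grows only with the number of partitions.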
   
   I think `Aggregator[IN, BUF, OUT]` is good enough to specify what the 
aggregating logic is. What we need to add is:
   1. an API to specify the input columns
   2. an API to specify the determinism
   3. a mechanism to cast input columns
   4. an API to register an `Aggregator` as a SQL function
   
   My proposal is:
   1. add a new method in `UDFRegistration`
   ```
   def register[IN: TypeTag, BUF: TypeTag, OUT: TypeTag](
       name: String, func: Aggregator[IN, BUF, OUT]): UserDefinedFunction
   ```
   2. add a new implementation of `UserDefinedFunction`
   ```
   class AggregatorAsFunction[IN: TypeTag, BUF: TypeTag, OUT: TypeTag](
       aggregator: Aggregator[IN, BUF, OUT]) extends UserDefinedFunction {
     def apply(exprs: Column*): Column = {
       // Create a special expression which is similar to `ScalaUDAF`. It projects
       // the input row according to the given `exprs`, converts the projected
       // internal row to an `IN` type object via the encoder, and feeds that
       // object to the `Aggregator`. The encoder can cast the input columns to
       // the desired types; please see `ScalaReflection`.
     }
   
     // This simply sets the nondeterministic flag of `AggregatorAsFunction`,
     // which will be passed to the new expression mentioned in `apply`.
     def asNondeterministic ...
   }
   ```
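
   To show how the pieces of the proposal could fit together, here is a toy
   model in plain Scala with no Spark dependency. Everything in it is a
   hypothetical stand-in rather than Spark code: `Agg` mimics `Aggregator`,
   `Registry` mimics `UDFRegistration`, `AggFunction` mimics the proposed
   `AggregatorAsFunction`, a `Map[String, Any]` plays the untyped input row,
   and the `decode` function plays the encoder that casts the projected
   columns to `IN`.

   ```scala
   object AggregatorAsFunctionSketch {
     // Stand-in for Spark's Aggregator[IN, BUF, OUT]; not the real class.
     trait Agg[IN, BUF, OUT] {
       def zero: BUF
       def reduce(buf: BUF, in: IN): BUF
       def merge(b1: BUF, b2: BUF): BUF
       def finish(buf: BUF): OUT
     }

     // Untyped row, standing in for InternalRow.
     type Row = Map[String, Any]

     // Toy counterpart of the proposed AggregatorAsFunction.
     final case class AggFunction[IN, BUF, OUT](
         agg: Agg[IN, BUF, OUT],
         decode: Seq[Any] => IN, // plays the encoder: projected values => IN
         deterministic: Boolean = true) {

       // Only flips the flag, as described in the proposal.
       def asNondeterministic: AggFunction[IN, BUF, OUT] = copy(deterministic = false)

       // Binds the input columns; the result aggregates a batch of rows by
       // projecting each row, decoding it to IN, and feeding it to the aggregator.
       def apply(cols: String*): Seq[Row] => OUT = rows =>
         agg.finish(rows.foldLeft(agg.zero) { (buf, row) =>
           agg.reduce(buf, decode(cols.map(c => row(c))))
         })
     }

     // Toy counterpart of the new `register` method on UDFRegistration.
     final class Registry {
       private val fns =
         scala.collection.mutable.Map.empty[String, AggFunction[_, _, _]]

       def register[IN, BUF, OUT](name: String, agg: Agg[IN, BUF, OUT])(
           decode: Seq[Any] => IN): AggFunction[IN, BUF, OUT] = {
         val f = AggFunction(agg, decode)
         fns(name) = f
         f
       }

       def lookup(name: String): Option[AggFunction[_, _, _]] = fns.get(name)
     }

     // Example aggregator over two columns: weighted average of (value, weight).
     object WeightedAvg extends Agg[(Double, Double), (Double, Double), Double] {
       def zero: (Double, Double) = (0.0, 0.0)
       def reduce(b: (Double, Double), in: (Double, Double)): (Double, Double) =
         (b._1 + in._1 * in._2, b._2 + in._2)
       def merge(a: (Double, Double), b: (Double, Double)): (Double, Double) =
         (a._1 + b._1, a._2 + b._2)
       def finish(b: (Double, Double)): Double =
         if (b._2 == 0.0) Double.NaN else b._1 / b._2
     }

     // Casts any numeric pair of column values to (Double, Double).
     val numericPair: Seq[Any] => (Double, Double) = {
       case Seq(v: Number, w: Number) => (v.doubleValue, w.doubleValue)
     }
   }
   ```

   Under these assumptions, usage would look like
   `new Registry().register("wavg", WeightedAvg)(numericPair)("v", "w")(rows)`,
   where an integer-typed `v` column is cast to `Double` by the decoder before
   it reaches the aggregator.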

