erikerlandson commented on issue #25024: [SPARK-27296][SQL] User Defined 
Aggregators that do not ser/de on each input row
URL: https://github.com/apache/spark/pull/25024#issuecomment-545039511
 
 
   @rdblue a thing we could do to improve the usage of implicit `Encoder`, 
without altering the `Aggregator` class signature, would be a companion object:
   ```scala
   object Aggregator {
     def apply[IN, BUF: Encoder, OUT: Encoder](
         z: () => BUF,
         r: (BUF, IN) => BUF,
         m: (BUF, BUF) => BUF,
         f: BUF => OUT): Aggregator[IN, BUF, OUT] =
       new Aggregator[IN, BUF, OUT] {
         def zero: BUF = z()
         def reduce(b: BUF, a: IN): BUF = r(b, a)
         def merge(b1: BUF, b2: BUF): BUF = m(b1, b2)
         def finish(reduction: BUF): OUT = f(reduction)
         val bufferEncoder: Encoder[BUF] = implicitly[Encoder[BUF]]
         val outputEncoder: Encoder[OUT] = implicitly[Encoder[OUT]]
       }
   }
   ```
   
   Out of the box, that will pick up the `Encoder` generators defined in 
`org.apache.spark.sql.SQLImplicits`
   ```scala
   scala> val agg = Aggregator(()=>0, (s: Int, x: Int)=>s + x, (s1: Int, s2: 
Int)=>s1+s2, (s: Int) => s)
   agg: org.apache.spark.sql.expressions.Aggregator[Int,Int,Int] = 
org.apache.spark.sql.expressions.Aggregator$$anon$1@5c65001c
   
   scala> val data = sc.parallelize(Vector.fill(50000){(nextInt(2), 
nextGaussian, nextGaussian.toFloat)}, 5).toDF("cat", "x1", "x2")
   data: org.apache.spark.sql.DataFrame = [cat: int, x1: double ... 1 more 
field]
   
   scala> import org.apache.spark.sql.expressions.UserDefinedAggregator
   import org.apache.spark.sql.expressions.UserDefinedAggregator
   
   scala> val uda = UserDefinedAggregator(agg)
   uda: org.apache.spark.sql.expressions.UserDefinedAggregator[Int,Int,Int] = 
UserDefinedAggregator(org.apache.spark.sql.expressions.Aggregator$$anon$1@5c65001c,None,true,true)
   
   scala> data.agg(uda($"cat"))
   res0: org.apache.spark.sql.DataFrame = [anon$1(cat): int]
   
   scala> res0.first
   res1: org.apache.spark.sql.Row = [25002]                                     
   
   ```
   
   This happens to not work for my `TDigest` class because it breaks the 
`implicit def newProductArrayEncoder[A <: Product]` generator. I'd also propose 
modernizing the encoder implicits by putting them into the `Encoder` companion 
object, and doing atomic encoders only, with a low-priority fallback to 
`Encoders.kryo` for everything else:
   ```scala
   trait EncoderFallbacks {
     // anything non-atomic just punt to kryo
     implicit def fallbackEncoder[T: ClassTag]: Encoder[T] =
       Encoders.kryo[T]
   }
   
   object Encoder extends EncoderFallbacks {
     // all the atomic encoders here
     implicit val doubleEncoder = Encoders.scalaDouble
   }
   ```
   All of this is outside the scope of this PR but might be done for the 3.0 
release if there is interest

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to