erikerlandson commented on issue #25024: [SPARK-27296][SQL] User Defined Aggregators that do not ser/de on each input row
URL: https://github.com/apache/spark/pull/25024#issuecomment-545039511

@rdblue one thing we could do to improve the usage of implicit `Encoder`, without altering the `Aggregator` class signature, would be a companion object:

```scala
object Aggregator {
  def apply[IN, BUF: Encoder, OUT: Encoder](
      z: () => BUF,
      r: (BUF, IN) => BUF,
      m: (BUF, BUF) => BUF,
      f: BUF => OUT): Aggregator[IN, BUF, OUT] =
    new Aggregator[IN, BUF, OUT] {
      def zero: BUF = z()
      def reduce(b: BUF, a: IN): BUF = r(b, a)
      def merge(b1: BUF, b2: BUF): BUF = m(b1, b2)
      def finish(reduction: BUF): OUT = f(reduction)
      val bufferEncoder: Encoder[BUF] = implicitly[Encoder[BUF]]
      val outputEncoder: Encoder[OUT] = implicitly[Encoder[OUT]]
    }
}
```

Out of the box, that will pick up the `Encoder` generators defined in `org.apache.spark.sql.SQLImplicits`:

```scala
scala> val agg = Aggregator(() => 0, (s: Int, x: Int) => s + x, (s1: Int, s2: Int) => s1 + s2, (s: Int) => s)
agg: org.apache.spark.sql.expressions.Aggregator[Int,Int,Int] = org.apache.spark.sql.expressions.Aggregator$$anon$1@5c65001c

scala> val data = sc.parallelize(Vector.fill(50000){(nextInt(2), nextGaussian, nextGaussian.toFloat)}, 5).toDF("cat", "x1", "x2")
data: org.apache.spark.sql.DataFrame = [cat: int, x1: double ... 1 more field]

scala> import org.apache.spark.sql.expressions.UserDefinedAggregator
import org.apache.spark.sql.expressions.UserDefinedAggregator

scala> val uda = UserDefinedAggregator(agg)
uda: org.apache.spark.sql.expressions.UserDefinedAggregator[Int,Int,Int] = UserDefinedAggregator(org.apache.spark.sql.expressions.Aggregator$$anon$1@5c65001c,None,true,true)

scala> data.agg(uda($"cat"))
res0: org.apache.spark.sql.DataFrame = [anon$1(cat): int]

scala> res0.first
res1: org.apache.spark.sql.Row = [25002]
```

This happens not to work for my `TDigest` class, because it breaks the `implicit def newProductArrayEncoder[A <: Product]` generator.

I'd also propose modernizing the encoder implicits by putting them into the `Encoder` companion object, defining only atomic encoders there, with a low-priority fallback to `Encoders.kryo` for everything else:

```scala
trait EncoderFallbacks {
  // anything non-atomic just punts to kryo
  implicit def fallbackEncoder[T: ClassTag]: Encoder[T] = Encoders.kryo[T]
}

object Encoder extends EncoderFallbacks {
  // all the atomic encoders go here
  implicit val doubleEncoder = Encoders.scalaDouble
}
```

All of this is outside the scope of this PR, but it might be done for the 3.0 release if there is interest.
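For context on why the fallback is "low-priority": the pattern leans on a standard Scala implicit-resolution rule, namely that an implicit defined directly on a companion object beats an implicit it inherits from a base trait. Here is a minimal, Spark-free sketch of just that mechanism; the `Encoder` trait, its `name` member, and the encoder strings below are stand-ins for illustration, not Spark's actual classes:

```scala
// Minimal stand-in for an encoder type; NOT Spark's Encoder.
trait Encoder[T] { def name: String }

// Low-priority implicits live on a base trait: the compiler prefers
// an implicit defined directly on the companion over one inherited here.
trait EncoderFallbacks {
  implicit def fallbackEncoder[T]: Encoder[T] =
    new Encoder[T] { val name = "fallback (kryo-like)" }
}

// High-priority atomic encoders are defined directly on the companion,
// so they win over the inherited fallback for the types they cover.
object Encoder extends EncoderFallbacks {
  implicit val doubleEncoder: Encoder[Double] =
    new Encoder[Double] { val name = "atomic[Double]" }
}

object Demo extends App {
  def encoderOf[T](implicit e: Encoder[T]): String = e.name
  println(encoderOf[Double])    // resolves to the atomic encoder
  println(encoderOf[List[Int]]) // no atomic match, so the fallback applies
}
```

Because `doubleEncoder` is strictly more specific than the inherited `fallbackEncoder[Double]`, the two never conflict, which is what lets the real version punt to `Encoders.kryo` without ambiguity errors.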
