Hi all, I am trying to use my custom Aggregator on a GroupedDataset of case classes to build a hash map using Spark SQL 1.6.2. My Encoder[Map[Int, String]] is not able to reconstruct the reduced values if I define it via ExpressionEncoder(). However, everything works fine if I define it as Encoders.kryo[Map[Int, String]]. I would like to know if I am doing anything wrong.
I have the following use case:

    import org.apache.spark.sql.{Encoder, SQLContext}
    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    import org.apache.spark.sql.expressions.Aggregator

    implicit val intStringMapEncoder: Encoder[Map[Int, String]] = ExpressionEncoder()

    val sparkContext = ...
    val sparkSqlContext = new SQLContext(sparkContext)
    import sparkSqlContext.implicits._

    case class StopPoint(line: String, sequenceNumber: Int, id: String)

    val stopPointDS = Seq(StopPoint("33", 1, "1"), StopPoint("33", 2, "2")).toDS()

    val stopPointSequenceMap = new Aggregator[StopPoint, Map[Int, String], Map[Int, String]] {
      override def zero = Map[Int, String]()
      // insert each stop point into the map, keyed by its sequence number
      override def reduce(map: Map[Int, String], stopPoint: StopPoint) =
        map.updated(stopPoint.sequenceNumber, stopPoint.id)
      override def merge(map: Map[Int, String], anotherMap: Map[Int, String]) =
        map ++ anotherMap
      override def finish(reduction: Map[Int, String]) = reduction
    }.toColumn

    val resultMap = stopPointDS
      .groupBy(_.line)
      .agg(stopPointSequenceMap)
      .collect()
      .toMap

Debugging in spark.sql.execution.TypedAggregateExpression.scala, I can see that each entry is inserted into the initial map correctly, i.e. the reduce() method works properly. However, my encoder cannot reconstruct the map built in the reduce phase, so merge() receives an empty Map and returns an empty Map as a result. If I replace my expression-based encoder with org.apache.spark.sql.Encoders.kryo[Map[Int, String]], I get the correct result:

    (33, Map(1 -> 1, 2 -> 2))

Any ideas/suggestions are more than welcome.

Sincerely,
Anton Okolnychyi
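For completeness, the working variant differs only in how the implicit encoder is defined; everything else stays as above. A minimal sketch of the kryo-based workaround I am currently using:

```scala
import org.apache.spark.sql.{Encoder, Encoders}

// Kryo-backed encoder: the Map is stored as a single opaque binary blob,
// so Catalyst does not need to understand the Map's internal structure
// when shuttling the aggregation buffer between the reduce and merge phases.
implicit val intStringMapEncoder: Encoder[Map[Int, String]] =
  Encoders.kryo[Map[Int, String]]
```

The trade-off, as I understand it, is that a kryo-encoded column is a binary blob rather than a typed Catalyst value, so it cannot be inspected or optimized by Spark SQL the way an expression-encoded map could be.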