I've downloaded a nightly build of Spark 2.0 (from today, 4/23) and was attempting to create an Aggregator that builds a Seq of rows, specifically a Seq[C1], where C1 is my custom case class.
When I attempt to run the following code in a spark-shell, it errors out.

Gist: https://gist.github.com/dondrake/be6b92aff71433e9fb627b478b78b839

Code:

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Row}
import org.apache.spark.sql.functions._
import java.util.Calendar

case class C1(f1: String, f2: String, f3: String, f4: java.sql.Date, f5: Double)

val teams = sc.parallelize(Seq(
  C1("hash1", "NLC", "Cubs", java.sql.Date.valueOf("2016-01-23"), 3253.21),
  C1("hash1", "NLC", "Cubs", java.sql.Date.valueOf("2014-01-23"), 353.88),
  C1("hash3", "NLW", "Dodgers", java.sql.Date.valueOf("2013-08-15"), 4322.12),
  C1("hash4", "NLE", "Red Sox", java.sql.Date.valueOf("2010-03-14"), 10283.72)
)).toDS

// https://docs.cloud.databricks.com/docs/spark/1.6/examples/Dataset%20Aggregator.html
object C1Agg extends Aggregator[C1, Seq[C1], Seq[C1]] {
  def zero: Seq[C1] = null
  def reduce(b: Seq[C1], a: C1): Seq[C1] = b :+ a
  def merge(b1: Seq[C1], b2: Seq[C1]): Seq[C1] = b1 ++ b2
  def finish(r: Seq[C1]): Seq[C1] = r
  override def bufferEncoder: Encoder[Seq[C1]] = ExpressionEncoder()
  override def outputEncoder: Encoder[Seq[C1]] = ExpressionEncoder()
}

val g_c1 = teams.select(C1Agg.toColumn)

The error:

scala> val g_c1 = teams.select(C1Agg.toColumn)
scala.ScalaReflectionException: object $line37.$read not found.
  at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:162)
  at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:22)
  at $typecreator1$1.apply(<console>:45)
  at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
  at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
  at org.apache.spark.sql.SQLImplicits$$typecreator9$1.apply(SQLImplicits.scala:125)
  at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
  at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:50)
  at org.apache.spark.sql.SQLImplicits.newProductSeqEncoder(SQLImplicits.scala:125)
  ... 52 elided
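My guess is that ExpressionEncoder() can't materialize a TypeTag for a Seq of a case class defined inside the REPL (hence the lookup of $line37.$read), but I haven't confirmed that. If that's the cause, a Kryo-backed encoder should sidestep the reflection entirely, since Encoders.kryo only needs a ClassTag. Here's a sketch of what I mean, untested against this nightly (note zero also becomes Seq.empty so reduce has a buffer to append to):

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

object C1AggKryo extends Aggregator[C1, Seq[C1], Seq[C1]] {
  // Seq.empty instead of null: reduce appends to the buffer, and
  // null :+ a would throw a NullPointerException once analysis succeeds.
  def zero: Seq[C1] = Seq.empty[C1]
  def reduce(b: Seq[C1], a: C1): Seq[C1] = b :+ a
  def merge(b1: Seq[C1], b2: Seq[C1]): Seq[C1] = b1 ++ b2
  def finish(r: Seq[C1]): Seq[C1] = r
  // Kryo needs only a ClassTag, not a TypeTag, so it should avoid the
  // REPL reflection failure -- at the cost of an opaque binary column.
  override def bufferEncoder: Encoder[Seq[C1]] = Encoders.kryo[Seq[C1]]
  override def outputEncoder: Encoder[Seq[C1]] = Encoders.kryo[Seq[C1]]
}

val g_kryo = teams.select(C1AggKryo.toColumn)

Even if that workaround flies, I'd still like to understand why the ExpressionEncoder version fails.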
If I tweak my teams to be a DataFrame instead of a Dataset, and leave everything else the same, I get a different error:

scala> val g_c1 = teams.select(C1Agg.toColumn)
org.apache.spark.sql.AnalysisException: unresolved operator 'Aggregate [(C1Agg(unknown),mode=Complete,isDistinct=false) AS c1agg(staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(lambdavariable(MapObjects_loopValue26, MapObjects_loopIsNull27, StructField(f1,StringType,true), StructField(f2,StringType,true), StructField(f3,StringType,true), StructField(f4,DateType,true), StructField(f5,DoubleType,false)), if (isnull(lambdavariable(MapObjects_loopValue26, MapObjects_loopIsNull27, StructField(f1,StringType,true), StructField(f2,StringType,true), StructField(f3,StringType,true), StructField(f4,DateType,true), StructField(f5,DoubleType,false)))) null else newInstance(class C1), upcast(value, ArrayType(StructType(StructField(f1,StringType,true), StructField(f2,StringType,true), StructField(f3,StringType,true), StructField(f4,DateType,true), StructField(f5,DoubleType,false)),true), - root class: "scala.collection.Seq")).array, true))#63];
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:54)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:289)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:51)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:51)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:54)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:61)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2443)
  at org.apache.spark.sql.Dataset.select(Dataset.scala:935)
  ... 52 elided

I'm not sure how to diagnose those errors.
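For context, the shape I'm ultimately after is a per-key collection, which I'd expect to express with groupByKey. A sketch of that intent (the _.f1 key is illustrative only, and it may well trip the same encoder problem):

// Group the typed Dataset by an example key and apply the Aggregator.
val grouped = teams.groupByKey(_.f1)       // KeyValueGroupedDataset[String, C1]
val g_by_key = grouped.agg(C1Agg.toColumn) // Dataset[(String, Seq[C1])]

Thoughts?

-Don

--
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake
800-733-2143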