koert kuipers created SPARK-15810: ------------------------------------- Summary: Aggregator doesn't play nice with Option Key: SPARK-15810 URL: https://issues.apache.org/jira/browse/SPARK-15810 Project: Spark Issue Type: Bug Components: SQL Environment: spark 2.0.0-SNAPSHOT Reporter: koert kuipers
{noformat} val ds1 = List(("a", 1), ("a", 2), ("a", 3)).toDS val df1 = ds1.map{ case (k, v) => (k, if (v > 1) Some(v) else None) }.toDF("k", "v") val df2 = df1.groupBy("k").agg(new Aggregator[(String, Option[Int]), Option[Int], Option[Int]]{ def zero: Option[Int] = None def reduce(b: Option[Int], a: (String, Option[Int])): Option[Int] = b.map(bv => a._2.map(av => bv + av).getOrElse(bv)).orElse(a._2) def merge(b1: Option[Int], b2: Option[Int]): Option[Int] = b1.map(b1v => b2.map(b2v => b1v + b2v).getOrElse(b1v)).orElse(b2) def finish(reduction: Option[Int]): Option[Int] = reduction def bufferEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]] def outputEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]] }.toColumn) df2.printSchema df2.show {noformat} i get as output a somewhat odd looking schema, and after that the program just hangs pinning one cpu at 100%. the data never shows. output: {noformat} root |-- k: string (nullable = true) |-- $anon$1(org.apache.spark.sql.Row): struct (nullable = true) | |-- value: integer (nullable = true) {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org