Hi,

Taking a quick look at the attached code, I found that you declared the wrong dataType for your aggregator's output on line 70: you need to return a `MapType` there instead of a `StructType`. The stack trace you posted points exactly at this type mismatch.
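Since I can't see your exact attachment here, below is only a rough sketch of what a UDAF with a `MapType` output could look like in Java (you mentioned you use the Java API). The class name HourFrequencyUDAF, the column names, and the substring-based hour extraction are my assumptions, not your code; the part that matters for your error is dataType().

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Hypothetical sketch: aggregates timestamps into an hour -> count map per group.
public class HourFrequencyUDAF extends UserDefinedAggregateFunction {

  // One input column: the raw timestamp string, e.g. "20160106061005".
  @Override
  public StructType inputSchema() {
    return new StructType().add("timestamp", DataTypes.StringType);
  }

  // The aggregation buffer holds the partial hour -> count map.
  @Override
  public StructType bufferSchema() {
    return new StructType().add("freq",
        DataTypes.createMapType(DataTypes.StringType, DataTypes.IntegerType));
  }

  // The key point: the output type is a MapType, not a StructType.
  @Override
  public DataType dataType() {
    return DataTypes.createMapType(DataTypes.StringType, DataTypes.IntegerType);
  }

  @Override
  public boolean deterministic() {
    return true;
  }

  @Override
  public void initialize(MutableAggregationBuffer buffer) {
    buffer.update(0, new HashMap<String, Integer>());
  }

  @Override
  public void update(MutableAggregationBuffer buffer, Row input) {
    // First 10 characters give the hour bucket, e.g. "2016010606".
    String hour = input.getString(0).substring(0, 10);
    Map<String, Integer> freq = new HashMap<>(buffer.<String, Integer>getJavaMap(0));
    freq.merge(hour, 1, Integer::sum);
    buffer.update(0, freq);
  }

  @Override
  public void merge(MutableAggregationBuffer buffer, Row other) {
    Map<String, Integer> merged = new HashMap<>(buffer.<String, Integer>getJavaMap(0));
    for (Map.Entry<String, Integer> e : other.<String, Integer>getJavaMap(0).entrySet()) {
      merged.merge(e.getKey(), e.getValue(), Integer::sum);
    }
    buffer.update(0, merged);
  }

  // Returning a java.util.Map is fine once dataType() declares a MapType.
  @Override
  public Object evaluate(Row buffer) {
    return buffer.getJavaMap(0);
  }
}

With something like that in place, you could register it with spark.udf().register("hourFreq", new HourFrequencyUDAF()) and call it via df.groupBy("id").agg(callUDF("hourFreq", col("timestamp"))), which should give you one hour-to-count map per ID, as in your expected output.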
// maropu

On Thu, Jan 26, 2017 at 12:07 PM, Ankur Srivastava <ankur.srivast...@gmail.com> wrote:
> Hi,
>
> I have a dataset with tuples of ID and timestamp. I want to group by ID
> and then create a map with the frequency per hour for each ID.
>
> Input:
> 1| 20160106061005
> 1| 20160106061515
> 1| 20160106064010
> 1| 20160106050402
> 1| 20160106040101
> 2| 20160106040101
> 3| 20160106051451
>
> Expected Output:
> 1|{2016010604:1, 2016010605:1, 2016010606:3}
> 2|{2016010604:1}
> 3|{2016010605:1}
>
> As I could not find a function in the org.apache.spark.sql.functions library
> that can do this aggregation, I wrote a UDAF, but when I execute it, it
> throws the exception below.
>
> I am using the Dataset API from Spark 2.0 with the Java library. Also
> attached is the code with the test data.
>
> scala.MatchError: {2016010606=1} (of class scala.collection.convert.Wrappers$MapWrapper)
>   at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:256)
>   at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251)
>   at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
>   at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:403)
>   at org.apache.spark.sql.execution.aggregate.ScalaUDAF.eval(udaf.scala:440)
>   at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:228)
>   at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:220)
>   at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:152)
>   at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:247)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:85)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> 17/01/25 18:20:58 INFO Executor: Executor is trying to kill task 29.0 in stage 7.0 (TID 398)
> 17/01/25 18:20:58 INFO DAGScheduler: ResultStage 7 (show at EdgeAggregator.java:29) failed in 0.699 s
> 17/01/25 18:20:58 INFO DAGScheduler: Job 3 failed: show at EdgeAggregator.java:29, took 0.712912 s
> In merge hr: 2016010606
> 17/01/25 18:20:58 WARN TaskSetManager: Lost task 29.0 in stage 7.0 (TID 398, localhost): scala.MatchError: {2016010606=1} (of class scala.collection.convert.Wrappers$MapWrapper)
>   at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:256)
>   at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251)
>   at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
>   at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:403)
>   at org.apache.spark.sql.execution.aggregate.ScalaUDAF.eval(udaf.scala:440)
>   at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:228)
>   at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:220)
>   at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:152)
>   at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:247)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:85)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
>
> Attached is the code that you can use to reproduce the error.
>
> Thanks
> Ankur
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org

--
---
Takeshi Yamamuro