Hi,

After a quick look at the attached code, I found that you declare the wrong
dataType for your aggregator output at line 70.
You need to return at least a `MapType` instead of a `StructType`.
The stack trace you showed explicitly reports this type mismatch: the
`StructConverter` is handed a `MapWrapper` that it cannot match.
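
As a rough illustration of why the result is a map and not a struct, here is a
minimal plain-Java sketch of the per-hour counting, without Spark. The class
`HourlyFrequency` and its methods are hypothetical names, not taken from the
attached code, and it assumes the hour bucket is the first 10 characters of a
`yyyyMMddHHmmss` string. In the UDAF itself, `dataType()` would then presumably
return something like `DataTypes.createMapType(DataTypes.StringType,
DataTypes.IntegerType)`, assuming string keys and integer counts.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the UDAF's counting logic; no Spark dependency.
final class HourlyFrequency {

    // Assumption: timestamps look like yyyyMMddHHmmss, so the hour bucket
    // is the first 10 characters (yyyyMMddHH).
    static String hourOf(String timestamp) {
        return timestamp.substring(0, 10);
    }

    // Counterpart of a UDAF update step: count one timestamp in the buffer.
    static void update(Map<String, Integer> buffer, String timestamp) {
        buffer.merge(hourOf(timestamp), 1, Integer::sum);
    }

    // Counterpart of a UDAF merge step: add another partial map into target.
    static void merge(Map<String, Integer> target, Map<String, Integer> other) {
        other.forEach((hour, count) -> target.merge(hour, count, Integer::sum));
    }

    // Aggregates all timestamps of one ID into an hour -> count map.
    static Map<String, Integer> countPerHour(List<String> timestamps) {
        Map<String, Integer> buffer = new TreeMap<>(); // sorted by hour key
        for (String ts : timestamps) {
            update(buffer, ts);
        }
        return buffer;
    }

    public static void main(String[] args) {
        // The five timestamps of ID 1 from the quoted input.
        List<String> id1 = Arrays.asList(
                "20160106061005", "20160106061515", "20160106064010",
                "20160106050402", "20160106040101");
        // prints {2016010604=1, 2016010605=1, 2016010606=3}
        System.out.println(countPerHour(id1));
    }
}
```

The value produced per ID is a `Map`, so the declared output type has to be a
map type for the converter to accept it.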

// maropu


On Thu, Jan 26, 2017 at 12:07 PM, Ankur Srivastava <
ankur.srivast...@gmail.com> wrote:

> Hi,
>
> I have a dataset of (ID, Timestamp) tuples. I want to group by ID and
> then create a map with the frequency per hour for each ID.
>
> Input:
> 1| 20160106061005
> 1| 20160106061515
> 1| 20160106064010
> 1| 20160106050402
> 1| 20160106040101
> 2| 20160106040101
> 3| 20160106051451
>
> Expected Output:
> 1|{2016010604:1, 2016010605:1, 2016010606:3}
> 2|{2016010604:1}
> 3|{2016010605:1}
>
> As I could not find a function in the org.apache.spark.sql.functions
> library that can do this aggregation, I wrote a UDAF, but when I execute
> it, it throws the exception below.
>
> I am using the Dataset API from Spark 2.0 with the Java library. The
> code with the test data is also attached.
>
> scala.MatchError: {2016010606=1} (of class
> scala.collection.convert.Wrappers$MapWrapper)
> at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.
> toCatalystImpl(CatalystTypeConverters.scala:256)
> at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.
> toCatalystImpl(CatalystTypeConverters.scala:251)
> at org.apache.spark.sql.catalyst.CatalystTypeConverters$
> CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
> at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$
> createToCatalystConverter$2.apply(CatalystTypeConverters.scala:403)
> at org.apache.spark.sql.execution.aggregate.ScalaUDAF.eval(udaf.scala:440)
> at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$
> generateResultProjection$1.apply(AggregationIterator.scala:228)
> at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$
> generateResultProjection$1.apply(AggregationIterator.scala:220)
> at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.
> next(SortBasedAggregationIterator.scala:152)
> at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.
> next(SortBasedAggregationIterator.scala:29)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> 4.apply(SparkPlan.scala:247)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> 4.apply(SparkPlan.scala:240)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$
> 1$$anonfun$apply$24.apply(RDD.scala:784)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$
> 1$$anonfun$apply$24.apply(RDD.scala:784)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:85)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 17/01/25 18:20:58 INFO Executor: Executor is trying to kill task 29.0 in
> stage 7.0 (TID 398)
> 17/01/25 18:20:58 INFO DAGScheduler: ResultStage 7 (show at
> EdgeAggregator.java:29) failed in 0.699 s
> 17/01/25 18:20:58 INFO DAGScheduler: Job 3 failed: show at
> EdgeAggregator.java:29, took 0.712912 s
> In merge hr: 2016010606
> 17/01/25 18:20:58 WARN TaskSetManager: Lost task 29.0 in stage 7.0 (TID
> 398, localhost): scala.MatchError: {2016010606=1} (of class
> scala.collection.convert.Wrappers$MapWrapper)
> at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.
> toCatalystImpl(CatalystTypeConverters.scala:256)
> at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.
> toCatalystImpl(CatalystTypeConverters.scala:251)
> at org.apache.spark.sql.catalyst.CatalystTypeConverters$
> CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
> at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$
> createToCatalystConverter$2.apply(CatalystTypeConverters.scala:403)
> at org.apache.spark.sql.execution.aggregate.ScalaUDAF.eval(udaf.scala:440)
> at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$
> generateResultProjection$1.apply(AggregationIterator.scala:228)
> at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$
> generateResultProjection$1.apply(AggregationIterator.scala:220)
> at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.
> next(SortBasedAggregationIterator.scala:152)
> at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.
> next(SortBasedAggregationIterator.scala:29)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> 4.apply(SparkPlan.scala:247)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> 4.apply(SparkPlan.scala:240)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$
> 1$$anonfun$apply$24.apply(RDD.scala:784)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$
> 1$$anonfun$apply$24.apply(RDD.scala:784)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:85)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> Attached is the code that you can use to reproduce the error.
>
> Thanks
> Ankur
>
>



-- 
---
Takeshi Yamamuro
