I even added a fake groupByKey on the entire Dataset:
scala> a_ds.groupByKey(k=>1).agg(typed.count[(Long,Long)](_._1)).show
+-----+------------------------+
|value|TypedCount(scala.Tuple2)|
+-----+------------------------+
|    1|                       2|
+-----+------------------------+

On Tue, Oct 18, 2016 at 11:30 PM, Yang <teddyyyy...@gmail.com> wrote:

> scala> val a = sc.parallelize(Array((1,2),(3,4)))
> a: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[243] at
> parallelize at <console>:38
>
> scala> val a_ds = hc.di.createDataFrame(a).as[(Long,Long)]
> a_ds: org.apache.spark.sql.Dataset[(Long, Long)] = [_1: int, _2: int]
>
> scala> a_ds.agg(typed.count[(Long,Long)](x=>x._1))
> res34: org.apache.spark.sql.DataFrame = [TypedCount(org.apache.spark.sql.Row): bigint]
>
> scala> res34.show
>
> Then it gave me the following error:
>
> Caused by: java.lang.ClassCastException:
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2
>   at $anonfun$1.apply(<console>:46)
>   at org.apache.spark.sql.execution.aggregate.TypedCount.reduce(typedaggregators.scala:69)
>   at org.apache.spark.sql.execution.aggregate.TypedCount.reduce(typedaggregators.scala:66)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
>
> I had to add a groupByKey():
>
> scala> a_ds.groupByKey(k=>k._1).agg(typed.count[(Long,Long)](_._1)).show
> +-----+------------------------+
> |value|TypedCount(scala.Tuple2)|
> +-----+------------------------+
> |    1|                       1|
> |    3|                       1|
> +-----+------------------------+
>
> But why does the groupByKey() make any difference? It looks like a bug.
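
One alternative I have not verified would be to hand the TypedColumn to select() instead of agg(); the sketch below is only an assumption that the Dataset.select overload taking a TypedColumn binds the aggregator's input to the tuple encoder, so treat it as untested:

// Untested sketch: assumes Dataset.select(c: TypedColumn[T, U]) re-binds the
// (Long, Long) encoder, so typed.count would see tuples rather than Rows.
import org.apache.spark.sql.expressions.scalalang.typed

val counted = a_ds.select(typed.count[(Long, Long)](_._1))  // Dataset[Long]
counted.show()

If this path works while the plain agg() call does not, that would at least narrow the problem to how agg() binds the input type of the typed aggregator.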