Re: question about the new Dataset API

2016-10-19 Thread Yang
I even added a fake groupByKey over the entire Dataset, and that works:


scala> a_ds.groupByKey(k=>1).agg(typed.count[(Long,Long)](_._1)).show
+-----+------------------------+
|value|TypedCount(scala.Tuple2)|
+-----+------------------------+
|    1|                       2|
+-----+------------------------+
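
My guess at what's going on (not verified against the Spark source, so take it as a hypothesis for Spark 2.0.x): the grouped path goes through KeyValueGroupedDataset.agg, which binds the TypedColumn to the Dataset's tuple encoder, while the ungrouped Dataset.agg goes through the untyped column path and hands the aggregator raw Rows. A minimal, self-contained sketch of both calls (using a plain SparkSession in place of my hc.di wrapper):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.expressions.scalalang.typed

    // Minimal sketch, assuming Spark 2.0.x; spark.createDataset stands in
    // for the hc.di.createDataFrame(...).as[...] wrapper from my setup.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("typed-agg-repro")
      .getOrCreate()
    import spark.implicits._

    val ds = spark.createDataset(Seq((1L, 2L), (3L, 4L)))  // Dataset[(Long, Long)]

    // Fails: the aggregator's reduce() is handed a GenericRowWithSchema
    // instead of a Tuple2, hence the ClassCastException below.
    // ds.agg(typed.count[(Long, Long)](_._1)).show()

    // Works: grouping on a constant key keeps the tuple encoder attached,
    // so reduce() really receives (Long, Long) values.
    ds.groupByKey(_ => 1).agg(typed.count[(Long, Long)](_._1)).show()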




On Tue, Oct 18, 2016 at 11:30 PM, Yang  wrote:

> scala> val a = sc.parallelize(Array((1,2),(3,4)))
> a: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[243] at
> parallelize at <console>:38
>
> scala> val a_ds = hc.di.createDataFrame(a).as[(Long,Long)]
> a_ds: org.apache.spark.sql.Dataset[(Long, Long)] = [_1: int, _2: int]
>
> scala> a_ds.agg(typed.count[(Long,Long)](x=>x._1))
> res34: org.apache.spark.sql.DataFrame = [TypedCount(org.apache.spark.sql.Row):
> bigint]
>
> scala> res34.show
>
> then it gave me the following error:
>
> Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2
>   at $anonfun$1.apply(<console>:46)
>   at org.apache.spark.sql.execution.aggregate.TypedCount.reduce(typedaggregators.scala:69)
>   at org.apache.spark.sql.execution.aggregate.TypedCount.reduce(typedaggregators.scala:66)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
>
>
> To get it to work, I had to add a groupByKey():
> scala> a_ds.groupByKey(k=>k._1).agg(typed.count[(Long,Long)](_._1)).show
> +-----+------------------------+
> |value|TypedCount(scala.Tuple2)|
> +-----+------------------------+
> |    1|                       1|
> |    3|                       1|
> +-----+------------------------+
>
> but why does groupByKey() make any difference here? looks like a bug
>
>
>


question about the new Dataset API

2016-10-19 Thread Yang
scala> val a = sc.parallelize(Array((1,2),(3,4)))
a: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[243] at
parallelize at <console>:38

scala> val a_ds = hc.di.createDataFrame(a).as[(Long,Long)]
a_ds: org.apache.spark.sql.Dataset[(Long, Long)] = [_1: int, _2: int]

scala> a_ds.agg(typed.count[(Long,Long)](x=>x._1))
res34: org.apache.spark.sql.DataFrame =
[TypedCount(org.apache.spark.sql.Row): bigint]

scala> res34.show

then it gave me the following error:

Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2
  at $anonfun$1.apply(<console>:46)
  at org.apache.spark.sql.execution.aggregate.TypedCount.reduce(typedaggregators.scala:69)
  at org.apache.spark.sql.execution.aggregate.TypedCount.reduce(typedaggregators.scala:66)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
  at org.apache.spark.scheduler.Task.run(Task.scala:86)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)


To get it to work, I had to add a groupByKey():
scala> a_ds.groupByKey(k=>k._1).agg(typed.count[(Long,Long)](_._1)).show
+-----+------------------------+
|value|TypedCount(scala.Tuple2)|
+-----+------------------------+
|    1|                       1|
|    3|                       1|
+-----+------------------------+

but why does groupByKey() make any difference here? looks like a bug
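
Another possible workaround, sketched but not verified on my end (assuming Spark 2.0.x): Dataset.select has TypedColumn overloads that bind the Dataset's input encoder, so a global typed aggregate can be written without the fake grouping key:

    import org.apache.spark.sql.expressions.scalalang.typed

    // Hypothetical alternative to the fake groupByKey, assuming Spark 2.0.x:
    // select's TypedColumn overload binds the input encoder, so the
    // aggregator should see real (Long, Long) tuples rather than Rows.
    // a_ds is the Dataset[(Long, Long)] from the transcript above.
    val total: org.apache.spark.sql.Dataset[Long] =
      a_ds.select(typed.count[(Long, Long)](_._1))
    total.show()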