Hello Arthur,

You can do aggregations directly in SQL. How did you create LINEITEM?
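The ClassCastException at the bottom of your trace ("java.lang.String cannot be cast to scala.math.BigDecimal") means Catalyst expected decimal values at runtime but the table scan handed back strings, which usually comes down to how the table was defined or loaded. As a rough, untested sketch only (a hypothetical '|'-delimited text-backed Hive table, trimmed to the columns your query touches; adjust names and types to your data), declaring the measure columns with a numeric type avoids aggregating over strings:

// Hypothetical DDL sketch (assumed column subset, assumed delimiter):
// the measure columns are declared DOUBLE so SUM/AVG see numbers.
sqlContext.sql("""
CREATE TABLE IF NOT EXISTS LINEITEM (
  L_QUANTITY      DOUBLE,
  L_EXTENDEDPRICE DOUBLE,
  L_DISCOUNT      DOUBLE,
  L_TAX           DOUBLE,
  L_RETURNFLAG    STRING,
  L_LINESTATUS    STRING,
  L_SHIPDATE      STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
""")

If redefining the table is not an option, casting inside the query can also work; there is a sketch of that after your quoted trace below.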
Thanks,
Yin

On Thu, Oct 23, 2014 at 8:54 AM, arthur.hk.c...@gmail.com <arthur.hk.c...@gmail.com> wrote:

> Hi,
>
> I got a TreeNodeException and have a few questions:
>
> Q1) How should I do aggregation in Spark? Can I use aggregation directly in SQL?
> Q2) Or should I use SQL to load the data to form an RDD, and then use Scala to do the aggregation?
>
> Regards
> Arthur
>
>
> My SQL (good one, without aggregation):
>
> sqlContext.sql("SELECT L_RETURNFLAG FROM LINEITEM WHERE L_SHIPDATE <= '1998-09-02' GROUP BY L_RETURNFLAG, L_LINESTATUS ORDER BY L_RETURNFLAG, L_LINESTATUS").collect().foreach(println)
>
> [A]
> [N]
> [N]
> [R]
>
>
> My SQL (problem SQL, with aggregation):
>
> sqlContext.sql("SELECT L_RETURNFLAG, L_LINESTATUS, SUM(L_QUANTITY) AS SUM_QTY, SUM(L_EXTENDEDPRICE) AS SUM_BASE_PRICE, SUM(L_EXTENDEDPRICE * (1 - L_DISCOUNT)) AS SUM_DISC_PRICE, SUM(L_EXTENDEDPRICE * (1 - L_DISCOUNT) * (1 + L_TAX)) AS SUM_CHARGE, AVG(L_QUANTITY) AS AVG_QTY, AVG(L_EXTENDEDPRICE) AS AVG_PRICE, AVG(L_DISCOUNT) AS AVG_DISC, COUNT(*) AS COUNT_ORDER FROM LINEITEM WHERE L_SHIPDATE <= '1998-09-02' GROUP BY L_RETURNFLAG, L_LINESTATUS ORDER BY L_RETURNFLAG, L_LINESTATUS").collect().foreach(println)
>
> 14/10/23 20:38:31 INFO TaskSchedulerImpl: Removed TaskSet 18.0, whose tasks have all completed, from pool
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: sort, tree:
> Sort [l_returnflag#200 ASC,l_linestatus#201 ASC], true
>  Exchange (RangePartitioning [l_returnflag#200 ASC,l_linestatus#201 ASC], 200)
>   Aggregate false, [l_returnflag#200,l_linestatus#201], [l_returnflag#200,l_linestatus#201,SUM(PartialSum#216) AS sum_qty#181,SUM(PartialSum#217) AS sum_base_price#182,SUM(PartialSum#218) AS sum_disc_price#183,SUM(PartialSum#219) AS sum_charge#184,(CAST(SUM(PartialSum#220), DoubleType) / CAST(SUM(PartialCount#221L), DoubleType)) AS avg_qty#185,(CAST(SUM(PartialSum#222), DoubleType) / CAST(SUM(PartialCount#223L), DoubleType)) AS avg_price#186,(CAST(SUM(PartialSum#224), DoubleType) / CAST(SUM(PartialCount#225L), DoubleType)) AS avg_disc#187,SUM(PartialCount#226L) AS count_order#188L]
>    Exchange (HashPartitioning [l_returnflag#200,l_linestatus#201], 200)
>     Aggregate true, [l_returnflag#200,l_linestatus#201], [l_returnflag#200,l_linestatus#201,COUNT(l_discount#195) AS PartialCount#225L,SUM(l_discount#195) AS PartialSum#224,COUNT(1) AS PartialCount#226L,SUM((l_extendedprice#194 * (1.0 - l_discount#195))) AS PartialSum#218,SUM(l_extendedprice#194) AS PartialSum#217,COUNT(l_quantity#193) AS PartialCount#221L,SUM(l_quantity#193) AS PartialSum#220,COUNT(l_extendedprice#194) AS PartialCount#223L,SUM(l_extendedprice#194) AS PartialSum#222,SUM(((l_extendedprice#194 * (1.0 - l_discount#195)) * (1.0 + l_tax#196))) AS PartialSum#219,SUM(l_quantity#193) AS PartialSum#216]
>      Project [l_extendedprice#194,l_linestatus#201,l_discount#195,l_returnflag#200,l_tax#196,l_quantity#193]
>       Filter (l_shipdate#197 <= 1998-09-02)
>        HiveTableScan [l_shipdate#197,l_extendedprice#194,l_linestatus#201,l_discount#195,l_returnflag#200,l_tax#196,l_quantity#193], (MetastoreRelation boc_12, lineitem, None), None
>
> at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
> at org.apache.spark.sql.execution.Sort.execute(basicOperators.scala:191)
> at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
> at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
> at $iwC$$iwC$$iwC$$iwC.<init>(<console>:15)
> at $iwC$$iwC$$iwC.<init>(<console>:20)
> at $iwC$$iwC.<init>(<console>:22)
> at $iwC.<init>(<console>:24)
> at <init>(<console>:26)
> at .<init>(<console>:30)
> at .<clinit>(<console>)
> at .<init>(<console>:7)
> at .<clinit>(<console>)
> at $print(<console>)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
> at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
> at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
> at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
> at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
> at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
> at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
> at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
> at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
> at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
> at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
> at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
> at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
> at org.apache.spark.repl.Main$.main(Main.scala:31)
> at org.apache.spark.repl.Main.main(Main.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
> Exchange (RangePartitioning [l_returnflag#200 ASC,l_linestatus#201 ASC], 200)
>  Aggregate false, [l_returnflag#200,l_linestatus#201], [l_returnflag#200,l_linestatus#201,SUM(PartialSum#216) AS sum_qty#181,SUM(PartialSum#217) AS sum_base_price#182,SUM(PartialSum#218) AS sum_disc_price#183,SUM(PartialSum#219) AS sum_charge#184,(CAST(SUM(PartialSum#220), DoubleType) / CAST(SUM(PartialCount#221L), DoubleType)) AS avg_qty#185,(CAST(SUM(PartialSum#222), DoubleType) / CAST(SUM(PartialCount#223L), DoubleType)) AS avg_price#186,(CAST(SUM(PartialSum#224), DoubleType) / CAST(SUM(PartialCount#225L), DoubleType)) AS avg_disc#187,SUM(PartialCount#226L) AS count_order#188L]
>   Exchange (HashPartitioning [l_returnflag#200,l_linestatus#201], 200)
>    Aggregate true, [l_returnflag#200,l_linestatus#201], [l_returnflag#200,l_linestatus#201,COUNT(l_discount#195) AS PartialCount#225L,SUM(l_discount#195) AS PartialSum#224,COUNT(1) AS PartialCount#226L,SUM((l_extendedprice#194 * (1.0 - l_discount#195))) AS PartialSum#218,SUM(l_extendedprice#194) AS PartialSum#217,COUNT(l_quantity#193) AS PartialCount#221L,SUM(l_quantity#193) AS PartialSum#220,COUNT(l_extendedprice#194) AS PartialCount#223L,SUM(l_extendedprice#194) AS PartialSum#222,SUM(((l_extendedprice#194 * (1.0 - l_discount#195)) * (1.0 + l_tax#196))) AS PartialSum#219,SUM(l_quantity#193) AS PartialSum#216]
>     Project [l_extendedprice#194,l_linestatus#201,l_discount#195,l_returnflag#200,l_tax#196,l_quantity#193]
>      Filter (l_shipdate#197 <= 1998-09-02)
>       HiveTableScan [l_shipdate#197,l_extendedprice#194,l_linestatus#201,l_discount#195,l_returnflag#200,l_tax#196,l_quantity#193], (MetastoreRelation boc_12, lineitem, None), None
>
> at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
> at org.apache.spark.sql.execution.Exchange.execute(Exchange.scala:44)
> at org.apache.spark.sql.execution.Sort$$anonfun$execute$3.apply(basicOperators.scala:192)
> at org.apache.spark.sql.execution.Sort$$anonfun$execute$3.apply(basicOperators.scala:193)
> at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
> ... 43 more
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 22 in stage 18.0 failed 4 times, most recent failure: Lost task 22.3 in stage 18.0 (TID 1488, m35): java.lang.ClassCastException: java.lang.String cannot be cast to scala.math.BigDecimal
> scala.math.Numeric$BigDecimalIsFractional$.plus(Numeric.scala:182)
> org.apache.spark.sql.catalyst.expressions.Add$$anonfun$eval$2.apply(arithmetic.scala:58)
> org.apache.spark.sql.catalyst.expressions.Add$$anonfun$eval$2.apply(arithmetic.scala:58)
> org.apache.spark.sql.catalyst.expressions.Expression.n2(Expression.scala:114)
> org.apache.spark.sql.catalyst.expressions.Add.eval(arithmetic.scala:58)
> org.apache.spark.sql.catalyst.expressions.MutableLiteral.update(literals.scala:69)
> org.apache.spark.sql.catalyst.expressions.SumFunction.update(aggregates.scala:433)
> org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:167)
> org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:151)
> org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
> org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> org.apache.spark.scheduler.Task.run(Task.scala:54)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
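On Q1: aggregating directly in SQL is fine once the column types line up. If rebuilding LINEITEM is inconvenient, casting the measures inside the query is another way around the decimal/string mismatch. A minimal, untested sketch (abbreviated to a few of the aggregates, and assuming the measure columns currently come back as strings):

// Hedged workaround sketch: cast each measure to DOUBLE before
// aggregating so SUM/AVG operate on numbers rather than strings.
sqlContext.sql("""
SELECT L_RETURNFLAG, L_LINESTATUS,
       SUM(CAST(L_QUANTITY AS DOUBLE))      AS SUM_QTY,
       SUM(CAST(L_EXTENDEDPRICE AS DOUBLE)) AS SUM_BASE_PRICE,
       AVG(CAST(L_DISCOUNT AS DOUBLE))      AS AVG_DISC,
       COUNT(*)                             AS COUNT_ORDER
FROM LINEITEM
WHERE L_SHIPDATE <= '1998-09-02'
GROUP BY L_RETURNFLAG, L_LINESTATUS
ORDER BY L_RETURNFLAG, L_LINESTATUS
""").collect().foreach(println)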