Hi, I am getting a TreeNodeException and have a few questions:
Q1) How should I do aggregation in Spark SQL? Can I use aggregate functions directly in the SQL?
Q2) Or should I use SQL only to load the data into an RDD, and then do the aggregation in Scala? (Something roughly like the sketch below.)
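By the Q2 approach I mean something like the following. This is just an untested sketch against the Spark 1.1 SchemaRDD API, meant to be run in spark-shell (where sqlContext and the pair-RDD implicits are already in scope); it sums L_QUANTITY per (L_RETURNFLAG, L_LINESTATUS):

  // Untested sketch: use SQL only to filter and project,
  // then aggregate with plain RDD operations in Scala.
  val rows = sqlContext.sql(
    "SELECT L_RETURNFLAG, L_LINESTATUS, L_QUANTITY FROM LINEITEM WHERE L_SHIPDATE <= '1998-09-02'")

  val sumQty = rows
    // key each row by (returnflag, linestatus); go through String when
    // parsing the quantity, in case the column is stored as a string
    .map(r => ((r(0).toString, r(1).toString), r(2).toString.toDouble))
    // sum the quantities per key
    .reduceByKey(_ + _)

  sumQty.collect().foreach(println)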
Regards,
Arthur

My SQL (the good one, without aggregation):

sqlContext.sql("SELECT L_RETURNFLAG FROM LINEITEM WHERE L_SHIPDATE <= '1998-09-02' GROUP BY L_RETURNFLAG, L_LINESTATUS ORDER BY L_RETURNFLAG, L_LINESTATUS").collect().foreach(println);

[A]
[N]
[N]
[R]

My SQL (the problem one, with aggregation):

sqlContext.sql("SELECT L_RETURNFLAG, L_LINESTATUS, SUM(L_QUANTITY) AS SUM_QTY, SUM(L_EXTENDEDPRICE) AS SUM_BASE_PRICE, SUM(L_EXTENDEDPRICE * (1 - L_DISCOUNT)) AS SUM_DISC_PRICE, SUM(L_EXTENDEDPRICE * (1 - L_DISCOUNT) * (1 + L_TAX)) AS SUM_CHARGE, AVG(L_QUANTITY) AS AVG_QTY, AVG(L_EXTENDEDPRICE) AS AVG_PRICE, AVG(L_DISCOUNT) AS AVG_DISC, COUNT(*) AS COUNT_ORDER FROM LINEITEM WHERE L_SHIPDATE <= '1998-09-02' GROUP BY L_RETURNFLAG, L_LINESTATUS ORDER BY L_RETURNFLAG, L_LINESTATUS").collect().foreach(println);

14/10/23 20:38:31 INFO TaskSchedulerImpl: Removed TaskSet 18.0, whose tasks have all completed, from pool
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: sort, tree:
Sort [l_returnflag#200 ASC,l_linestatus#201 ASC], true
 Exchange (RangePartitioning [l_returnflag#200 ASC,l_linestatus#201 ASC], 200)
  Aggregate false, [l_returnflag#200,l_linestatus#201], [l_returnflag#200,l_linestatus#201,SUM(PartialSum#216) AS sum_qty#181,SUM(PartialSum#217) AS sum_base_price#182,SUM(PartialSum#218) AS sum_disc_price#183,SUM(PartialSum#219) AS sum_charge#184,(CAST(SUM(PartialSum#220), DoubleType) / CAST(SUM(PartialCount#221L), DoubleType)) AS avg_qty#185,(CAST(SUM(PartialSum#222), DoubleType) / CAST(SUM(PartialCount#223L), DoubleType)) AS avg_price#186,(CAST(SUM(PartialSum#224), DoubleType) / CAST(SUM(PartialCount#225L), DoubleType)) AS avg_disc#187,SUM(PartialCount#226L) AS count_order#188L]
   Exchange (HashPartitioning [l_returnflag#200,l_linestatus#201], 200)
    Aggregate true, [l_returnflag#200,l_linestatus#201], [l_returnflag#200,l_linestatus#201,COUNT(l_discount#195) AS PartialCount#225L,SUM(l_discount#195) AS PartialSum#224,COUNT(1) AS PartialCount#226L,SUM((l_extendedprice#194 * (1.0 - l_discount#195))) AS PartialSum#218,SUM(l_extendedprice#194) AS PartialSum#217,COUNT(l_quantity#193) AS PartialCount#221L,SUM(l_quantity#193) AS PartialSum#220,COUNT(l_extendedprice#194) AS PartialCount#223L,SUM(l_extendedprice#194) AS PartialSum#222,SUM(((l_extendedprice#194 * (1.0 - l_discount#195)) * (1.0 + l_tax#196))) AS PartialSum#219,SUM(l_quantity#193) AS PartialSum#216]
     Project [l_extendedprice#194,l_linestatus#201,l_discount#195,l_returnflag#200,l_tax#196,l_quantity#193]
      Filter (l_shipdate#197 <= 1998-09-02)
       HiveTableScan [l_shipdate#197,l_extendedprice#194,l_linestatus#201,l_discount#195,l_returnflag#200,l_tax#196,l_quantity#193], (MetastoreRelation boc_12, lineitem, None), None

 at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
 at org.apache.spark.sql.execution.Sort.execute(basicOperators.scala:191)
 at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
 at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
 at $iwC$$iwC$$iwC$$iwC.<init>(<console>:15)
 at $iwC$$iwC$$iwC.<init>(<console>:20)
 at $iwC$$iwC.<init>(<console>:22)
 at $iwC.<init>(<console>:24)
 at <init>(<console>:26)
 at .<init>(<console>:30)
 at .<clinit>(<console>)
 at .<init>(<console>:7)
 at .<clinit>(<console>)
 at $print(<console>)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
 at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
 at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
 at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
 at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
 at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
 at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
 at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
 at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
 at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
 at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
 at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
 at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
 at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
 at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
 at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
 at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
 at org.apache.spark.repl.Main$.main(Main.scala:31)
 at org.apache.spark.repl.Main.main(Main.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange (RangePartitioning [l_returnflag#200 ASC,l_linestatus#201 ASC], 200)
 Aggregate false, [l_returnflag#200,l_linestatus#201], [l_returnflag#200,l_linestatus#201,SUM(PartialSum#216) AS sum_qty#181,SUM(PartialSum#217) AS sum_base_price#182,SUM(PartialSum#218) AS sum_disc_price#183,SUM(PartialSum#219) AS sum_charge#184,(CAST(SUM(PartialSum#220), DoubleType) / CAST(SUM(PartialCount#221L), DoubleType)) AS avg_qty#185,(CAST(SUM(PartialSum#222), DoubleType) / CAST(SUM(PartialCount#223L), DoubleType)) AS avg_price#186,(CAST(SUM(PartialSum#224), DoubleType) / CAST(SUM(PartialCount#225L), DoubleType)) AS avg_disc#187,SUM(PartialCount#226L) AS count_order#188L]
  Exchange (HashPartitioning [l_returnflag#200,l_linestatus#201], 200)
   Aggregate true, [l_returnflag#200,l_linestatus#201], [l_returnflag#200,l_linestatus#201,COUNT(l_discount#195) AS PartialCount#225L,SUM(l_discount#195) AS PartialSum#224,COUNT(1) AS PartialCount#226L,SUM((l_extendedprice#194 * (1.0 - l_discount#195))) AS PartialSum#218,SUM(l_extendedprice#194) AS PartialSum#217,COUNT(l_quantity#193) AS PartialCount#221L,SUM(l_quantity#193) AS PartialSum#220,COUNT(l_extendedprice#194) AS PartialCount#223L,SUM(l_extendedprice#194) AS PartialSum#222,SUM(((l_extendedprice#194 * (1.0 - l_discount#195)) * (1.0 + l_tax#196))) AS PartialSum#219,SUM(l_quantity#193) AS PartialSum#216]
    Project [l_extendedprice#194,l_linestatus#201,l_discount#195,l_returnflag#200,l_tax#196,l_quantity#193]
     Filter (l_shipdate#197 <= 1998-09-02)
      HiveTableScan [l_shipdate#197,l_extendedprice#194,l_linestatus#201,l_discount#195,l_returnflag#200,l_tax#196,l_quantity#193], (MetastoreRelation boc_12, lineitem, None), None

 at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
 at org.apache.spark.sql.execution.Exchange.execute(Exchange.scala:44)
 at org.apache.spark.sql.execution.Sort$$anonfun$execute$3.apply(basicOperators.scala:192)
 at org.apache.spark.sql.execution.Sort$$anonfun$execute$3.apply(basicOperators.scala:193)
 at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
 ... 43 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 22 in stage 18.0 failed 4 times, most recent failure: Lost task 22.3 in stage 18.0 (TID 1488, m35): java.lang.ClassCastException: java.lang.String cannot be cast to scala.math.BigDecimal
 scala.math.Numeric$BigDecimalIsFractional$.plus(Numeric.scala:182)
 org.apache.spark.sql.catalyst.expressions.Add$$anonfun$eval$2.apply(arithmetic.scala:58)
 org.apache.spark.sql.catalyst.expressions.Add$$anonfun$eval$2.apply(arithmetic.scala:58)
 org.apache.spark.sql.catalyst.expressions.Expression.n2(Expression.scala:114)
 org.apache.spark.sql.catalyst.expressions.Add.eval(arithmetic.scala:58)
 org.apache.spark.sql.catalyst.expressions.MutableLiteral.update(literals.scala:69)
 org.apache.spark.sql.catalyst.expressions.SumFunction.update(aggregates.scala:433)
 org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:167)
 org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:151)
 org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
 org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 org.apache.spark.scheduler.Task.run(Task.scala:54)
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
 at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
 at scala.Option.foreach(Option.scala:236)
 at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
 at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
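One more observation: the actual root cause at the bottom is "java.lang.ClassCastException: java.lang.String cannot be cast to scala.math.BigDecimal", which makes me suspect the numeric LINEITEM columns (L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX) are declared as STRING in my Hive metastore, so only the non-aggregating query works. If that is the problem, would explicit casts be the right workaround? Something like the following untested sketch (DOUBLE is just my guess; DECIMAL may be more appropriate for the TPC-H money columns):

  // Untested sketch: cast the suspected string columns to a numeric
  // type before aggregating, so SUM/AVG see numbers rather than strings.
  sqlContext.sql("""
    SELECT L_RETURNFLAG, L_LINESTATUS,
           SUM(CAST(L_QUANTITY AS DOUBLE))      AS SUM_QTY,
           SUM(CAST(L_EXTENDEDPRICE AS DOUBLE)) AS SUM_BASE_PRICE,
           AVG(CAST(L_DISCOUNT AS DOUBLE))      AS AVG_DISC,
           COUNT(*)                             AS COUNT_ORDER
    FROM LINEITEM
    WHERE L_SHIPDATE <= '1998-09-02'
    GROUP BY L_RETURNFLAG, L_LINESTATUS
    ORDER BY L_RETURNFLAG, L_LINESTATUS
  """).collect().foreach(println)

Or is the cleaner fix to redefine the table with proper DECIMAL/DOUBLE column types in the metastore?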