Hi,

I got a TreeNodeException (full trace below) and have a few questions:
Q1) How should I do aggregation in Spark? Can I use aggregation directly in
SQL? Or
Q2) Should I use SQL to load the data into an RDD and then use Scala to do the
aggregation?

Regards
Arthur


My SQL (good one, without aggregation):
sqlContext.sql("""SELECT L_RETURNFLAG FROM LINEITEM
WHERE L_SHIPDATE <= '1998-09-02'
GROUP BY L_RETURNFLAG, L_LINESTATUS
ORDER BY L_RETURNFLAG, L_LINESTATUS""").collect().foreach(println)
[A]
[N]
[N]
[R]


My SQL (problem SQL, with aggregation):
sqlContext.sql("""SELECT L_RETURNFLAG, L_LINESTATUS,
SUM(L_QUANTITY) AS SUM_QTY,
SUM(L_EXTENDEDPRICE) AS SUM_BASE_PRICE,
SUM(L_EXTENDEDPRICE * (1 - L_DISCOUNT)) AS SUM_DISC_PRICE,
SUM(L_EXTENDEDPRICE * (1 - L_DISCOUNT) * (1 + L_TAX)) AS SUM_CHARGE,
AVG(L_QUANTITY) AS AVG_QTY, AVG(L_EXTENDEDPRICE) AS AVG_PRICE,
AVG(L_DISCOUNT) AS AVG_DISC, COUNT(*) AS COUNT_ORDER
FROM LINEITEM
WHERE L_SHIPDATE <= '1998-09-02'
GROUP BY L_RETURNFLAG, L_LINESTATUS
ORDER BY L_RETURNFLAG, L_LINESTATUS""").collect().foreach(println)

14/10/23 20:38:31 INFO TaskSchedulerImpl: Removed TaskSet 18.0, whose tasks have all completed, from pool
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: sort, tree:
Sort [l_returnflag#200 ASC,l_linestatus#201 ASC], true
 Exchange (RangePartitioning [l_returnflag#200 ASC,l_linestatus#201 ASC], 200)
  Aggregate false, [l_returnflag#200,l_linestatus#201], 
[l_returnflag#200,l_linestatus#201,SUM(PartialSum#216) AS 
sum_qty#181,SUM(PartialSum#217) AS sum_base_price#182,SUM(PartialSum#218) AS 
sum_disc_price#183,SUM(PartialSum#219) AS 
sum_charge#184,(CAST(SUM(PartialSum#220), DoubleType) / 
CAST(SUM(PartialCount#221L), DoubleType)) AS 
avg_qty#185,(CAST(SUM(PartialSum#222), DoubleType) / 
CAST(SUM(PartialCount#223L), DoubleType)) AS 
avg_price#186,(CAST(SUM(PartialSum#224), DoubleType) / 
CAST(SUM(PartialCount#225L), DoubleType)) AS 
avg_disc#187,SUM(PartialCount#226L) AS count_order#188L]
   Exchange (HashPartitioning [l_returnflag#200,l_linestatus#201], 200)
    Aggregate true, [l_returnflag#200,l_linestatus#201], 
[l_returnflag#200,l_linestatus#201,COUNT(l_discount#195) AS 
PartialCount#225L,SUM(l_discount#195) AS PartialSum#224,COUNT(1) AS 
PartialCount#226L,SUM((l_extendedprice#194 * (1.0 - l_discount#195))) AS 
PartialSum#218,SUM(l_extendedprice#194) AS PartialSum#217,COUNT(l_quantity#193) 
AS PartialCount#221L,SUM(l_quantity#193) AS 
PartialSum#220,COUNT(l_extendedprice#194) AS 
PartialCount#223L,SUM(l_extendedprice#194) AS 
PartialSum#222,SUM(((l_extendedprice#194 * (1.0 - l_discount#195)) * (1.0 + 
l_tax#196))) AS PartialSum#219,SUM(l_quantity#193) AS PartialSum#216]
     Project 
[l_extendedprice#194,l_linestatus#201,l_discount#195,l_returnflag#200,l_tax#196,l_quantity#193]
      Filter (l_shipdate#197 <= 1998-09-02)
       HiveTableScan 
[l_shipdate#197,l_extendedprice#194,l_linestatus#201,l_discount#195,l_returnflag#200,l_tax#196,l_quantity#193],
 (MetastoreRelation boc_12, lineitem, None), None

        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
        at org.apache.spark.sql.execution.Sort.execute(basicOperators.scala:191)
        at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
        at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:15)
        at $iwC$$iwC$$iwC.<init>(<console>:20)
        at $iwC$$iwC.<init>(<console>:22)
        at $iwC.<init>(<console>:24)
        at <init>(<console>:26)
        at .<init>(<console>:30)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
        at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange (RangePartitioning [l_returnflag#200 ASC,l_linestatus#201 ASC], 200)
 Aggregate false, [l_returnflag#200,l_linestatus#201], 
[l_returnflag#200,l_linestatus#201,SUM(PartialSum#216) AS 
sum_qty#181,SUM(PartialSum#217) AS sum_base_price#182,SUM(PartialSum#218) AS 
sum_disc_price#183,SUM(PartialSum#219) AS 
sum_charge#184,(CAST(SUM(PartialSum#220), DoubleType) / 
CAST(SUM(PartialCount#221L), DoubleType)) AS 
avg_qty#185,(CAST(SUM(PartialSum#222), DoubleType) / 
CAST(SUM(PartialCount#223L), DoubleType)) AS 
avg_price#186,(CAST(SUM(PartialSum#224), DoubleType) / 
CAST(SUM(PartialCount#225L), DoubleType)) AS 
avg_disc#187,SUM(PartialCount#226L) AS count_order#188L]
  Exchange (HashPartitioning [l_returnflag#200,l_linestatus#201], 200)
   Aggregate true, [l_returnflag#200,l_linestatus#201], 
[l_returnflag#200,l_linestatus#201,COUNT(l_discount#195) AS 
PartialCount#225L,SUM(l_discount#195) AS PartialSum#224,COUNT(1) AS 
PartialCount#226L,SUM((l_extendedprice#194 * (1.0 - l_discount#195))) AS 
PartialSum#218,SUM(l_extendedprice#194) AS PartialSum#217,COUNT(l_quantity#193) 
AS PartialCount#221L,SUM(l_quantity#193) AS 
PartialSum#220,COUNT(l_extendedprice#194) AS 
PartialCount#223L,SUM(l_extendedprice#194) AS 
PartialSum#222,SUM(((l_extendedprice#194 * (1.0 - l_discount#195)) * (1.0 + 
l_tax#196))) AS PartialSum#219,SUM(l_quantity#193) AS PartialSum#216]
    Project 
[l_extendedprice#194,l_linestatus#201,l_discount#195,l_returnflag#200,l_tax#196,l_quantity#193]
     Filter (l_shipdate#197 <= 1998-09-02)
      HiveTableScan 
[l_shipdate#197,l_extendedprice#194,l_linestatus#201,l_discount#195,l_returnflag#200,l_tax#196,l_quantity#193],
 (MetastoreRelation boc_12, lineitem, None), None

        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
        at org.apache.spark.sql.execution.Exchange.execute(Exchange.scala:44)
        at org.apache.spark.sql.execution.Sort$$anonfun$execute$3.apply(basicOperators.scala:192)
        at org.apache.spark.sql.execution.Sort$$anonfun$execute$3.apply(basicOperators.scala:193)
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
        ... 43 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 22 in stage 18.0 failed 4 times, most recent failure: Lost task 22.3 in stage 18.0 (TID 1488, m35): java.lang.ClassCastException: java.lang.String cannot be cast to scala.math.BigDecimal
        scala.math.Numeric$BigDecimalIsFractional$.plus(Numeric.scala:182)
        org.apache.spark.sql.catalyst.expressions.Add$$anonfun$eval$2.apply(arithmetic.scala:58)
        org.apache.spark.sql.catalyst.expressions.Add$$anonfun$eval$2.apply(arithmetic.scala:58)
        org.apache.spark.sql.catalyst.expressions.Expression.n2(Expression.scala:114)
        org.apache.spark.sql.catalyst.expressions.Add.eval(arithmetic.scala:58)
        org.apache.spark.sql.catalyst.expressions.MutableLiteral.update(literals.scala:69)
        org.apache.spark.sql.catalyst.expressions.SumFunction.update(aggregates.scala:433)
        org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:167)
        org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:151)
        org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
        org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
        org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
