One more thing: you are referring to two different sales tables (smallsales in the SQL version, sales in the FP version). That might account for the difference in numbers; a quick count of both, as in the sketch below, should confirm it.
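A minimal sketch of that check, reusing the HiveContext and the oraclehadoop database from the code below (variable names are illustrative):

// Compare row counts of the two tables the two versions actually read:
// the SQL version reads smallsales, the FP version reads sales.
HiveContext.sql("use oraclehadoop")
val salesCount      = HiveContext.table("sales").count()
val smallsalesCount = HiveContext.table("smallsales").count()
println(s"sales: $salesCount rows, smallsales: $smallsalesCount rows")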
Regards
Sab

On 24-Feb-2016 9:50 pm, "Mich Talebzadeh" <mich.talebza...@cloudtechnologypartners.co.uk> wrote:

*Hi,*

*Tools*

Spark 1.5.2, Hadoop 2.6, Hive 2.0, Spark-Shell, Hive database

*Objectives:* Timing differences between running Spark using SQL and running Spark using functional programming (FP) calls on Hive tables

*Underlying tables:* Three tables in a Hive database, stored in ORC format

The main differences in timings come from running the queries and fetching the data. If you look at the transformation part, that is

val rs = s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))

it takes 1 second. Using SQL, on the other hand, the first query takes 19 seconds, compared to just under 4 minutes using functional programming. The second query takes 28 seconds using SQL and around 4 minutes using FP.

These are my assumptions:

1. Running SQL, the full query is executed in Hive, which means Hive can take advantage of ORC optimization/storage indexes etc.?
2. Running FP requires that the data is fetched from the underlying tables in Hive and brought back to the Spark cluster (standalone here), and the joins etc. are done there.

The next steps for me would be to:

1. Look at the query plans in Spark (see the sketch after the SQL results below)
2. Run the same code on Hive alone and compare results

Any other suggestions are welcome.

*Standard SQL code*

val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
HiveContext.sql("use oraclehadoop")
println("\ncreating data set at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
val rs = HiveContext.sql(
"""
SELECT t.calendar_month_desc
     , c.channel_desc
     , SUM(s.amount_sold) AS TotalSales
FROM smallsales s
INNER JOIN times t
ON s.time_id = t.time_id
INNER JOIN channels c
ON s.channel_id = c.channel_id
GROUP BY t.calendar_month_desc, c.channel_desc
""")
rs.registerTempTable("tmp")
println("\nfirst query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
HiveContext.sql("""
SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
FROM tmp
ORDER BY MONTH, CHANNEL LIMIT 5
""").collect.foreach(println)
println("\nsecond query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
HiveContext.sql("""
SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
FROM tmp
GROUP BY channel_desc
ORDER BY SALES DESC LIMIT 5
""").collect.foreach(println)
println("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
sys.exit

*Results*

Started at [24/02/2016 09:00:50.50]
res1: org.apache.spark.sql.DataFrame = [result: string]

creating data set at [24/02/2016 09:00:53.53]
rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string, channel_desc: string, TotalSales: decimal(20,0)]

first query at [24/02/2016 09:00:54.54]
[1998-01,Direct Sales,9161730]
[1998-01,Internet,1248581]
[1998-01,Partners,2409776]
[1998-02,Direct Sales,9161840]
[1998-02,Internet,1533193]

second query at [24/02/2016 09:01:13.13]
[Direct Sales,9161840]
[Internet,3977374]
[Partners,3976291]
[Tele Sales,328760]

Finished at [24/02/2016 09:01:31.31]
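On the first of those next steps, a minimal sketch of looking at the plans, assuming the rs DataFrame from either version (explain is the standard DataFrame API in Spark 1.5; the extended output should show whether column pruning and any filters are pushed into the ORC scan):

// Print the parsed, analyzed, optimized and physical plans for the
// joined/aggregated DataFrame. Comparing this output between the SQL
// and FP versions should show whether the two plans actually differ.
rs.explain(true)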
*Code using functional programming*

val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
HiveContext.sql("use oraclehadoop")
val s = HiveContext.table("sales").select("AMOUNT_SOLD","TIME_ID","CHANNEL_ID")
val c = HiveContext.table("channels").select("CHANNEL_ID","CHANNEL_DESC")
val t = HiveContext.table("times").select("TIME_ID","CALENDAR_MONTH_DESC")
println("\ncreating data set at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
val rs = s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
println("\nfirst query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
val rs1 = rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
println("\nsecond query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
val rs2 = rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
println("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
sys.exit

*Results*

Started at [24/02/2016 08:52:27.27]
res1: org.apache.spark.sql.DataFrame = [result: string]
s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0), TIME_ID: timestamp, CHANNEL_ID: bigint]
c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC: string]
t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp, CALENDAR_MONTH_DESC: string]

creating data set at [24/02/2016 08:52:30.30]
rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string, channel_desc: string, TotalSales: decimal(20,0)]

first query at [24/02/2016 08:52:31.31]
[1998-01,Direct Sales,9086830]
[1998-01,Internet,1247641]
[1998-01,Partners,2393567]
[1998-02,Direct Sales,9161840]
[1998-02,Internet,1533193]
rs1: Unit = ()

second query at [24/02/2016 08:56:17.17]
[Direct Sales,9161840]
[Internet,3977374]
[Partners,3976291]
[Tele Sales,328760]
rs2: Unit = ()

Finished at [24/02/2016 09:00:14.14]

On 24/02/2016 06:27, Sabarish Sasidharan wrote:

When using SQL, your full query, including the joins, was executed in Hive (or the RDBMS) and only the results were brought into the Spark cluster. In the FP case, the data for the three tables is first pulled into the Spark cluster and then the join is executed.

Thus the time difference.

It's not immediately obvious why the results are different.

Regards
Sab
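A further thought on the FP timings above: rs is lazy, so each take(5) action recomputes the join and aggregation against the Hive tables from scratch (the same holds for the SQL version, since registerTempTable does not materialise anything). A minimal sketch of reusing the intermediate result, assuming the rs from the FP code above and enough memory to hold it; the redundant orderBy("SALES") before sort(desc("SALES")) is dropped, since the final sort wins anyway:

import org.apache.spark.sql.functions.{max, desc}  // already auto-imported in spark-shell

// Mark the joined/aggregated result for caching: the first action
// materialises it, and the second reuses it instead of re-scanning
// and re-joining the three Hive tables.
rs.cache()
rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).sort(desc("SALES")).take(5).foreach(println)

Whether this narrows the gap with the SQL path would show up directly in the timestamps you are already printing.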