My apologies, I definitely misunderstood. You are 100% correct.

On Feb 24, 2016 19:25, "Sabarish Sasidharan" <sabarish.sasidha...@manthan.com> wrote:
I never said it needs one. All I said is that when calling context.sql() the SQL is executed in the source database (assuming the data source is Hive or some RDBMS).

Regards
Sab

On 24-Feb-2016 11:49 pm, "Mohannad Ali" <man...@gmail.com> wrote:

That is incorrect. HiveContext does not need a Hive instance to run.

On Feb 24, 2016 19:15, "Sabarish Sasidharan" <sabarish.sasidha...@manthan.com> wrote:

Yes

Regards
Sab

On 24-Feb-2016 9:15 pm, "Koert Kuipers" <ko...@tresata.com> wrote:

Are you saying that HiveContext.sql(...) runs on Hive, and not on Spark SQL?

On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan <sabarish.sasidha...@manthan.com> wrote:

When using SQL, your full query, including the joins, was executed in Hive (or the RDBMS) and only the results were brought into the Spark cluster. In the FP case, the data for the three tables is first pulled into the Spark cluster and then the join is executed.

Thus the time difference.

It's not immediately obvious why the results are different.
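If in doubt, one way to see where the work actually runs is to print the query plans for both versions. A rough sketch only, reusing the HiveContext and the s, t and c DataFrames from the script quoted below; operators such as HiveTableScan in the physical plan show which tables Spark itself scans:

import org.apache.spark.sql.functions._

// Plan of the SQL version: shows which tables are scanned and where the join/aggregation runs
HiveContext.sql(
  """SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
     FROM smallsales s
     INNER JOIN times t ON s.time_id = t.time_id
     INNER JOIN channels c ON s.channel_id = c.channel_id
     GROUP BY t.calendar_month_desc, c.channel_desc""").explain(true)

// Plan of the DataFrame (FP) version, for comparison
s.join(t, "time_id")
  .join(c, "channel_id")
  .groupBy("calendar_month_desc", "channel_desc")
  .agg(sum("amount_sold").as("TotalSales"))
  .explain(true)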
Regards
Sab

On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <mich.talebza...@cloudtechnologypartners.co.uk> wrote:

Hi,

First, thanks everyone for the suggestions. Much appreciated.

These are the original queries, written in SQL and run against spark-shell:

val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
HiveContext.sql("use oraclehadoop")

val rs = HiveContext.sql(
"""
SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
FROM smallsales s
INNER JOIN times t
ON s.time_id = t.time_id
INNER JOIN channels c
ON s.channel_id = c.channel_id
GROUP BY t.calendar_month_desc, c.channel_desc
""")
rs.registerTempTable("tmp")
println("\nfirst query")
HiveContext.sql("""
SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
FROM tmp
ORDER BY MONTH, CHANNEL LIMIT 5
""").collect.foreach(println)
println("\nsecond query")
HiveContext.sql("""
SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
FROM tmp
GROUP BY channel_desc
ORDER BY SALES DESC LIMIT 5
""").collect.foreach(println)
println("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
sys.exit

The second version was written in FP style as far as I could, as below:

val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
HiveContext.sql("use oraclehadoop")
var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM sales")
val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
val rs = s.join(t, "time_id").join(c, "channel_id").groupBy("calendar_month_desc", "channel_desc").agg(sum("amount_sold").as("TotalSales"))
println("\nfirst query")
val rs1 = rs.orderBy("calendar_month_desc", "channel_desc").take(5).foreach(println)
println("\nsecond query")
val rs2 = rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
println("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
sys.exit

However, the first query's results are slightly different between SQL and FP (maybe the first query's FP code is not exactly correct?) and, more importantly, the FP version takes an order of magnitude longer than the SQL one (8 minutes compared to less than a minute). I am not surprised, as I expected that the functional style has to flatten all those method calls and convert them to SQL?

The standard SQL results:

Started at
[23/02/2016 23:55:30.30]
res1: org.apache.spark.sql.DataFrame = [result: string]
rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string, channel_desc: string, TotalSales: decimal(20,0)]

first query
[1998-01,Direct Sales,9161730]
[1998-01,Internet,1248581]
[1998-01,Partners,2409776]
[1998-02,Direct Sales,9161840]
[1998-02,Internet,1533193]

second query
[Direct Sales,9161840]
[Internet,3977374]
[Partners,3976291]
[Tele Sales,328760]

Finished at
[23/02/2016 23:56:11.11]

The FP results:

Started at
[23/02/2016 23:45:58.58]
res1: org.apache.spark.sql.DataFrame = [result: string]
s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0), TIME_ID: timestamp, CHANNEL_ID: bigint]
c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC: string]
t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp, CALENDAR_MONTH_DESC: string]
rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string, channel_desc: string, TotalSales: decimal(20,0)]

first query
[1998-01,Direct Sales,9086830]
[1998-01,Internet,1247641]
[1998-01,Partners,2393567]
[1998-02,Direct Sales,9161840]
[1998-02,Internet,1533193]
rs1: Unit = ()

second query
[Direct Sales,9161840]
[Internet,3977374]
[Partners,3976291]
[Tele Sales,328760]
rs2: Unit = ()

Finished at
[23/02/2016 23:53:42.42]
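One detail worth noting when comparing the two runs: the SQL version joins smallsales while the FP version selects from sales, which by itself could account for the slightly different first-query numbers. A sketch (untested) of a DataFrame version that mirrors the SQL query more closely, assuming the same Hive tables and the same HiveContext:

import org.apache.spark.sql.functions._

// Read the same tables the SQL version uses
val s2 = HiveContext.table("smallsales").select("amount_sold", "time_id", "channel_id")
val t2 = HiveContext.table("times").select("time_id", "calendar_month_desc")
val c2 = HiveContext.table("channels").select("channel_id", "channel_desc")

val rsCheck = s2.join(t2, "time_id").join(c2, "channel_id").groupBy("calendar_month_desc", "channel_desc").agg(sum("amount_sold").as("TotalSales"))

// First query: same ordering and limit as the SQL version
rsCheck.orderBy("calendar_month_desc", "channel_desc").show(5)

// Second query: a single orderBy(desc(...)) is enough; orderBy("SALES").sort(desc("SALES"))
// in the script above sorts twice, with the second sort overriding the first
rsCheck.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy(desc("SALES")).show(5)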
On 22/02/2016 23:16, Mich Talebzadeh wrote:

Hi,

I have data stored in Hive tables on which I want to do some simple manipulation.

Currently in Spark I do the following: I get the result set using SQL against the Hive tables and register it as a temporary table in Spark.

Ideally I would get the result set into a DataFrame and work on the DataFrame to slice and dice the data using functional programming with filter, map, split etc.

I wanted to get some ideas on how to go about it.

Thanks

val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

HiveContext.sql("use oraclehadoop")
val rs = HiveContext.sql("""SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
FROM smallsales s, times t, channels c
WHERE s.time_id = t.time_id
AND s.channel_id = c.channel_id
GROUP BY t.calendar_month_desc, c.channel_desc
""")
rs.registerTempTable("tmp")

HiveContext.sql("""
SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
FROM tmp
ORDER BY MONTH, CHANNEL
""").collect.foreach(println)
HiveContext.sql("""
SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
FROM tmp
GROUP BY channel_desc
ORDER BY SALES DESC
""").collect.foreach(println)

--

Dr Mich Talebzadeh

LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com

NOTE: The information in this email is proprietary and confidential. This message is for the designated recipient only; if you are not the intended recipient, you should destroy it immediately. Any information in this message shall not be understood as given or endorsed by Cloud Technology Partners Ltd, its subsidiaries or their employees, unless expressly so stated. It is the responsibility of the recipient to ensure that this email is virus free; therefore neither Cloud Technology Partners Ltd, its subsidiaries nor their employees accept any responsibility.
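Coming back to the original question of slicing and dicing the result set: once rs is a DataFrame, as in the scripts above, operations along these lines should work (a minimal, untested sketch; the "1998" prefix and the 1000000 threshold are just illustrative values):

import org.apache.spark.sql.functions._

// Keep only 1998 rows above an arbitrary sales threshold
val filtered = rs.filter(col("calendar_month_desc").startsWith("1998") && col("TotalSales") > 1000000)

// Project and rename columns, ordered as in the first SQL query
filtered.select(col("calendar_month_desc").as("MONTH"), col("channel_desc").as("CHANNEL"), col("TotalSales")).orderBy("MONTH", "CHANNEL").show(5)

// Drop down to the RDD for map/split-style processing when needed
rs.rdd.map(r => (r.getString(0), r.getString(1), r.get(2))).take(5).foreach(println)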