My apologies, I definitely misunderstood. You are 100% correct.
On Feb 24, 2016 19:25, "Sabarish Sasidharan" <
sabarish.sasidha...@manthan.com> wrote:

> I never said it needs one. All I said is that when calling context.sql()
> the sql is executed in the source database (assuming datasource is Hive or
> some RDBMS)
>
> Regards
> Sab
>
> On 24-Feb-2016 11:49 pm, "Mohannad Ali" <man...@gmail.com> wrote:
>
>> That is incorrect. HiveContext does not need a Hive instance to run.
>> On Feb 24, 2016 19:15, "Sabarish Sasidharan" <
>> sabarish.sasidha...@manthan.com> wrote:
>>
>>> Yes
>>>
>>> Regards
>>> Sab
>>> On 24-Feb-2016 9:15 pm, "Koert Kuipers" <ko...@tresata.com> wrote:
>>>
>>>> Are you saying that HiveContext.sql(...) runs on Hive, and not on
>>>> Spark SQL?
>>>>
>>>> On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan <
>>>> sabarish.sasidha...@manthan.com> wrote:
>>>>
>>>>> When using SQL, your full query, including the joins, was executed in
>>>>> Hive (or the RDBMS) and only the results were brought into the Spark
>>>>> cluster. In the FP case, the data for the 3 tables is first pulled into
>>>>> the Spark cluster and then the join is executed.
>>>>>
>>>>> Thus the time difference.
>>>>>
>>>>> It's not immediately obvious why the results are different.
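
One way to check where the join actually runs is to ask Spark for the query plan. The sketch below assumes a spark-shell session (Spark 1.x) where `sc` already exists and the oraclehadoop tables from this thread are available; `hiveContext` and `joined` are names chosen here for illustration only.

```scala
// Assumption: run inside spark-shell, so `sc` is already defined.
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
hiveContext.sql("use oraclehadoop")

// Same three-table join as in the SQL version quoted below.
val joined = hiveContext.sql("""
  SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
  FROM smallsales s
  INNER JOIN times t ON s.time_id = t.time_id
  INNER JOIN channels c ON s.channel_id = c.channel_id
  GROUP BY t.calendar_month_desc, c.channel_desc
""")

// Prints the logical and physical plans without running the query.
joined.explain(true)
```

The physical plan printed by explain(true) lists the operators Spark itself would run for this DataFrame, so it shows directly whether the join and aggregation appear there.
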
>>>>>
>>>>> Regards
>>>>> Sab
>>>>> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
>>>>> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> First thanks everyone for their suggestions. Much appreciated.
>>>>>>
>>>>>> These were the original queries, written in SQL and run in
>>>>>> spark-shell:
>>>>>>
>>>>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>>>> println ("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
>>>>>> HiveContext.sql("use oraclehadoop")
>>>>>>
>>>>>> val rs = HiveContext.sql(
>>>>>> """
>>>>>> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
>>>>>> TotalSales
>>>>>> FROM smallsales s
>>>>>> INNER JOIN times t
>>>>>> ON s.time_id = t.time_id
>>>>>> INNER JOIN channels c
>>>>>> ON s.channel_id = c.channel_id
>>>>>> GROUP BY t.calendar_month_desc, c.channel_desc
>>>>>> """)
>>>>>> rs.registerTempTable("tmp")
>>>>>> println ("\nfirst query")
>>>>>> HiveContext.sql("""
>>>>>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL,
>>>>>> TotalSales
>>>>>> from tmp
>>>>>> ORDER BY MONTH, CHANNEL LIMIT 5
>>>>>> """).collect.foreach(println)
>>>>>> println ("\nsecond query")
>>>>>> HiveContext.sql("""
>>>>>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>>>>>> FROM tmp
>>>>>> GROUP BY channel_desc
>>>>>> order by SALES DESC LIMIT 5
>>>>>> """).collect.foreach(println)
>>>>>> println ("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
>>>>>> sys.exit
>>>>>>
>>>>>> The second set of queries was written in FP style, as far as I could
>>>>>> manage, as below:
>>>>>>
>>>>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>>>> println ("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
>>>>>> HiveContext.sql("use oraclehadoop")
>>>>>> val s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM sales")
>>>>>> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
>>>>>> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
>>>>>> val rs =
>>>>>> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>>>>>> println ("\nfirst query")
>>>>>> val rs1 =
>>>>>> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
>>>>>> println ("\nsecond query")
>>>>>> val rs2 =
>>>>>> rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy(desc("SALES")).take(5).foreach(println)
>>>>>> println ("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
>>>>>> sys.exit
>>>>>>
>>>>>>
>>>>>>
>>>>>> However, the first query results are slightly different in SQL and FP
>>>>>> (maybe the first query code in FP is not exactly correct?) and, more
>>>>>> importantly, the FP version takes an order of magnitude longer than SQL
>>>>>> (8 minutes compared to less than a minute). I am not surprised, as I
>>>>>> expected that functional programming has to flatten all those method
>>>>>> calls and convert them to SQL?
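
One difference visible in the two listings above is that the SQL version reads `smallsales` while the FP version reads `sales`. Whether that accounts for the timing or the slightly different totals is not confirmed anywhere in this thread, but a like-for-like comparison would need both runs to use the same table. A minimal sketch of the DataFrame version pointed at `smallsales`, assuming a spark-shell session (Spark 1.x) where `sc` exists; `hiveContext` is a name chosen here for illustration:

```scala
// Assumption: run inside spark-shell, so `sc` is already defined.
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.functions.{desc, max, sum}

val hiveContext = new HiveContext(sc)
hiveContext.sql("use oraclehadoop")

// Read the same three tables the SQL version joins (note: smallsales, not sales).
val s = hiveContext.table("smallsales").select("amount_sold", "time_id", "channel_id")
val t = hiveContext.table("times").select("time_id", "calendar_month_desc")
val c = hiveContext.table("channels").select("channel_id", "channel_desc")

// Join, group and aggregate with DataFrame methods.
val rs = s.join(t, "time_id").join(c, "channel_id").
  groupBy("calendar_month_desc", "channel_desc").
  agg(sum("amount_sold").as("TotalSales"))

// First query: five rows ordered by month and channel.
rs.orderBy("calendar_month_desc", "channel_desc").take(5).foreach(println)

// Second query: highest monthly total per channel, largest first.
rs.groupBy("channel_desc").
  agg(max("TotalSales").as("SALES")).
  orderBy(desc("SALES")).
  take(5).foreach(println)
```

The trailing-dot chaining is deliberate so the lines can be pasted into spark-shell one at a time.
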
>>>>>>
>>>>>> *The standard SQL results*
>>>>>>
>>>>>>
>>>>>>
>>>>>> Started at
>>>>>> [23/02/2016 23:55:30.30]
>>>>>> res1: org.apache.spark.sql.DataFrame = [result: string]
>>>>>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>>>>>> channel_desc: string, TotalSales: decimal(20,0)]
>>>>>>
>>>>>> first query
>>>>>> [1998-01,Direct Sales,9161730]
>>>>>> [1998-01,Internet,1248581]
>>>>>> [1998-01,Partners,2409776]
>>>>>> [1998-02,Direct Sales,9161840]
>>>>>> [1998-02,Internet,1533193]
>>>>>>
>>>>>>
>>>>>>
>>>>>> second query
>>>>>> [Direct Sales,9161840]
>>>>>> [Internet,3977374]
>>>>>> [Partners,3976291]
>>>>>> [Tele Sales,328760]
>>>>>>
>>>>>> Finished at
>>>>>> [23/02/2016 23:56:11.11]
>>>>>>
>>>>>> *The FP results*
>>>>>>
>>>>>> Started at
>>>>>> [23/02/2016 23:45:58.58]
>>>>>> res1: org.apache.spark.sql.DataFrame = [result: string]
>>>>>> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0),
>>>>>> TIME_ID: timestamp, CHANNEL_ID: bigint]
>>>>>> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double,
>>>>>> CHANNEL_DESC: string]
>>>>>> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
>>>>>> CALENDAR_MONTH_DESC: string]
>>>>>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>>>>>> channel_desc: string, TotalSales: decimal(20,0)]
>>>>>>
>>>>>> first query
>>>>>> [1998-01,Direct Sales,9086830]
>>>>>> [1998-01,Internet,1247641]
>>>>>> [1998-01,Partners,2393567]
>>>>>> [1998-02,Direct Sales,9161840]
>>>>>> [1998-02,Internet,1533193]
>>>>>> rs1: Unit = ()
>>>>>>
>>>>>> second query
>>>>>> [Direct Sales,9161840]
>>>>>> [Internet,3977374]
>>>>>> [Partners,3976291]
>>>>>> [Tele Sales,328760]
>>>>>> rs2: Unit = ()
>>>>>>
>>>>>> Finished at
>>>>>> [23/02/2016 23:53:42.42]
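
For reference, the elapsed times can be worked out from the Started/Finished stamps printed above. This is a small sketch in plain Scala (no Spark needed, Java 8+ assumed for java.time); `elapsedSeconds` is a helper name made up here.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

// The thread prints timestamps with the pattern dd/MM/yyyy HH:mm:ss.ss;
// the trailing ".ss" just repeats the seconds, so it is dropped before parsing.
val fmt = DateTimeFormatter.ofPattern("dd/MM/yyyy HH:mm:ss")

def elapsedSeconds(start: String, end: String): Long = {
  def parse(stamp: String) = LocalDateTime.parse(stamp.takeWhile(_ != '.'), fmt)
  Duration.between(parse(start), parse(end)).getSeconds
}

// Stamps copied from the two runs above.
val sqlSeconds = elapsedSeconds("23/02/2016 23:55:30.30", "23/02/2016 23:56:11.11")
val fpSeconds  = elapsedSeconds("23/02/2016 23:45:58.58", "23/02/2016 23:53:42.42")

println(s"SQL run: $sqlSeconds s, FP run: $fpSeconds s")
```

That gives 41 seconds for the SQL run and 464 seconds (7 min 44 s) for the FP run, roughly an 11x difference.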
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 22/02/2016 23:16, Mich Talebzadeh wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have data stored in Hive tables on which I want to do some simple
>>>>>> manipulation.
>>>>>>
>>>>>> Currently in Spark I do the following: I get the result set from the
>>>>>> Hive tables using SQL and register it as a temporary table in Spark.
>>>>>>
>>>>>> Ideally, I would get the result set into a DF and work on the DF to
>>>>>> slice and dice the data using functional programming with filter, map,
>>>>>> split etc.
>>>>>>
>>>>>> I wanted to get some ideas on how to go about it.
>>>>>>
>>>>>> thanks
>>>>>>
>>>>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>>>>
>>>>>> HiveContext.sql("use oraclehadoop")
>>>>>> val rs = HiveContext.sql("""SELECT t.calendar_month_desc,
>>>>>> c.channel_desc, SUM(s.amount_sold) AS TotalSales
>>>>>> FROM smallsales s, times t, channels c
>>>>>> WHERE s.time_id = t.time_id
>>>>>> AND   s.channel_id = c.channel_id
>>>>>> GROUP BY t.calendar_month_desc, c.channel_desc
>>>>>> """)
>>>>>> rs.registerTempTable("tmp")
>>>>>>
>>>>>>
>>>>>> HiveContext.sql("""
>>>>>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL,
>>>>>> TotalSales
>>>>>> from tmp
>>>>>> ORDER BY MONTH, CHANNEL
>>>>>> """).collect.foreach(println)
>>>>>> HiveContext.sql("""
>>>>>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>>>>>> FROM tmp
>>>>>> GROUP BY channel_desc
>>>>>> order by SALES DESC
>>>>>> """).collect.foreach(println)
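
As a possible starting point for the filter / map / split route asked about in this post, here is a minimal sketch. It assumes a spark-shell session (Spark 1.x) where `sc` exists; `hiveContext`, `monthlySales`, `directSales` and `years` are names chosen here for illustration, and the "Direct Sales" value and the "1998-01" month format are taken from the output shown elsewhere in this thread.

```scala
// Assumption: run inside spark-shell, so `sc` is already defined.
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.functions.col

val hiveContext = new HiveContext(sc)
hiveContext.sql("use oraclehadoop")

// Same aggregate as in the post, kept as a DataFrame instead of a temp table.
val monthlySales = hiveContext.sql("""
  SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
  FROM smallsales s, times t, channels c
  WHERE s.time_id = t.time_id
  AND   s.channel_id = c.channel_id
  GROUP BY t.calendar_month_desc, c.channel_desc
""")

// filter: keep a single channel and order by month.
val directSales = monthlySales.
  filter(col("channel_desc") === "Direct Sales").
  orderBy("calendar_month_desc")
directSales.show(5)

// map + split: go through .rdd to apply plain Scala functions to each Row.
// Column 0 is calendar_month_desc (a string such as "1998-01").
val years = monthlySales.rdd.
  map(row => row.getString(0).split("-")(0)).
  distinct().
  collect()
years.foreach(println)
```

Going through `.rdd` for the map step is a choice made here so the sketch does not depend on how `DataFrame.map` behaves in a particular Spark version.
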
>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Dr Mich Talebzadeh
>>>>>>
>>>>>> LinkedIn  
>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>> http://talebzadehmich.wordpress.com
>>>>>>
>>>>>> NOTE: The information in this email is proprietary and confidential. 
>>>>>> This message is for the designated recipient only, if you are not the 
>>>>>> intended recipient, you should destroy it immediately. Any information 
>>>>>> in this message shall not be understood as given or endorsed by Cloud 
>>>>>> Technology Partners Ltd, its subsidiaries or their employees, unless 
>>>>>> expressly so stated. It is the responsibility of the recipient to ensure 
>>>>>> that this email is virus free, therefore neither Cloud Technology 
>>>>>> partners Ltd, its subsidiaries nor their employees accept any 
>>>>>> responsibility.
>>>>>>
>>>>
