Re: What's the best way to find the nearest neighbor in Spark? Any windowing function?

2016-09-13 Thread Mobius ReX
Yes, that's it! Thank you, Ayan.



On Tue, Sep 13, 2016 at 5:50 PM, ayan guha  wrote:

> >>> df.show()
>
> +----+---+---+-----+-----+-----+
> |city|flg| id|  nbr|price|state|
> +----+---+---+-----+-----+-----+
> |  CA|  0|  1| 1000|  100|    A|
> |  CA|  1|  2| 1010|   96|    A|
> |  CA|  1|  3| 1010|  195|    A|
> |  NY|  0|  4| 2000|  124|    B|
> |  NY|  1|  5| 2001|  128|    B|
> |  NY|  0|  6|    3|   24|    C|
> |  NY|  1|  7|30100|   27|    C|
> |  NY|  0|  8|30200|   29|    C|
> |  NY|  1|  9|33000|   39|    C|
> +----+---+---+-----+-----+-----+
>
>
> >>> flg0 = df.filter(df.flg==0)
> >>> flg1 = df.filter(df.flg!=0)
> >>> flg0.registerTempTable("t_flg0")
> >>> flg1.registerTempTable("t_flg1")
>
> >>> j = sqlContext.sql("select *, rank() over (partition by id0 order by
> dist) r from (select *,x.id as id0,y.id as id1, abs(x.nbr/1000 -
> y.nbr/1000) + abs(x.price/100 - y.price/100) as dist from t_flg0 x inner
> join t_flg1 y on (x.city=y.city and x.state=y.state))x ")
>
>
> >>> j.show()
>
> city flg id   nbr price state  city flg id   nbr price state  id0 id1  dist r
>   CA   0  1  1000   100     A    CA   1  2  1010    96     A    1   2  0.05 1
>   CA   0  1  1000   100     A    CA   1  3  1010   195     A    1   3  0.96 2
>   NY   0  4  2000   124     B    NY   1  5  2001   128     B    4   5 0.041 1
>   NY   0  6     3    24     C    NY   1  7 30100    27     C    6   7  0.13 1
>   NY   0  6     3    24     C    NY   1  9 33000    39     C    6   9  3.15 2
>   NY   0  8 30200    29     C    NY   1  7 30100    27     C    8   7  0.12 1
>   NY   0  8 30200    29     C    NY   1  9 33000    39     C    8   9   2.9 2
> Wherever r=1, you got a match.
>
>
>
> On Wed, Sep 14, 2016 at 5:45 AM, Mobius ReX  wrote:
>
>> Hi Sean,
>>
>> Great!
>>
>> Is there any sample code implementing Locality Sensitive Hashing with
>> Spark, in either scala or python?
>>
>> "However if your rule is really like "must match column A and B and
>> then closest value in column C then just ordering everything by A, B,
>> C lets you pretty much read off the answer from the result set
>> directly. Everything is closest to one of its two neighbors."
>>
>> This is interesting since we can use Lead/Lag Windowing function if we
>> have only one continuous column. However,
>> our rule is "must match column A and B and then closest values in column
>> C and D - for any ID with column E = 0, and the closest ID with Column E = 
>> 1".
>> The distance metric between ID1 (with Column E =0) and ID2 (with Column E
>> =1) is defined as
>> abs( C1/C1 - C2/C1 ) + abs (D1/D1 - D2/D1)
>> One cannot do
>> abs( (C1/C1 + D1/D1) - (C2/C1 + D2/ D1) )
>>
>>
>> Any further tips?
>>
>> Best,
>> Rex
>>
>>
>>
>> On Tue, Sep 13, 2016 at 11:09 AM, Sean Owen  wrote:
>>
>>> The key is really to specify the distance metric that defines
>>> "closeness" for you. You have features that aren't on the same scale,
>>> and some that aren't continuous. You might look to clustering for
>>> ideas here, though mostly you just want to normalize the scale of
>>> dimensions to make them comparable.
>>>
>>> You can find nearest neighbors by brute force. If speed really matters
>>> you can consider locality sensitive hashing, which isn't that hard to
>>> implement and can give a lot of speed for a small cost in accuracy.
>>>
>>> However if your rule is really like "must match column A and B and
>>> then closest value in column C then just ordering everything by A, B,
>>> C lets you pretty much read off the answer from the result set
>>> directly. Everything is closest to one of its two neighbors.
>>>
>>> On Tue, Sep 13, 2016 at 6:18 PM, Mobius ReX  wrote:
>>> > Given a table
>>> >
>>> >> $cat data.csv
>>> >>
>>> >> ID,State,City,Price,Number,Flag
>>> >> 1,CA,A,100,1000,0
>>> >> 2,CA,A,96,1010,1
>>> >> 3,CA,A,195,1010,1
>>> >> 4,NY,B,124,2000,0
>>> >> 5,NY,B,128,2001,1
>>> >> 6,NY,C,24,3,0
>>> >> 7,NY,C,27,30100,1
>>> >> 8,NY,C,29,30200,0
>>> >> 9,NY,C,39,33000,1
>>> >
>>> >
>>> > Expected Result:
>>> >
>>> > ID0, ID1
>>> > 1,2
>>> > 4,5
>>> > 6,7
>>> > 8,7
>>> >
>>> > for each ID with Flag=0 above, we want to find another ID from Flag=1,
>>> with
>>> > the same "State" and "City", and the nearest Price and Number
>>> normalized by
>>> > the corresponding values of that ID with Flag=0.
>>> >
>>> > For example, ID = 1 and ID=2, has the same State and City, but
>>> different
>>> > FLAG.
>>> > After normalized the Price and Number (Price divided by 100, Number
>>> divided
>>> > by 1000), the distance between ID=1 and ID=2 is defined as :
>>> > abs(100/100 - 96/100) + abs(1000/1000 - 1010/1000) = 0.04 + 0.01 = 0.05
>>> >
>>> >
>>> > What's the best way to find such nearest neighbor? Any valuable tips
>>> will be
>>> > greatly appreciated!
>>> >
>>> >
>>>
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: Using Spark SQL to Create JDBC Tables

2016-09-13 Thread ayan guha
I did not install it myself, as it is part of Oracle's product. However, you
can bring in any SerDe yourself and add it to the library. See this blog for
more information.

On Wed, Sep 14, 2016 at 2:15 PM, Benjamin Kim  wrote:

> Thank you for the idea. I will look for a PostgreSQL Serde for Hive. But,
> if you don’t mind me asking, how did you install the Oracle Serde?
>
> Cheers,
> Ben
>
>
>
> On Sep 13, 2016, at 7:12 PM, ayan guha  wrote:
>
> One option is to have Hive as the central point of exposing data, i.e. create
> Hive tables which "point to" any other DB. I know Oracle provides their own
> SerDe for Hive. Not sure about PG though.
>
> Once tables are created in hive, STS will automatically see it.
>
> On Wed, Sep 14, 2016 at 11:08 AM, Benjamin Kim  wrote:
>
>> Has anyone created tables using Spark SQL that directly connect to a JDBC
>> data source such as PostgreSQL? I would like to use Spark SQL Thriftserver
>> to access and query remote PostgreSQL tables. In this way, we can
>> centralize data access to Spark SQL tables along with PostgreSQL making it
>> very convenient for users. They would not know or care where the data is
>> physically located anymore.
>>
>> By the way, our users only know SQL.
>>
>> If anyone has a better suggestion, then please let me know too.
>>
>> Thanks,
>> Ben
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>
>
>


-- 
Best Regards,
Ayan Guha


Re: Access HDFS within Spark Map Operation

2016-09-13 Thread ayan guha
Sure, and please post back if it works (or it does not :) )

On Wed, Sep 14, 2016 at 2:09 PM, Saliya Ekanayake  wrote:

> Thank you, I'll try.
>
> saliya
>
> On Wed, Sep 14, 2016 at 12:07 AM, ayan guha  wrote:
>
>> Depends on join, but unless you are doing cross join, it should not blow
>> up. 6M is not too much. I think what you may want to consider (a) volume of
>> your data files (b) reduce shuffling by following similar partitioning on
>> both RDDs
>>
>> On Wed, Sep 14, 2016 at 2:00 PM, Saliya Ekanayake 
>> wrote:
>>
>>> Thank you, but isn't that join going to be too expensive for this?
>>>
>>> On Tue, Sep 13, 2016 at 11:55 PM, ayan guha  wrote:
>>>
 My suggestion:

 1. Read first text file in (say) RDD1 using textFile
 2. Read 80K data files in RDD2 using wholeTextFile. RDD2 will be of
 signature (filename,filecontent).
 3. Join RDD1 and 2 based on some file name (or some other key).

 On Wed, Sep 14, 2016 at 1:41 PM, Saliya Ekanayake 
 wrote:

> 1.) What needs to be parallelized is the work for each of those 6M
> rows, not the 80K files. Let me elaborate this with a simple for loop if 
> we
> were to write this serially.
>
> For each line L out of 6M in the first file{
>  process the file corresponding to L out of those 80K files.
> }
>
> The 80K files are in HDFS and to read all that content into each
> worker is not possible due to size.
>
> 2. No. multiple rows may point to the same file but they operate on
> different records within the file.
>
> 3. End goal is to write back 6M processed information.
>
> This is simple map only type scenario. One workaround I can think of
> is to append all the 6M records to each of the data files.
>
> Thank you
>
> On Tue, Sep 13, 2016 at 11:25 PM, ayan guha 
> wrote:
>
>> Question:
>>
>> 1. Why you can not read all 80K files together? ie, why you have a
>> dependency on first text file?
>> 2. Your first text file has 6M rows, but total number of files~80K.
>> is there a scenario where there may not be a file in HDFS corresponding 
>> to
>> the row in first text file?
>> 3. May be a follow up of 1, what is your end goal?
>>
>> On Wed, Sep 14, 2016 at 12:17 PM, Saliya Ekanayake > > wrote:
>>
>>> The first text file is not that large, it has 6 million records
>>> (lines). For each line I need to read a file out of 8 files. They 
>>> total
>>> around 1.5TB. I didn't understand what you meant by "then again
>>> read text files for each line and union all rdds."
>>>
>>> On Tue, Sep 13, 2016 at 10:04 PM, Raghavendra Pandey <
>>> raghavendra.pan...@gmail.com> wrote:
>>>
 How large is your first text file? The idea is you read first text
 file and if it is not large you can collect all the lines on driver and
 then again read text files for each line and union all rdds.

 On 13 Sep 2016 11:39 p.m., "Saliya Ekanayake" 
 wrote:

> Just wonder if this is possible with Spark?
>
> On Mon, Sep 12, 2016 at 12:14 AM, Saliya Ekanayake <
> esal...@gmail.com> wrote:
>
>> Hi,
>>
>> I've got a text file where each line is a record. For each
>> record, I need to process a file in HDFS.
>>
>> So if I represent these records as an RDD and invoke a map()
>> operation on them how can I access the HDFS within that map()? Do I 
>> have to
>> create a Spark context within map() or is there a better solution to 
>> that?
>>
>> Thank you,
>> Saliya
>>
>>
>>
>> --
>> Saliya Ekanayake
>> Ph.D. Candidate | Research Assistant
>> School of Informatics and Computing | Digital Science Center
>> Indiana University, Bloomington
>>
>>
>
>
> --
> Saliya Ekanayake
> Ph.D. Candidate | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
>
>
>>>
>>>
>>> --
>>> Saliya Ekanayake
>>> Ph.D. Candidate | Research Assistant
>>> School of Informatics and Computing | Digital Science Center
>>> Indiana University, Bloomington
>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>
>
> --
> Saliya Ekanayake
> Ph.D. Candidate | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
>
>


 --
 Best Regards,
 Ayan Guha

>>>
>>>
>>>
>>> --

Re: Using Spark SQL to Create JDBC Tables

2016-09-13 Thread Benjamin Kim
Thank you for the idea. I will look for a PostgreSQL Serde for Hive. But, if 
you don’t mind me asking, how did you install the Oracle Serde?

Cheers,
Ben


> On Sep 13, 2016, at 7:12 PM, ayan guha  wrote:
> 
> One option is to have Hive as the central point of exposing data, i.e. create
> Hive tables which "point to" any other DB. I know Oracle provides their own
> SerDe for Hive. Not sure about PG though.
> 
> Once tables are created in hive, STS will automatically see it. 
> 
> On Wed, Sep 14, 2016 at 11:08 AM, Benjamin Kim  > wrote:
> Has anyone created tables using Spark SQL that directly connect to a JDBC 
> data source such as PostgreSQL? I would like to use Spark SQL Thriftserver to 
> access and query remote PostgreSQL tables. In this way, we can centralize 
> data access to Spark SQL tables along with PostgreSQL making it very 
> convenient for users. They would not know or care where the data is 
> physically located anymore.
> 
> By the way, our users only know SQL.
> 
> If anyone has a better suggestion, then please let me know too.
> 
> Thanks,
> Ben
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> 
> 
> 
> 
> 
> -- 
> Best Regards,
> Ayan Guha



Re: Access HDFS within Spark Map Operation

2016-09-13 Thread Saliya Ekanayake
Thank you, I'll try.

saliya

On Wed, Sep 14, 2016 at 12:07 AM, ayan guha  wrote:

> Depends on join, but unless you are doing cross join, it should not blow
> up. 6M is not too much. I think what you may want to consider (a) volume of
> your data files (b) reduce shuffling by following similar partitioning on
> both RDDs
>
> On Wed, Sep 14, 2016 at 2:00 PM, Saliya Ekanayake 
> wrote:
>
>> Thank you, but isn't that join going to be too expensive for this?
>>
>> On Tue, Sep 13, 2016 at 11:55 PM, ayan guha  wrote:
>>
>>> My suggestion:
>>>
>>> 1. Read first text file in (say) RDD1 using textFile
>>> 2. Read 80K data files in RDD2 using wholeTextFile. RDD2 will be of
>>> signature (filename,filecontent).
>>> 3. Join RDD1 and 2 based on some file name (or some other key).
>>>
>>> On Wed, Sep 14, 2016 at 1:41 PM, Saliya Ekanayake 
>>> wrote:
>>>
 1.) What needs to be parallelized is the work for each of those 6M
 rows, not the 80K files. Let me elaborate this with a simple for loop if we
 were to write this serially.

 For each line L out of 6M in the first file{
  process the file corresponding to L out of those 80K files.
 }

 The 80K files are in HDFS and to read all that content into each worker
 is not possible due to size.

 2. No. multiple rows may point to the same file but they operate on
 different records within the file.

 3. End goal is to write back 6M processed information.

 This is simple map only type scenario. One workaround I can think of is
 to append all the 6M records to each of the data files.

 Thank you

 On Tue, Sep 13, 2016 at 11:25 PM, ayan guha 
 wrote:

> Question:
>
> 1. Why you can not read all 80K files together? ie, why you have a
> dependency on first text file?
> 2. Your first text file has 6M rows, but total number of files~80K. is
> there a scenario where there may not be a file in HDFS corresponding to 
> the
> row in first text file?
> 3. May be a follow up of 1, what is your end goal?
>
> On Wed, Sep 14, 2016 at 12:17 PM, Saliya Ekanayake 
> wrote:
>
>> The first text file is not that large, it has 6 million records
>> (lines). For each line I need to read a file out of 8 files. They 
>> total
>> around 1.5TB. I didn't understand what you meant by "then again read
>> text files for each line and union all rdds."
>>
>> On Tue, Sep 13, 2016 at 10:04 PM, Raghavendra Pandey <
>> raghavendra.pan...@gmail.com> wrote:
>>
>>> How large is your first text file? The idea is you read first text
>>> file and if it is not large you can collect all the lines on driver and
>>> then again read text files for each line and union all rdds.
>>>
>>> On 13 Sep 2016 11:39 p.m., "Saliya Ekanayake" 
>>> wrote:
>>>
 Just wonder if this is possible with Spark?

 On Mon, Sep 12, 2016 at 12:14 AM, Saliya Ekanayake <
 esal...@gmail.com> wrote:

> Hi,
>
> I've got a text file where each line is a record. For each record,
> I need to process a file in HDFS.
>
> So if I represent these records as an RDD and invoke a map()
> operation on them how can I access the HDFS within that map()? Do I 
> have to
> create a Spark context within map() or is there a better solution to 
> that?
>
> Thank you,
> Saliya
>
>
>
> --
> Saliya Ekanayake
> Ph.D. Candidate | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
>
>


 --
 Saliya Ekanayake
 Ph.D. Candidate | Research Assistant
 School of Informatics and Computing | Digital Science Center
 Indiana University, Bloomington


>>
>>
>> --
>> Saliya Ekanayake
>> Ph.D. Candidate | Research Assistant
>> School of Informatics and Computing | Digital Science Center
>> Indiana University, Bloomington
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>



 --
 Saliya Ekanayake
 Ph.D. Candidate | Research Assistant
 School of Informatics and Computing | Digital Science Center
 Indiana University, Bloomington


>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>
>>
>> --
>> Saliya Ekanayake
>> Ph.D. Candidate | Research Assistant
>> School of Informatics and Computing | Digital Science Center
>> Indiana University, Bloomington
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>



-- 
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | 

Re: Access HDFS within Spark Map Operation

2016-09-13 Thread ayan guha
Depends on the join, but unless you are doing a cross join, it should not blow
up. 6M is not too much. I think what you may want to consider is (a) the volume
of your data files and (b) reducing shuffling by following similar partitioning
on both RDDs.
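
A small PySpark sketch of point (b), assuming rdd1 and rdd2 are already keyed
by file name as in the three-step suggestion quoted below; the partition count
is an arbitrary placeholder:

num_parts = 200                      # assumption: tune to cluster size and data volume

# Hash-partition both sides the same way so records with the same key end up in
# matching partitions, which keeps the join's shuffle small.
rdd1p = rdd1.partitionBy(num_parts).cache()
rdd2p = rdd2.partitionBy(num_parts).cache()

joined = rdd1p.join(rdd2p, numPartitions=num_parts)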

On Wed, Sep 14, 2016 at 2:00 PM, Saliya Ekanayake  wrote:

> Thank you, but isn't that join going to be too expensive for this?
>
> On Tue, Sep 13, 2016 at 11:55 PM, ayan guha  wrote:
>
>> My suggestion:
>>
>> 1. Read first text file in (say) RDD1 using textFile
>> 2. Read 80K data files in RDD2 using wholeTextFile. RDD2 will be of
>> signature (filename,filecontent).
>> 3. Join RDD1 and 2 based on some file name (or some other key).
>>
>> On Wed, Sep 14, 2016 at 1:41 PM, Saliya Ekanayake 
>> wrote:
>>
>>> 1.) What needs to be parallelized is the work for each of those 6M rows,
>>> not the 80K files. Let me elaborate this with a simple for loop if we were
>>> to write this serially.
>>>
>>> For each line L out of 6M in the first file{
>>>  process the file corresponding to L out of those 80K files.
>>> }
>>>
>>> The 80K files are in HDFS and to read all that content into each worker
>>> is not possible due to size.
>>>
>>> 2. No. multiple rows may point to the same file but they operate on
>>> different records within the file.
>>>
>>> 3. End goal is to write back 6M processed information.
>>>
>>> This is simple map only type scenario. One workaround I can think of is
>>> to append all the 6M records to each of the data files.
>>>
>>> Thank you
>>>
>>> On Tue, Sep 13, 2016 at 11:25 PM, ayan guha  wrote:
>>>
 Question:

 1. Why you can not read all 80K files together? ie, why you have a
 dependency on first text file?
 2. Your first text file has 6M rows, but total number of files~80K. is
 there a scenario where there may not be a file in HDFS corresponding to the
 row in first text file?
 3. May be a follow up of 1, what is your end goal?

 On Wed, Sep 14, 2016 at 12:17 PM, Saliya Ekanayake 
 wrote:

> The first text file is not that large, it has 6 million records
> (lines). For each line I need to read a file out of 8 files. They 
> total
> around 1.5TB. I didn't understand what you meant by "then again read
> text files for each line and union all rdds."
>
> On Tue, Sep 13, 2016 at 10:04 PM, Raghavendra Pandey <
> raghavendra.pan...@gmail.com> wrote:
>
>> How large is your first text file? The idea is you read first text
>> file and if it is not large you can collect all the lines on driver and
>> then again read text files for each line and union all rdds.
>>
>> On 13 Sep 2016 11:39 p.m., "Saliya Ekanayake" 
>> wrote:
>>
>>> Just wonder if this is possible with Spark?
>>>
>>> On Mon, Sep 12, 2016 at 12:14 AM, Saliya Ekanayake <
>>> esal...@gmail.com> wrote:
>>>
 Hi,

 I've got a text file where each line is a record. For each record,
 I need to process a file in HDFS.

 So if I represent these records as an RDD and invoke a map()
 operation on them how can I access the HDFS within that map()? Do I 
 have to
 create a Spark context within map() or is there a better solution to 
 that?

 Thank you,
 Saliya



 --
 Saliya Ekanayake
 Ph.D. Candidate | Research Assistant
 School of Informatics and Computing | Digital Science Center
 Indiana University, Bloomington


>>>
>>>
>>> --
>>> Saliya Ekanayake
>>> Ph.D. Candidate | Research Assistant
>>> School of Informatics and Computing | Digital Science Center
>>> Indiana University, Bloomington
>>>
>>>
>
>
> --
> Saliya Ekanayake
> Ph.D. Candidate | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
>
>


 --
 Best Regards,
 Ayan Guha

>>>
>>>
>>>
>>> --
>>> Saliya Ekanayake
>>> Ph.D. Candidate | Research Assistant
>>> School of Informatics and Computing | Digital Science Center
>>> Indiana University, Bloomington
>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>
>
> --
> Saliya Ekanayake
> Ph.D. Candidate | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
>
>


-- 
Best Regards,
Ayan Guha


Re: Can I assign affinity for spark executor processes?

2016-09-13 Thread Jakob Odersky
Hi Xiaoye,
could it be that the executors were spawned before the affinity was
set on the worker? Would it help to start spark worker with taskset
from the beginning, i.e. "taskset [mask] start-slave.sh"?
Workers in spark (standalone mode) simply create processes with the
standard java process API. Unless there is something funky going on in
the JRE, I don't see how spark could affect cpu affinity.

regards,
--Jakob

On Tue, Sep 13, 2016 at 7:56 PM, Xiaoye Sun  wrote:
> Hi,
>
> In my experiment, I pin one very important process on a fixed CPU. So the
> performance of Spark task execution will be affected if the executors or the
> worker uses that CPU. I am wondering if it is possible to let the Spark
> executors not using a particular CPU.
>
> I tried to 'taskset -p [cpumask] [pid]' command to set the affinity of the
> Worker process. However, the executor processes created by the worker
> process don't inherit the same CPU affinity.
>
> Thanks!
>
> Best,
> Xiaoye

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Access HDFS within Spark Map Operation

2016-09-13 Thread Saliya Ekanayake
Thank you, but isn't that join going to be too expensive for this?

On Tue, Sep 13, 2016 at 11:55 PM, ayan guha  wrote:

> My suggestion:
>
> 1. Read first text file in (say) RDD1 using textFile
> 2. Read 80K data files in RDD2 using wholeTextFile. RDD2 will be of
> signature (filename,filecontent).
> 3. Join RDD1 and 2 based on some file name (or some other key).
>
> On Wed, Sep 14, 2016 at 1:41 PM, Saliya Ekanayake 
> wrote:
>
>> 1.) What needs to be parallelized is the work for each of those 6M rows,
>> not the 80K files. Let me elaborate this with a simple for loop if we were
>> to write this serially.
>>
>> For each line L out of 6M in the first file{
>>  process the file corresponding to L out of those 80K files.
>> }
>>
>> The 80K files are in HDFS and to read all that content into each worker
>> is not possible due to size.
>>
>> 2. No. multiple rows may point to the same file but they operate on
>> different records within the file.
>>
>> 3. End goal is to write back 6M processed information.
>>
>> This is simple map only type scenario. One workaround I can think of is
>> to append all the 6M records to each of the data files.
>>
>> Thank you
>>
>> On Tue, Sep 13, 2016 at 11:25 PM, ayan guha  wrote:
>>
>>> Question:
>>>
>>> 1. Why you can not read all 80K files together? ie, why you have a
>>> dependency on first text file?
>>> 2. Your first text file has 6M rows, but total number of files~80K. is
>>> there a scenario where there may not be a file in HDFS corresponding to the
>>> row in first text file?
>>> 3. May be a follow up of 1, what is your end goal?
>>>
>>> On Wed, Sep 14, 2016 at 12:17 PM, Saliya Ekanayake 
>>> wrote:
>>>
 The first text file is not that large, it has 6 million records
 (lines). For each line I need to read a file out of 8 files. They total
 around 1.5TB. I didn't understand what you meant by "then again read
 text files for each line and union all rdds."

 On Tue, Sep 13, 2016 at 10:04 PM, Raghavendra Pandey <
 raghavendra.pan...@gmail.com> wrote:

> How large is your first text file? The idea is you read first text
> file and if it is not large you can collect all the lines on driver and
> then again read text files for each line and union all rdds.
>
> On 13 Sep 2016 11:39 p.m., "Saliya Ekanayake" 
> wrote:
>
>> Just wonder if this is possible with Spark?
>>
>> On Mon, Sep 12, 2016 at 12:14 AM, Saliya Ekanayake > > wrote:
>>
>>> Hi,
>>>
>>> I've got a text file where each line is a record. For each record, I
>>> need to process a file in HDFS.
>>>
>>> So if I represent these records as an RDD and invoke a map()
>>> operation on them how can I access the HDFS within that map()? Do I 
>>> have to
>>> create a Spark context within map() or is there a better solution to 
>>> that?
>>>
>>> Thank you,
>>> Saliya
>>>
>>>
>>>
>>> --
>>> Saliya Ekanayake
>>> Ph.D. Candidate | Research Assistant
>>> School of Informatics and Computing | Digital Science Center
>>> Indiana University, Bloomington
>>>
>>>
>>
>>
>> --
>> Saliya Ekanayake
>> Ph.D. Candidate | Research Assistant
>> School of Informatics and Computing | Digital Science Center
>> Indiana University, Bloomington
>>
>>


 --
 Saliya Ekanayake
 Ph.D. Candidate | Research Assistant
 School of Informatics and Computing | Digital Science Center
 Indiana University, Bloomington


>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>
>>
>> --
>> Saliya Ekanayake
>> Ph.D. Candidate | Research Assistant
>> School of Informatics and Computing | Digital Science Center
>> Indiana University, Bloomington
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>



-- 
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington


Re: Access HDFS within Spark Map Operation

2016-09-13 Thread ayan guha
My suggestion:

1. Read first text file in (say) RDD1 using textFile
2. Read 80K data files in RDD2 using wholeTextFiles. RDD2 will be of
signature (filename,filecontent).
3. Join RDD1 and 2 based on some file name (or some other key).
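
A rough PySpark sketch of these three steps. The paths, the key extraction, and
the per-record processing below are hypothetical placeholders; the real key must
be whatever ties a line of the index file to a data file name:

from pyspark import SparkContext

sc = SparkContext(appName="index-join-sketch")   # or reuse an existing context

# 1. The 6M-line index file as (filename, line) pairs; the key parsing is a guess.
rdd1 = (sc.textFile("hdfs:///path/to/index.txt")
          .map(lambda line: (line.split(",")[0], line)))

# 2. The ~80K data files as (path, content) pairs. Note each file is materialized
#    as one string, so very large individual files are a concern.
rdd2 = (sc.wholeTextFiles("hdfs:///path/to/data_dir")
          .map(lambda kv: (kv[0].rsplit("/", 1)[-1], kv[1])))  # strip dir from the key

# 3. Join on the file name and do the per-line work.
def process(line, content):
    return len(content)          # placeholder for the real per-record processing

results = rdd1.join(rdd2).map(lambda kv: (kv[0], process(kv[1][0], kv[1][1])))
results.saveAsTextFile("hdfs:///path/to/output")  # hypothetical output location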

On Wed, Sep 14, 2016 at 1:41 PM, Saliya Ekanayake  wrote:

> 1.) What needs to be parallelized is the work for each of those 6M rows,
> not the 80K files. Let me elaborate this with a simple for loop if we were
> to write this serially.
>
> For each line L out of 6M in the first file{
>  process the file corresponding to L out of those 80K files.
> }
>
> The 80K files are in HDFS and to read all that content into each worker is
> not possible due to size.
>
> 2. No. multiple rows may point to the same file but they operate on
> different records within the file.
>
> 3. End goal is to write back 6M processed information.
>
> This is simple map only type scenario. One workaround I can think of is to
> append all the 6M records to each of the data files.
>
> Thank you
>
> On Tue, Sep 13, 2016 at 11:25 PM, ayan guha  wrote:
>
>> Question:
>>
>> 1. Why you can not read all 80K files together? ie, why you have a
>> dependency on first text file?
>> 2. Your first text file has 6M rows, but total number of files~80K. is
>> there a scenario where there may not be a file in HDFS corresponding to the
>> row in first text file?
>> 3. May be a follow up of 1, what is your end goal?
>>
>> On Wed, Sep 14, 2016 at 12:17 PM, Saliya Ekanayake 
>> wrote:
>>
>>> The first text file is not that large, it has 6 million records (lines).
>>> For each line I need to read a file out of 8 files. They total around
>>> 1.5TB. I didn't understand what you meant by "then again read text
>>> files for each line and union all rdds."
>>>
>>> On Tue, Sep 13, 2016 at 10:04 PM, Raghavendra Pandey <
>>> raghavendra.pan...@gmail.com> wrote:
>>>
 How large is your first text file? The idea is you read first text file
 and if it is not large you can collect all the lines on driver and then
 again read text files for each line and union all rdds.

 On 13 Sep 2016 11:39 p.m., "Saliya Ekanayake" 
 wrote:

> Just wonder if this is possible with Spark?
>
> On Mon, Sep 12, 2016 at 12:14 AM, Saliya Ekanayake 
> wrote:
>
>> Hi,
>>
>> I've got a text file where each line is a record. For each record, I
>> need to process a file in HDFS.
>>
>> So if I represent these records as an RDD and invoke a map()
>> operation on them how can I access the HDFS within that map()? Do I have 
>> to
>> create a Spark context within map() or is there a better solution to 
>> that?
>>
>> Thank you,
>> Saliya
>>
>>
>>
>> --
>> Saliya Ekanayake
>> Ph.D. Candidate | Research Assistant
>> School of Informatics and Computing | Digital Science Center
>> Indiana University, Bloomington
>>
>>
>
>
> --
> Saliya Ekanayake
> Ph.D. Candidate | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
>
>
>>>
>>>
>>> --
>>> Saliya Ekanayake
>>> Ph.D. Candidate | Research Assistant
>>> School of Informatics and Computing | Digital Science Center
>>> Indiana University, Bloomington
>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>
>
> --
> Saliya Ekanayake
> Ph.D. Candidate | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
>
>


-- 
Best Regards,
Ayan Guha


Re: Access HDFS within Spark Map Operation

2016-09-13 Thread Saliya Ekanayake
1.) What needs to be parallelized is the work for each of those 6M rows,
not the 80K files. Let me elaborate this with a simple for loop if we were
to write this serially.

For each line L out of 6M in the first file{
 process the file corresponding to L out of those 80K files.
}

The 80K files are in HDFS and to read all that content into each worker is
not possible due to size.

2. No. Multiple rows may point to the same file, but they operate on
different records within the file.

3. The end goal is to write back the 6M processed records.

This is a simple map-only scenario. One workaround I can think of is to
append all the 6M records to each of the data files.

Thank you

On Tue, Sep 13, 2016 at 11:25 PM, ayan guha  wrote:

> Question:
>
> 1. Why you can not read all 80K files together? ie, why you have a
> dependency on first text file?
> 2. Your first text file has 6M rows, but total number of files~80K. is
> there a scenario where there may not be a file in HDFS corresponding to the
> row in first text file?
> 3. May be a follow up of 1, what is your end goal?
>
> On Wed, Sep 14, 2016 at 12:17 PM, Saliya Ekanayake 
> wrote:
>
>> The first text file is not that large, it has 6 million records (lines).
>> For each line I need to read a file out of 8 files. They total around
>> 1.5TB. I didn't understand what you meant by "then again read text files
>> for each line and union all rdds."
>>
>> On Tue, Sep 13, 2016 at 10:04 PM, Raghavendra Pandey <
>> raghavendra.pan...@gmail.com> wrote:
>>
>>> How large is your first text file? The idea is you read first text file
>>> and if it is not large you can collect all the lines on driver and then
>>> again read text files for each line and union all rdds.
>>>
>>> On 13 Sep 2016 11:39 p.m., "Saliya Ekanayake"  wrote:
>>>
 Just wonder if this is possible with Spark?

 On Mon, Sep 12, 2016 at 12:14 AM, Saliya Ekanayake 
 wrote:

> Hi,
>
> I've got a text file where each line is a record. For each record, I
> need to process a file in HDFS.
>
> So if I represent these records as an RDD and invoke a map() operation
> on them how can I access the HDFS within that map()? Do I have to create a
> Spark context within map() or is there a better solution to that?
>
> Thank you,
> Saliya
>
>
>
> --
> Saliya Ekanayake
> Ph.D. Candidate | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
>
>


 --
 Saliya Ekanayake
 Ph.D. Candidate | Research Assistant
 School of Informatics and Computing | Digital Science Center
 Indiana University, Bloomington


>>
>>
>> --
>> Saliya Ekanayake
>> Ph.D. Candidate | Research Assistant
>> School of Informatics and Computing | Digital Science Center
>> Indiana University, Bloomington
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>



-- 
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington


Re: Access HDFS within Spark Map Operation

2016-09-13 Thread ayan guha
Question:

1. Why can you not read all 80K files together? i.e., why do you have a
dependency on the first text file?
2. Your first text file has 6M rows, but the total number of files is ~80K. Is
there a scenario where there may not be a file in HDFS corresponding to a row
in the first text file?
3. Maybe as a follow-up to 1, what is your end goal?

On Wed, Sep 14, 2016 at 12:17 PM, Saliya Ekanayake 
wrote:

> The first text file is not that large, it has 6 million records (lines).
> For each line I need to read a file out of 8 files. They total around
> 1.5TB. I didn't understand what you meant by "then again read text files
> for each line and union all rdds."
>
> On Tue, Sep 13, 2016 at 10:04 PM, Raghavendra Pandey <
> raghavendra.pan...@gmail.com> wrote:
>
>> How large is your first text file? The idea is you read first text file
>> and if it is not large you can collect all the lines on driver and then
>> again read text files for each line and union all rdds.
>>
>> On 13 Sep 2016 11:39 p.m., "Saliya Ekanayake"  wrote:
>>
>>> Just wonder if this is possible with Spark?
>>>
>>> On Mon, Sep 12, 2016 at 12:14 AM, Saliya Ekanayake 
>>> wrote:
>>>
 Hi,

 I've got a text file where each line is a record. For each record, I
 need to process a file in HDFS.

 So if I represent these records as an RDD and invoke a map() operation
 on them how can I access the HDFS within that map()? Do I have to create a
 Spark context within map() or is there a better solution to that?

 Thank you,
 Saliya



 --
 Saliya Ekanayake
 Ph.D. Candidate | Research Assistant
 School of Informatics and Computing | Digital Science Center
 Indiana University, Bloomington


>>>
>>>
>>> --
>>> Saliya Ekanayake
>>> Ph.D. Candidate | Research Assistant
>>> School of Informatics and Computing | Digital Science Center
>>> Indiana University, Bloomington
>>>
>>>
>
>
> --
> Saliya Ekanayake
> Ph.D. Candidate | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
>
>


-- 
Best Regards,
Ayan Guha


Can I assign affinity for spark executor processes?

2016-09-13 Thread Xiaoye Sun
Hi,

In my experiment, I pin one very important process on a fixed CPU. So the
performance of Spark task execution will be affected if the executors or
the worker uses that CPU. I am wondering if it is possible to let the Spark
executors not using a particular CPU.

I tried to 'taskset -p [cpumask] [pid]' command to set the affinity of the
Worker process. However, the executor processes created by the worker
process don't inherit the same CPU affinity.

Thanks!

Best,
Xiaoye


Re: Access HDFS within Spark Map Operation

2016-09-13 Thread Saliya Ekanayake
The first text file is not that large, it has 6 million records (lines).
For each line I need to read a file out of 8 files. They total around
1.5TB. I didn't understand what you meant by "then again read text files
for each line and union all rdds."

On Tue, Sep 13, 2016 at 10:04 PM, Raghavendra Pandey <
raghavendra.pan...@gmail.com> wrote:

> How large is your first text file? The idea is you read first text file
> and if it is not large you can collect all the lines on driver and then
> again read text files for each line and union all rdds.
>
> On 13 Sep 2016 11:39 p.m., "Saliya Ekanayake"  wrote:
>
>> Just wonder if this is possible with Spark?
>>
>> On Mon, Sep 12, 2016 at 12:14 AM, Saliya Ekanayake 
>> wrote:
>>
>>> Hi,
>>>
>>> I've got a text file where each line is a record. For each record, I
>>> need to process a file in HDFS.
>>>
>>> So if I represent these records as an RDD and invoke a map() operation
>>> on them how can I access the HDFS within that map()? Do I have to create a
>>> Spark context within map() or is there a better solution to that?
>>>
>>> Thank you,
>>> Saliya
>>>
>>>
>>>
>>> --
>>> Saliya Ekanayake
>>> Ph.D. Candidate | Research Assistant
>>> School of Informatics and Computing | Digital Science Center
>>> Indiana University, Bloomington
>>>
>>>
>>
>>
>> --
>> Saliya Ekanayake
>> Ph.D. Candidate | Research Assistant
>> School of Informatics and Computing | Digital Science Center
>> Indiana University, Bloomington
>>
>>


-- 
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington


Re: KafkaUtils.createDirectStream() with kafka topic expanded

2016-09-13 Thread Cody Koeninger
That version of createDirectStream doesn't handle partition changes.
You can work around it by starting the job again.

The spark 2.0 consumer for kafka 0.10 should handle partition changes
via SubscribePattern.

On Tue, Sep 13, 2016 at 7:13 PM, vinay gupta
 wrote:
> Hi we are using the following version of KafkaUtils.createDirectStream()
> from spark 1.5.0
>
> createDirectStream(JavaStreamingContext jssc,
>                    Class<K> keyClass,
>                    Class<V> valueClass,
>                    Class<KD> keyDecoderClass,
>                    Class<VD> valueDecoderClass,
>                    Class<R> recordClass,
>                    java.util.Map<String,String> kafkaParams,
>                    java.util.Map<TopicAndPartition,Long> fromOffsets,
>                    Function<MessageAndMetadata<K,V>,R> messageHandler)
>
>
> while the streaming app is running, the kafka topic got expanded by
> increasing the partitions
> from 10 to 20.
>
> The problem is that the running app doesn't change to include the 10 new
> partitions. We have to stop
> the app and feed the fromOffsets map the new partitions and restart.
>
> Is there any way to get this done automatically? Curious to know if you ran
> into same problem and
> whats your solution/workaround?
>
> Thanks
> -Vinay
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark kafka integration issues

2016-09-13 Thread Cody Koeninger
1.  see 
http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
 and look for HasOffsetRanges. If you really want the info per-message
rather than per-partition, createRDD has an overload that takes a
messageHandler from MessageAndMetadata to whatever you need

2. createRDD takes type parameters for the key and value decoder, so
specify them there

3. you can use spark-streaming-kafka-0-8 against 0.9 or 0.10 brokers.
There is a spark-streaming-kafka-0-10 package with additional features
that only works on brokers 0.10 or higher.  A pull request for
documenting it has been merged, but not deployed.

On Tue, Sep 13, 2016 at 6:46 PM, Mukesh Jha  wrote:
> Hello fellow sparkers,
>
> I'm using spark to consume messages from kafka in a non-streaming fashion.
> I'm using spark-streaming-kafka-0-8_2.10 & spark v2.0 to do the same.
>
> I have a few queries for the same, please get back if you guys have clues on
> the same.
>
> 1) Is there any way to get the topic, partition & offset information for each
> item from the KafkaRDD? I'm using the
> KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder] to create
> my kafka RDD.
> 2) How do I pass my custom Decoder instead of using the String or Byte decoder?
> Are there any examples for the same?
> 3) Is there a newer version to consume from kafka-0.10 & kafka-0.9 clusters?
>
> --
> Thanks & Regards,
>
> Mukesh Jha

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Using Spark SQL to Create JDBC Tables

2016-09-13 Thread ayan guha
One option is have Hive as the central point of exposing data ie create
hive tables which "point to" any other DB. i know Oracle provides there own
Serde for hive. Not sure about PG though.

Once tables are created in hive, STS will automatically see it.
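
For completeness, a sketch of the other route raised in the question: Spark's own
JDBC data source, registered as a table that STS can then query. This is only a
sketch; the URL, credentials, and table names are hypothetical, and the PostgreSQL
JDBC driver jar must be on the classpath of the session running it (e.g. a PySpark
session sharing the same metastore as the Thrift server, or beeline against STS):

spark.sql("""
  CREATE TABLE pg_customers
  USING org.apache.spark.sql.jdbc
  OPTIONS (
    url      'jdbc:postgresql://pg-host:5432/mydb',
    dbtable  'public.customers',
    user     'spark_ro',
    password '...'
  )
""")

If persistent data source tables are not an option in your setup, CREATE TEMPORARY
VIEW with the same USING/OPTIONS clause works within a single session.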

On Wed, Sep 14, 2016 at 11:08 AM, Benjamin Kim  wrote:

> Has anyone created tables using Spark SQL that directly connect to a JDBC
> data source such as PostgreSQL? I would like to use Spark SQL Thriftserver
> to access and query remote PostgreSQL tables. In this way, we can
> centralize data access to Spark SQL tables along with PostgreSQL making it
> very convenient for users. They would not know or care where the data is
> physically located anymore.
>
> By the way, our users only know SQL.
>
> If anyone has a better suggestion, then please let me know too.
>
> Thanks,
> Ben
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Best Regards,
Ayan Guha


Re: Access HDFS within Spark Map Operation

2016-09-13 Thread Raghavendra Pandey
How large is your first text file? The idea is you read first text file and
if it is not large you can collect all the lines on driver and then again
read text files for each line and union all rdds.
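
A crude PySpark sketch of that idea, assuming the index file really is small
enough to collect on the driver and that each line is (or can be turned into) an
HDFS path; the paths here are hypothetical:

lines = sc.textFile("hdfs:///path/to/index.txt").collect()   # only safe for a small file

per_line_rdds = [sc.textFile(path) for path in lines]        # one RDD per referenced file
combined = sc.union(per_line_rdds)

With the 6M-line file mentioned elsewhere in the thread this would create millions
of tiny RDDs, which is why the discussion moves on to the wholeTextFiles/join
approach.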

On 13 Sep 2016 11:39 p.m., "Saliya Ekanayake"  wrote:

> Just wonder if this is possible with Spark?
>
> On Mon, Sep 12, 2016 at 12:14 AM, Saliya Ekanayake 
> wrote:
>
>> Hi,
>>
>> I've got a text file where each line is a record. For each record, I need
>> to process a file in HDFS.
>>
>> So if I represent these records as an RDD and invoke a map() operation on
>> them how can I access the HDFS within that map()? Do I have to create a
>> Spark context within map() or is there a better solution to that?
>>
>> Thank you,
>> Saliya
>>
>>
>>
>> --
>> Saliya Ekanayake
>> Ph.D. Candidate | Research Assistant
>> School of Informatics and Computing | Digital Science Center
>> Indiana University, Bloomington
>>
>>
>
>
> --
> Saliya Ekanayake
> Ph.D. Candidate | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
>
>


Shuffle Spill (Memory) greater than Shuffle Spill (Disk)

2016-09-13 Thread prayag chandran
Hello!

In my spark job, I see that Shuffle Spill (Memory) is greater than Shuffle
Spill (Disk). The spark.shuffle.compress parameter is left at its default (true?). I
would expect the size on disk to be smaller, which isn't the case here. I've
been having some performance issues as well and I suspect this is somehow
related to that.

All memory configuration parameters are default. I'm running spark 2.0.
Shuffle Spill (Memory): 712.0 MB
Shuffle Spill (Disk): 7.9 GB

To my surprise, I also see the following for some tasks:
Shuffle Spill (Memory): 0.0 B
Shuffle Spill (Disk): 77.5 MB

I would appreciate it if anyone could explain this behavior.

-Prayag


Using Spark SQL to Create JDBC Tables

2016-09-13 Thread Benjamin Kim
Has anyone created tables using Spark SQL that directly connect to a JDBC data 
source such as PostgreSQL? I would like to use Spark SQL Thriftserver to access 
and query remote PostgreSQL tables. In this way, we can centralize data access 
to Spark SQL tables along with PostgreSQL making it very convenient for users. 
They would not know or care where the data is physically located anymore.

By the way, our users only know SQL.

If anyone has a better suggestion, then please let me know too.

Thanks,
Ben
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark SQL Thriftserver

2016-09-13 Thread Takeshi Yamamuro
Hi, all

Spark STS just uses HiveContext inside and does not use MR.
Anyway, Spark STS misses some HiveServer2 functionalities such as HA (See:
https://issues.apache.org/jira/browse/SPARK-11100) and has some known
issues there.
So, you'd be better off checking all the JIRA issues related to STS when
considering the replacement.

// maropu

On Wed, Sep 14, 2016 at 8:55 AM, ayan guha  wrote:

> Hi
>
> AFAIK STS uses Spark SQL and not Map Reduce. Is that not correct?
>
> Best
> Ayan
>
> On Wed, Sep 14, 2016 at 8:51 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> STS will rely on Hive execution engine. My Hive uses Spark execution
>> engine so STS will pass the SQL to Hive and let it do the work and return
>> the result set
>>
>>  which beeline
>> /usr/lib/spark-2.0.0-bin-hadoop2.6/bin/beeline
>> ${SPARK_HOME}/bin/beeline -u jdbc:hive2://rhes564:10055 -n hduser -p
>> 
>> Connecting to jdbc:hive2://rhes564:10055
>> Connected to: Spark SQL (version 2.0.0)
>> Driver: Hive JDBC (version 1.2.1.spark2)
>> Transaction isolation: TRANSACTION_REPEATABLE_READ
>> Beeline version 1.2.1.spark2 by Apache Hive
>> 0: jdbc:hive2://rhes564:10055>
>>
>> jdbc:hive2://rhes564:10055> select count(1) from test.prices;
>> OK, I did a simple query in STS. You will see this in hive.log:
>>
>> 2016-09-13T23:44:50,996 INFO  [pool-4-thread-4]: metastore.HiveMetaStore
>> (HiveMetaStore.java:logInfo(670)) - 4: source:50.140.197.217
>> get_database: test
>> 2016-09-13T23:44:50,996 INFO  [pool-4-thread-4]: HiveMetaStore.audit
>> (HiveMetaStore.java:logAuditEvent(280)) - ugi=hduser
>> ip=50.140.197.217   cmd=source:50.140.197.217 get_database: test
>> 2016-09-13T23:44:50,998 INFO  [pool-4-thread-4]: metastore.HiveMetaStore
>> (HiveMetaStore.java:logInfo(670)) - 4: source:50.140.197.217 get_table :
>> db=test tbl=prices
>> 2016-09-13T23:44:50,998 INFO  [pool-4-thread-4]: HiveMetaStore.audit
>> (HiveMetaStore.java:logAuditEvent(280)) - ugi=hduser
>> ip=50.140.197.217   cmd=source:50.140.197.217 get_table : db=test
>> tbl=prices
>> 2016-09-13T23:44:51,007 INFO  [pool-4-thread-4]: metastore.HiveMetaStore
>> (HiveMetaStore.java:logInfo(670)) - 4: source:50.140.197.217 get_table :
>> db=test tbl=prices
>> 2016-09-13T23:44:51,007 INFO  [pool-4-thread-4]: HiveMetaStore.audit
>> (HiveMetaStore.java:logAuditEvent(280)) - ugi=hduser
>> ip=50.140.197.217   cmd=source:50.140.197.217 get_table : db=test
>> tbl=prices
>> 2016-09-13T23:44:51,021 INFO  [pool-4-thread-4]: metastore.HiveMetaStore
>> (HiveMetaStore.java:logInfo(670)) - 4: source:50.140.197.217
>> get_database: test
>> 2016-09-13T23:44:51,021 INFO  [pool-4-thread-4]: HiveMetaStore.audit
>> (HiveMetaStore.java:logAuditEvent(280)) - ugi=hduser
>> ip=50.140.197.217   cmd=source:50.140.197.217 get_database: test
>> 2016-09-13T23:44:51,023 INFO  [pool-4-thread-4]: metastore.HiveMetaStore
>> (HiveMetaStore.java:logInfo(670)) - 4: source:50.140.197.217 get_table :
>> db=test tbl=prices
>> 2016-09-13T23:44:51,023 INFO  [pool-4-thread-4]: HiveMetaStore.audit
>> (HiveMetaStore.java:logAuditEvent(280)) - ugi=hduser
>> ip=50.140.197.217   cmd=source:50.140.197.217 get_table : db=test
>> tbl=prices
>> 2016-09-13T23:44:51,029 INFO  [pool-4-thread-4]: metastore.HiveMetaStore
>> (HiveMetaStore.java:logInfo(670)) - 4: source:50.140.197.217 get_table :
>> db=test tbl=prices
>> 2016-09-13T23:44:51,029 INFO  [pool-4-thread-4]: HiveMetaStore.audit
>> (HiveMetaStore.java:logAuditEvent(280)) - ugi=hduser
>> ip=50.140.197.217   cmd=source:50.140.197.217 get_table : db=test
>> tbl=prices
>>
>> I think it is a good idea to switch to Spark engine (as opposed to MR).
>> My tests proved that Hive on Spark, using DAG and in-memory offering, runs at
>> least an order of magnitude faster compared to map-reduce.
>>
>> You can either connect to beeline from $HIVE_HOME/... or beeline from
>> $SPARK_HOME
>>
>> HTH
>>
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 13 September 2016 at 23:28, Benjamin Kim  wrote:
>>
>>> Mich,
>>>
>>> It sounds like that there would be no harm in changing then. Are you
>>> saying that using STS would still use MapReduce to run the SQL statements?
>>> What our users are doing in our CDH 5.7.2 installation is changing the
>>> execution engine to Spark when connected to HiveServer2 to get faster
>>> results. Would they still have to do this using 

Re: What's the best way to find the nearest neighbor in Spark? Any windowing function?

2016-09-13 Thread ayan guha
>>> df.show()

+----+---+---+-----+-----+-----+
|city|flg| id|  nbr|price|state|
+----+---+---+-----+-----+-----+
|  CA|  0|  1| 1000|  100|    A|
|  CA|  1|  2| 1010|   96|    A|
|  CA|  1|  3| 1010|  195|    A|
|  NY|  0|  4| 2000|  124|    B|
|  NY|  1|  5| 2001|  128|    B|
|  NY|  0|  6|    3|   24|    C|
|  NY|  1|  7|30100|   27|    C|
|  NY|  0|  8|30200|   29|    C|
|  NY|  1|  9|33000|   39|    C|
+----+---+---+-----+-----+-----+


>>> flg0 = df.filter(df.flg==0)
>>> flg1 = df.filter(df.flg!=0)
>>> flg0.registerTempTable("t_flg0")
>>> flg1.registerTempTable("t_flg1")

>>> j = sqlContext.sql("select *, rank() over (partition by id0 order by
dist) r from (select *,x.id as id0,y.id as id1, abs(x.nbr/1000 -
y.nbr/1000) + abs(x.price/100 - y.price/100) as dist from t_flg0 x inner
join t_flg1 y on (x.city=y.city and x.state=y.state))x ")


>>> j.show()

city flg id   nbr price state  city flg id   nbr price state  id0 id1  dist r
  CA   0  1  1000   100     A    CA   1  2  1010    96     A    1   2  0.05 1
  CA   0  1  1000   100     A    CA   1  3  1010   195     A    1   3  0.96 2
  NY   0  4  2000   124     B    NY   1  5  2001   128     B    4   5 0.041 1
  NY   0  6     3    24     C    NY   1  7 30100    27     C    6   7  0.13 1
  NY   0  6     3    24     C    NY   1  9 33000    39     C    6   9  3.15 2
  NY   0  8 30200    29     C    NY   1  7 30100    27     C    8   7  0.12 1
  NY   0  8 30200    29     C    NY   1  9 33000    39     C    8   9   2.9 2
Wherever r=1, you got a match.
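
A minimal follow-up sketch, assuming the same sqlContext session and the ranked
DataFrame j built above, to pull out just the (id0, id1) pairs the original
question asked for:

>>> pairs = j.filter("r = 1").select("id0", "id1")   # keep only the best match per id0
>>> pairs.show()

This should reproduce the expected (1,2), (4,5), (6,7), (8,7) pairs, modulo ties
on dist.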



On Wed, Sep 14, 2016 at 5:45 AM, Mobius ReX  wrote:

> Hi Sean,
>
> Great!
>
> Is there any sample code implementing Locality Sensitive Hashing with
> Spark, in either scala or python?
>
> "However if your rule is really like "must match column A and B and
> then closest value in column C then just ordering everything by A, B,
> C lets you pretty much read off the answer from the result set
> directly. Everything is closest to one of its two neighbors."
>
> This is interesting since we can use Lead/Lag Windowing function if we
> have only one continuous column. However,
> our rule is "must match column A and B and then closest values in column
> C and D - for any ID with column E = 0, and the closest ID with Column E = 1".
> The distance metric between ID1 (with Column E =0) and ID2 (with Column E
> =1) is defined as
> abs( C1/C1 - C2/C1 ) + abs (D1/D1 - D2/D1)
> One cannot do
> abs( (C1/C1 + D1/D1) - (C2/C1 + D2/ D1) )
>
>
> Any further tips?
>
> Best,
> Rex
>
>
>
> On Tue, Sep 13, 2016 at 11:09 AM, Sean Owen  wrote:
>
>> The key is really to specify the distance metric that defines
>> "closeness" for you. You have features that aren't on the same scale,
>> and some that aren't continuous. You might look to clustering for
>> ideas here, though mostly you just want to normalize the scale of
>> dimensions to make them comparable.
>>
>> You can find nearest neighbors by brute force. If speed really matters
>> you can consider locality sensitive hashing, which isn't that hard to
>> implement and can give a lot of speed for a small cost in accuracy.
>>
>> However if your rule is really like "must match column A and B and
>> then closest value in column C then just ordering everything by A, B,
>> C lets you pretty much read off the answer from the result set
>> directly. Everything is closest to one of its two neighbors.
>>
>> On Tue, Sep 13, 2016 at 6:18 PM, Mobius ReX  wrote:
>> > Given a table
>> >
>> >> $cat data.csv
>> >>
>> >> ID,State,City,Price,Number,Flag
>> >> 1,CA,A,100,1000,0
>> >> 2,CA,A,96,1010,1
>> >> 3,CA,A,195,1010,1
>> >> 4,NY,B,124,2000,0
>> >> 5,NY,B,128,2001,1
>> >> 6,NY,C,24,3,0
>> >> 7,NY,C,27,30100,1
>> >> 8,NY,C,29,30200,0
>> >> 9,NY,C,39,33000,1
>> >
>> >
>> > Expected Result:
>> >
>> > ID0, ID1
>> > 1,2
>> > 4,5
>> > 6,7
>> > 8,7
>> >
>> > for each ID with Flag=0 above, we want to find another ID from Flag=1,
>> with
>> > the same "State" and "City", and the nearest Price and Number
>> normalized by
>> > the corresponding values of that ID with Flag=0.
>> >
>> > For example, ID = 1 and ID=2, has the same State and City, but different
>> > FLAG.
>> > After normalized the Price and Number (Price divided by 100, Number
>> divided
>> > by 1000), the distance between ID=1 and ID=2 is defined as :
>> > abs(100/100 - 96/100) + abs(1000/1000 - 1010/1000) = 0.04 + 0.01 = 0.05
>> >
>> >
>> > What's the best way to find such nearest neighbor? Any valuable tips
>> will be
>> > greatly appreciated!
>> >
>> >
>>
>
>


-- 
Best Regards,
Ayan Guha


KafkaUtils.createDirectStream() with kafka topic expanded

2016-09-13 Thread vinay gupta
Hi we are using the following version of KafkaUtils.createDirectStream() from 
spark 1.5.0
createDirectStream(JavaStreamingContext jssc,
                   Class<K> keyClass,
                   Class<V> valueClass,
                   Class<KD> keyDecoderClass,
                   Class<VD> valueDecoderClass,
                   Class<R> recordClass,
                   java.util.Map<String,String> kafkaParams,
                   java.util.Map<TopicAndPartition,Long> fromOffsets,
                   Function<MessageAndMetadata<K,V>,R> messageHandler)
while the streaming app is running, the kafka topic got expanded by increasing
the partitions from 10 to 20.

The problem is that the running app doesn't change to include the 10 new
partitions. We have to stop the app and feed the fromOffsets map the new
partitions and restart.

Is there any way to get this done automatically? Curious to know if you ran
into the same problem and what's your solution/workaround?

Thanks,
-Vinay



Re: Spark SQL Thriftserver

2016-09-13 Thread ayan guha
Hi

AFAIK STS uses Spark SQL and not Map Reduce. Is that not correct?

Best
Ayan

On Wed, Sep 14, 2016 at 8:51 AM, Mich Talebzadeh 
wrote:

> STS will rely on Hive execution engine. My Hive uses Spark execution
> engine so STS will pass the SQL to Hive and let it do the work and return
> the result set
>
>  which beeline
> /usr/lib/spark-2.0.0-bin-hadoop2.6/bin/beeline
> ${SPARK_HOME}/bin/beeline -u jdbc:hive2://rhes564:10055 -n hduser -p
> 
> Connecting to jdbc:hive2://rhes564:10055
> Connected to: Spark SQL (version 2.0.0)
> Driver: Hive JDBC (version 1.2.1.spark2)
> Transaction isolation: TRANSACTION_REPEATABLE_READ
> Beeline version 1.2.1.spark2 by Apache Hive
> 0: jdbc:hive2://rhes564:10055>
>
> jdbc:hive2://rhes564:10055> select count(1) from test.prices;
> OK, I did a simple query in STS. You will see this in hive.log:
>
> 2016-09-13T23:44:50,996 INFO  [pool-4-thread-4]: metastore.HiveMetaStore
> (HiveMetaStore.java:logInfo(670)) - 4: source:50.140.197.217
> get_database: test
> 2016-09-13T23:44:50,996 INFO  [pool-4-thread-4]: HiveMetaStore.audit
> (HiveMetaStore.java:logAuditEvent(280)) - ugi=hduser
> ip=50.140.197.217   cmd=source:50.140.197.217 get_database: test
> 2016-09-13T23:44:50,998 INFO  [pool-4-thread-4]: metastore.HiveMetaStore
> (HiveMetaStore.java:logInfo(670)) - 4: source:50.140.197.217 get_table :
> db=test tbl=prices
> 2016-09-13T23:44:50,998 INFO  [pool-4-thread-4]: HiveMetaStore.audit
> (HiveMetaStore.java:logAuditEvent(280)) - ugi=hduser
> ip=50.140.197.217   cmd=source:50.140.197.217 get_table : db=test
> tbl=prices
> 2016-09-13T23:44:51,007 INFO  [pool-4-thread-4]: metastore.HiveMetaStore
> (HiveMetaStore.java:logInfo(670)) - 4: source:50.140.197.217 get_table :
> db=test tbl=prices
> 2016-09-13T23:44:51,007 INFO  [pool-4-thread-4]: HiveMetaStore.audit
> (HiveMetaStore.java:logAuditEvent(280)) - ugi=hduser
> ip=50.140.197.217   cmd=source:50.140.197.217 get_table : db=test
> tbl=prices
> 2016-09-13T23:44:51,021 INFO  [pool-4-thread-4]: metastore.HiveMetaStore
> (HiveMetaStore.java:logInfo(670)) - 4: source:50.140.197.217
> get_database: test
> 2016-09-13T23:44:51,021 INFO  [pool-4-thread-4]: HiveMetaStore.audit
> (HiveMetaStore.java:logAuditEvent(280)) - ugi=hduser
> ip=50.140.197.217   cmd=source:50.140.197.217 get_database: test
> 2016-09-13T23:44:51,023 INFO  [pool-4-thread-4]: metastore.HiveMetaStore
> (HiveMetaStore.java:logInfo(670)) - 4: source:50.140.197.217 get_table :
> db=test tbl=prices
> 2016-09-13T23:44:51,023 INFO  [pool-4-thread-4]: HiveMetaStore.audit
> (HiveMetaStore.java:logAuditEvent(280)) - ugi=hduser
> ip=50.140.197.217   cmd=source:50.140.197.217 get_table : db=test
> tbl=prices
> 2016-09-13T23:44:51,029 INFO  [pool-4-thread-4]: metastore.HiveMetaStore
> (HiveMetaStore.java:logInfo(670)) - 4: source:50.140.197.217 get_table :
> db=test tbl=prices
> 2016-09-13T23:44:51,029 INFO  [pool-4-thread-4]: HiveMetaStore.audit
> (HiveMetaStore.java:logAuditEvent(280)) - ugi=hduser
> ip=50.140.197.217   cmd=source:50.140.197.217 get_table : db=test
> tbl=prices
>
> I think it is a good idea to switch to Spark engine (as opposed to MR). My
> tests proved that Hive on Spark, using DAG and in-memory offering, runs at
> least an order of magnitude faster compared to map-reduce.
>
> You can either connect to beeline from $HIVE_HOME/... or beeline from
> $SPARK_HOME
>
> HTH
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 13 September 2016 at 23:28, Benjamin Kim  wrote:
>
>> Mich,
>>
>> It sounds like there would be no harm in changing, then. Are you
>> saying that using STS would still use MapReduce to run the SQL statements?
>> What our users are doing in our CDH 5.7.2 installation is changing the
>> execution engine to Spark when connected to HiveServer2 to get faster
>> results. Would they still have to do this using STS? Lastly, we are seeing
>> zombie YARN jobs left behind even after a user disconnects. Are you seeing
>> this happen with STS? If not, then this would be even better.
>>
>> Thanks for your fast reply.
>>
>> Cheers,
>> Ben
>>
>> On Sep 13, 2016, at 3:15 PM, Mich Talebzadeh 
>> wrote:
>>
>> Hi,
>>
>> Spark Thrift server (STS) still uses hive thrift server. If you look at
>> $SPARK_HOME/sbin/start-thriftserver.sh you will see (mine is Spark 2)
>>
>> function usage {
>>   echo "Usage: 

Spark kafka integration issues

2016-09-13 Thread Mukesh Jha
Hello fellow sparkers,

I'm using Spark to consume messages from Kafka in a non-streaming fashion.
I'm using spark-streaming-kafka-0-8_2.10 & Spark v2.0 to do the same.

I have a few queries; please get back if you have any clues.

1) Is there any way to get the topic, partition & offset information for each
item from the KafkaRDD? I'm using *KafkaUtils.createRDD[String, String,
StringDecoder, StringDecoder]* to create my Kafka RDD (see the sketch after
this list).
2) How do I pass my custom Decoder instead of the String or Byte decoder? Are
there any examples for this?
3) Is there a newer version to consume from Kafka 0.10 & 0.9 clusters?
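
A minimal sketch of (1) and (2) against the 0-8 connector is below; the broker
address, topic, offsets and the tuple shape are placeholders, not values from
this thread. The createRDD overload that takes a messageHandler hands each
record's MessageAndMetadata to you, which carries topic, partition and offset.

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.{Broker, KafkaUtils, OffsetRange}

// Hedged sketch: read a bounded slice of a topic and keep the per-record
// metadata alongside the payload. All literal values are placeholders.
def readSlice(sc: SparkContext): RDD[(String, Int, Long, String)] = {
  val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
  val ranges = Array(OffsetRange("my-topic", 0, 0L, 100L))
  val handler = (mmd: MessageAndMetadata[String, String]) =>
    (mmd.topic, mmd.partition, mmd.offset, mmd.message())
  KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder,
    (String, Int, Long, String)](
    sc, kafkaParams, ranges, Map.empty[TopicAndPartition, Broker], handler)
}

For (2), a custom decoder is any class extending kafka.serializer.Decoder[T]
with a constructor taking VerifiableProperties; it replaces StringDecoder as a
type parameter. For (3), separate connector artifacts exist for the 0.10
consumer API in later Spark releases, though I have not checked which versions
line up with Spark 2.0.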

-- 
Thanks & Regards,

*Mukesh Jha *


Re: Spark SQL Thriftserver

2016-09-13 Thread Mich Talebzadeh
STS will rely on the Hive execution engine. My Hive uses the Spark execution
engine, so STS will pass the SQL to Hive, let it do the work, and return the
result set.

 which beeline
/usr/lib/spark-2.0.0-bin-hadoop2.6/bin/beeline
${SPARK_HOME}/bin/beeline -u jdbc:hive2://rhes564:10055 -n hduser -p

Connecting to jdbc:hive2://rhes564:10055
Connected to: Spark SQL (version 2.0.0)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 1.2.1.spark2 by Apache Hive
0: jdbc:hive2://rhes564:10055>

jdbc:hive2://rhes564:10055> select count(1) from test.prices;
OK, I did a simple query in STS. You will see this in hive.log:

2016-09-13T23:44:50,996 INFO  [pool-4-thread-4]: metastore.HiveMetaStore
(HiveMetaStore.java:logInfo(670)) - 4: source:50.140.197.217 get_database:
test
2016-09-13T23:44:50,996 INFO  [pool-4-thread-4]: HiveMetaStore.audit
(HiveMetaStore.java:logAuditEvent(280)) - ugi=hduser
ip=50.140.197.217   cmd=source:50.140.197.217 get_database: test
2016-09-13T23:44:50,998 INFO  [pool-4-thread-4]: metastore.HiveMetaStore
(HiveMetaStore.java:logInfo(670)) - 4: source:50.140.197.217 get_table :
db=test tbl=prices
2016-09-13T23:44:50,998 INFO  [pool-4-thread-4]: HiveMetaStore.audit
(HiveMetaStore.java:logAuditEvent(280)) - ugi=hduser
ip=50.140.197.217   cmd=source:50.140.197.217 get_table : db=test
tbl=prices
2016-09-13T23:44:51,007 INFO  [pool-4-thread-4]: metastore.HiveMetaStore
(HiveMetaStore.java:logInfo(670)) - 4: source:50.140.197.217 get_table :
db=test tbl=prices
2016-09-13T23:44:51,007 INFO  [pool-4-thread-4]: HiveMetaStore.audit
(HiveMetaStore.java:logAuditEvent(280)) - ugi=hduser
ip=50.140.197.217   cmd=source:50.140.197.217 get_table : db=test
tbl=prices
2016-09-13T23:44:51,021 INFO  [pool-4-thread-4]: metastore.HiveMetaStore
(HiveMetaStore.java:logInfo(670)) - 4: source:50.140.197.217 get_database:
test
2016-09-13T23:44:51,021 INFO  [pool-4-thread-4]: HiveMetaStore.audit
(HiveMetaStore.java:logAuditEvent(280)) - ugi=hduser
ip=50.140.197.217   cmd=source:50.140.197.217 get_database: test
2016-09-13T23:44:51,023 INFO  [pool-4-thread-4]: metastore.HiveMetaStore
(HiveMetaStore.java:logInfo(670)) - 4: source:50.140.197.217 get_table :
db=test tbl=prices
2016-09-13T23:44:51,023 INFO  [pool-4-thread-4]: HiveMetaStore.audit
(HiveMetaStore.java:logAuditEvent(280)) - ugi=hduser
ip=50.140.197.217   cmd=source:50.140.197.217 get_table : db=test
tbl=prices
2016-09-13T23:44:51,029 INFO  [pool-4-thread-4]: metastore.HiveMetaStore
(HiveMetaStore.java:logInfo(670)) - 4: source:50.140.197.217 get_table :
db=test tbl=prices
2016-09-13T23:44:51,029 INFO  [pool-4-thread-4]: HiveMetaStore.audit
(HiveMetaStore.java:logAuditEvent(280)) - ugi=hduser
ip=50.140.197.217   cmd=source:50.140.197.217 get_table : db=test
tbl=prices

I think it is a good idea to switch to the Spark engine (as opposed to MR). My
tests showed that Hive on Spark, with its DAG and in-memory execution, runs at
least an order of magnitude faster than MapReduce.

You can either connect to beeline from $HIVE_HOME/... or beeline from
$SPARK_HOME

HTH




Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 13 September 2016 at 23:28, Benjamin Kim  wrote:

> Mich,
>
> It sounds like there would be no harm in changing, then. Are you
> saying that using STS would still use MapReduce to run the SQL statements?
> What our users are doing in our CDH 5.7.2 installation is changing the
> execution engine to Spark when connected to HiveServer2 to get faster
> results. Would they still have to do this using STS? Lastly, we are seeing
> zombie YARN jobs left behind even after a user disconnects. Are you seeing
> this happen with STS? If not, then this would be even better.
>
> Thanks for your fast reply.
>
> Cheers,
> Ben
>
> On Sep 13, 2016, at 3:15 PM, Mich Talebzadeh 
> wrote:
>
> Hi,
>
> Spark Thrift server (STS) still uses hive thrift server. If you look at
> $SPARK_HOME/sbin/start-thriftserver.sh you will see (mine is Spark 2)
>
> function usage {
>   echo "Usage: ./sbin/start-thriftserver [options] [thrift server options]"
>   pattern="usage"
>   *pattern+="\|Spark assembly has been built with Hive"*
>   pattern+="\|NOTE: SPARK_PREPEND_CLASSES is set"
>   pattern+="\|Spark Command: "
>   pattern+="\|==="
>   pattern+="\|--help"
>
>
> Indeed when you start STS, you pass hiveconf parameter to it
>
> 

Re: Spark SQL Thriftserver

2016-09-13 Thread Benjamin Kim
Mich,

It sounds like there would be no harm in changing, then. Are you saying 
that using STS would still use MapReduce to run the SQL statements? What our 
users are doing in our CDH 5.7.2 installation is changing the execution engine 
to Spark when connected to HiveServer2 to get faster results. Would they still 
have to do this using STS? Lastly, we are seeing zombie YARN jobs left behind 
even after a user disconnects. Are you seeing this happen with STS? If not, 
then this would be even better.

Thanks for your fast reply.

Cheers,
Ben

> On Sep 13, 2016, at 3:15 PM, Mich Talebzadeh  
> wrote:
> 
> Hi,
> 
> Spark Thrift server (STS) still uses hive thrift server. If you look at 
> $SPARK_HOME/sbin/start-thriftserver.sh you will see (mine is Spark 2)
> 
> function usage {
>   echo "Usage: ./sbin/start-thriftserver [options] [thrift server options]"
>   pattern="usage"
>   pattern+="\|Spark assembly has been built with Hive"
>   pattern+="\|NOTE: SPARK_PREPEND_CLASSES is set"
>   pattern+="\|Spark Command: "
>   pattern+="\|==="
>   pattern+="\|--help"
> 
> 
> Indeed when you start STS, you pass hiveconf parameter to it
> 
> ${SPARK_HOME}/sbin/start-thriftserver.sh \
> --master  \
> --hiveconf hive.server2.thrift.port=10055 \
> 
> and STS bypasses Spark optimiser and uses Hive optimizer and execution 
> engine. You will see this in hive.log file
> 
> So I don't think it is going to give you much difference. Unless they have 
> recently changed the design of STS.
> 
> HTH
> 
> 
> 
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> On 13 September 2016 at 22:32, Benjamin Kim  > wrote:
> Does anyone have any thoughts about using Spark SQL Thriftserver in Spark 
> 1.6.2 instead of HiveServer2? We are considering abandoning HiveServer2 for 
> it. Some advice and gotchas would be nice to know.
> 
> Thanks,
> Ben
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> 
> 
> 



Re: Spark SQL Thriftserver

2016-09-13 Thread Mich Talebzadeh
Hi,

Spark Thrift server (STS) still uses hive thrift server. If you look at
$SPARK_HOME/sbin/start-thriftserver.sh you will see (mine is Spark 2)

function usage {
  echo "Usage: ./sbin/start-thriftserver [options] [thrift server options]"
  pattern="usage"
  *pattern+="\|Spark assembly has been built with Hive"*
  pattern+="\|NOTE: SPARK_PREPEND_CLASSES is set"
  pattern+="\|Spark Command: "
  pattern+="\|==="
  pattern+="\|--help"


Indeed when you start STS, you pass hiveconf parameter to it

${SPARK_HOME}/sbin/start-thriftserver.sh \
--master  \
--hiveconf hive.server2.thrift.port=10055 \

and STS bypasses the Spark optimizer and uses the Hive optimizer and execution
engine. You will see this in the hive.log file.

So I don't think it is going to make much difference, unless they have
recently changed the design of STS.
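
As a small aside, the same server can be hit from plain JDBC instead of
beeline. A minimal sketch, reusing the host, port and user from the beeline
example above as placeholders and assuming the Hive JDBC driver is on the
classpath:

import java.sql.DriverManager

// Hedged sketch: query STS over JDBC. Connection details are placeholders.
def countPrices(): Unit = {
  Class.forName("org.apache.hive.jdbc.HiveDriver")
  val conn = DriverManager.getConnection("jdbc:hive2://rhes564:10055", "hduser", "")
  try {
    val rs = conn.createStatement().executeQuery("select count(1) from test.prices")
    while (rs.next()) println(rs.getLong(1))
  } finally {
    conn.close()
  }
}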

HTH




Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 13 September 2016 at 22:32, Benjamin Kim  wrote:

> Does anyone have any thoughts about using Spark SQL Thriftserver in Spark
> 1.6.2 instead of HiveServer2? We are considering abandoning HiveServer2 for
> it. Some advice and gotchas would be nice to know.
>
> Thanks,
> Ben
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark Java Heap Error

2016-09-13 Thread Baktaawar
this is the settings I have.

# Example:

# spark.master spark://master:7077

# spark.eventLog.enabled   true

# spark.eventLog.dir   hdfs://namenode:8021/directory

# spark.serializer
org.apache.spark.serializer.KryoSerializer

spark.driver.memory  16g

spark.executor.memory2g

spark.driver.maxResultSize   8g

spark.rdd.compress   false

spark.storage.memoryFraction 0.5


Same problem.
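
One hedged aside on the caching part of this thread: the DataFrame cache()
method takes no arguments, so a storage level that can spill to disk has to go
through persist(). A minimal sketch against the 1.6-style DataFrame API, with
an illustrative helper name:

import org.apache.spark.sql.DataFrame
import org.apache.spark.storage.StorageLevel

// Hedged sketch: let a DataFrame that may not fit in memory spill to disk.
// Driver memory itself must still be fixed before the JVM starts, e.g. the
// spark.driver.memory line above or --driver-memory at launch.
def cacheWithSpill(df: DataFrame): DataFrame =
  df.persist(StorageLevel.MEMORY_AND_DISK)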

On Tue, Sep 13, 2016 at 10:27 AM, Manish Tripathi 
wrote:

> The data set is not big. It is 56K x 9K. It does have long strings as column
> names.
>
> It fits very easily in Pandas, and that is also an in-memory thing, so I am not
> sure if memory is an issue here. If Pandas can fit it very easily and work
> on it very fast, then Spark shouldn't have problems either, right?
>
> On Tue, Sep 13, 2016 at 10:24 AM, neil90 [via Apache Spark User List] <
> ml-node+s1001560n27707...@n3.nabble.com> wrote:
>
>> I'm assuming the dataset you're dealing with is big, hence why you wanted to
>> allocate your full 16 GB of RAM to it.
>>
>> I suggest running the python spark-shell as such "pyspark --driver-memory
>> 16g".
>>
>> Also if you cache your data and it doesn't fully fit in memory you can do
>> df.cache(StorageLevel.MEMORY_AND_DISK).
>>
>> --
>> If you reply to this email, your message will be added to the discussion
>> below:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Ja
>> va-Heap-Error-tp27669p27707.html
>> To unsubscribe from Spark Java Heap Error, click here
>> 
>> .
>> NAML
>> 
>>
>
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Java-Heap-Error-tp27669p27709.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Spark SQL Thriftserver

2016-09-13 Thread Benjamin Kim
Does anyone have any thoughts about using Spark SQL Thriftserver in Spark 1.6.2 
instead of HiveServer2? We are considering abandoning HiveServer2 for it. Some 
advice and gotchas would be nice to know.

Thanks,
Ben
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Erorr:]vieiwng Web UI on EMR cluster

2016-09-13 Thread Jonathan Kelly
Yes, Spark on EMR runs on YARN, so there is only a Spark UI when a Spark
app is running. To expand on what Natu says, the best way to view the Spark
UI for both running and completed Spark apps is to start from the YARN
ResourceManager UI (port 8088) and to click the "Application Master" link
(for running apps) or "History" link (for completed apps).

~ Jonathan

On Tue, Sep 13, 2016 at 2:30 AM Natu Lauchande  wrote:

> Hi,
>
> I think the spark UI will be accessible whenever you launch a spark app in
> the cluster it should be the Application Tracker link.
>
>
> Regards,
> Natu
>
> On Tue, Sep 13, 2016 at 9:37 AM, Divya Gehlot 
> wrote:
>
>> Hi ,
>> Thank you all..
>> Hurray... I am able to view the Hadoop web UI now @ 8088, and even the Spark
>> History Server web UI @ 18080.
>> But I am unable to figure out the Spark UI web port.
>> I tried 4044 and 4040 and got the error below:
>> This site can’t be reached
>> How can I find out the Spark port?
>>
>> Would really appreciate the help.
>>
>> Thanks,
>> Divya
>>
>>
>> On 13 September 2016 at 15:09, Divya Gehlot 
>> wrote:
>>
>>> Hi,
>>> Thanks all for your prompt response.
>>> I followed the instruction in the docs EMR SSH tunnel
>>> 
>>> shared by Jonathan.
>>> I am on MAC and set up foxy proxy in my chrome browser
>>>
>>> Divyas-MacBook-Pro:.ssh divyag$ ssh  -N -D 8157
>>> had...@ec2-xx-xxx-xxx-xx.ap-southeast-1.compute.amazonaws.com
>>>
>>> channel 3: open failed: connect failed: Connection refused
>>>
>>> channel 3: open failed: connect failed: Connection refused
>>>
>>> channel 4: open failed: connect failed: Connection refused
>>>
>>> channel 3: open failed: connect failed: Connection refused
>>>
>>> channel 4: open failed: connect failed: Connection refused
>>>
>>> channel 3: open failed: connect failed: Connection refused
>>>
>>> channel 3: open failed: connect failed: Connection refused
>>>
>>> channel 4: open failed: connect failed: Connection refused
>>>
>>> channel 5: open failed: connect failed: Connection refused
>>>
>>> channel 22: open failed: connect failed: Connection refused
>>>
>>> channel 23: open failed: connect failed: Connection refused
>>>
>>> channel 22: open failed: connect failed: Connection refused
>>>
>>> channel 23: open failed: connect failed: Connection refused
>>>
>>> channel 22: open failed: connect failed: Connection refused
>>>
>>> channel 8: open failed: administratively prohibited: open failed
>>>
>>>
>>> What am I missing now ?
>>>
>>>
>>> Thanks,
>>>
>>> Divya
>>>
>>> On 13 September 2016 at 14:23, Jonathan Kelly 
>>> wrote:
>>>
 I would not recommend opening port 50070 on your cluster, as that would
 give the entire world access to your data on HDFS. Instead, you should
 follow the instructions found here to create a secure tunnel to the
 cluster, through which you can proxy requests to the UIs using a browser
 plugin like FoxyProxy:
 https://docs.aws.amazon.com/ElasticMapReduce/latest/ManagementGuide/emr-ssh-tunnel.html

 ~ Jonathan

 On Mon, Sep 12, 2016 at 10:40 PM Mohammad Tariq 
 wrote:

> Hi Divya,
>
> Do you have inbound access enabled on port 50070 of your NN machine?
> Also, it's a good idea to have the public DNS in your /etc/hosts for 
> proper
> name resolution.
>
>
> Tariq, Mohammad
> about.me/mti
>
>
> On Tue, Sep 13, 2016 at 9:28 AM, Divya Gehlot  > wrote:
>
>> Hi,
>> I am on EMR 4.7 with Spark 1.6.1   and Hadoop 2.7.2
>> When I am trying to view Any of the web UI of the cluster either
>> hadoop or Spark ,I am getting below error
>> "
>> This site can’t be reached
>>
>> "
>> Has anybody using EMR and able to view WebUI .
>> Could you please share the steps.
>>
>> Would really appreciate the help.
>>
>> Thanks,
>> Divya
>>
>
>
>>>
>>
>


Re: What's the best way to find the nearest neighbor in Spark? Any windowing function?

2016-09-13 Thread Mobius ReX
Hi Sean,

Now let's assume we have columns C and D normalized, and the metric is
simplified to
abs(C1 - C2) + abs(D1 - D2)

Can we get a performance benefit from LSH?

Thank you again!

Best,
Rex
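
For what it's worth, a minimal sketch of the grid bucketing that an L1-style
LSH boils down to, in plain Scala; the Record shape, bucket width and
single-cell probe are assumptions, a fuller version would also probe the
neighbouring cells, and in Spark the bucket tuple would become a join key:

case class Record(id: Long, state: String, city: String, flag: Int, c: Double, d: Double)

// Hedged sketch: quantise the normalised C and D into cells of width w, only
// compare rows that share a (state, city, cellC, cellD) bucket, then score the
// surviving candidate pairs with the exact L1 distance.
def candidatePairs(rows: Seq[Record], w: Double): Seq[(Record, Record, Double)] = {
  def bucket(r: Record) =
    (r.state, r.city, math.floor(r.c / w).toLong, math.floor(r.d / w).toLong)
  val flag1ByBucket = rows.filter(_.flag == 1).groupBy(bucket)
  for {
    a <- rows if a.flag == 0
    b <- flag1ByBucket.getOrElse(bucket(a), Seq.empty[Record])
  } yield (a, b, math.abs(a.c - b.c) + math.abs(a.d - b.d))
}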


On Tue, Sep 13, 2016 at 12:47 PM, Sean Owen  wrote:

> Given the nature of your metric, I don't think you can use things like
> LSH which more or less depend on a continuous metric space. This is
> too specific to fit into a general framework usefully I think, but, I
> think you can solve this directly with some code without much trouble.
>
> On Tue, Sep 13, 2016 at 8:45 PM, Mobius ReX  wrote:
> > Hi Sean,
> >
> > Great!
> >
> > Is there any sample code implementing Locality Sensitive Hashing with
> Spark,
> > in either scala or python?
> >
> > "However if your rule is really like "must match column A and B and
> > then closest value in column C then just ordering everything by A, B,
> > C lets you pretty much read off the answer from the result set
> > directly. Everything is closest to one of its two neighbors."
> >
> > This is interesting since we can use Lead/Lag Windowing function if we
> have
> > only one continuous column. However,
> > our rule is "must match column A and B and then closest values in column
> C
> > and D - for any ID with column E = 0, and the closest ID with Column E =
> 1".
> > The distance metric between ID1 (with Column E =0) and ID2 (with Column E
> > =1) is defined as
> > abs( C1/C1 - C2/C1 ) + abs (D1/D1 - D2/D1)
> > One cannot do
> > abs( (C1/C1 + D1/D1) - (C2/C1 + D2/ D1) )
> >
> >
> > Any further tips?
> >
> > Best,
> > Rex
> >
> >
> >
> > On Tue, Sep 13, 2016 at 11:09 AM, Sean Owen  wrote:
> >>
> >> The key is really to specify the distance metric that defines
> >> "closeness" for you. You have features that aren't on the same scale,
> >> and some that aren't continuous. You might look to clustering for
> >> ideas here, though mostly you just want to normalize the scale of
> >> dimensions to make them comparable.
> >>
> >> You can find nearest neighbors by brute force. If speed really matters
> >> you can consider locality sensitive hashing, which isn't that hard to
> >> implement and can give a lot of speed for a small cost in accuracy.
> >>
> >> However if your rule is really like "must match column A and B and
> >> then closest value in column C then just ordering everything by A, B,
> >> C lets you pretty much read off the answer from the result set
> >> directly. Everything is closest to one of its two neighbors.
> >>
> >> On Tue, Sep 13, 2016 at 6:18 PM, Mobius ReX  wrote:
> >> > Given a table
> >> >
> >> >> $cat data.csv
> >> >>
> >> >> ID,State,City,Price,Number,Flag
> >> >> 1,CA,A,100,1000,0
> >> >> 2,CA,A,96,1010,1
> >> >> 3,CA,A,195,1010,1
> >> >> 4,NY,B,124,2000,0
> >> >> 5,NY,B,128,2001,1
> >> >> 6,NY,C,24,3,0
> >> >> 7,NY,C,27,30100,1
> >> >> 8,NY,C,29,30200,0
> >> >> 9,NY,C,39,33000,1
> >> >
> >> >
> >> > Expected Result:
> >> >
> >> > ID0, ID1
> >> > 1,2
> >> > 4,5
> >> > 6,7
> >> > 8,7
> >> >
> >> > for each ID with Flag=0 above, we want to find another ID from Flag=1,
> >> > with
> >> > the same "State" and "City", and the nearest Price and Number
> normalized
> >> > by
> >> > the corresponding values of that ID with Flag=0.
> >> >
> >> > For example, ID = 1 and ID=2, has the same State and City, but
> different
> >> > FLAG.
> >> > After normalized the Price and Number (Price divided by 100, Number
> >> > divided
> >> > by 1000), the distance between ID=1 and ID=2 is defined as :
> >> > abs(100/100 - 96/100) + abs(1000/1000 - 1010/1000) = 0.04 + 0.01 =
> 0.05
> >> >
> >> >
> >> > What's the best way to find such nearest neighbor? Any valuable tips
> >> > will be
> >> > greatly appreciated!
> >> >
> >> >
> >
> >
>


Re: What's the best way to find the nearest neighbor in Spark? Any windowing function?

2016-09-13 Thread Sean Owen
Given the nature of your metric, I don't think you can use things like
LSH which more or less depend on a continuous metric space. This is
too specific to fit into a general framework usefully I think, but, I
think you can solve this directly with some code without much trouble.

On Tue, Sep 13, 2016 at 8:45 PM, Mobius ReX  wrote:
> Hi Sean,
>
> Great!
>
> Is there any sample code implementing Locality Sensitive Hashing with Spark,
> in either scala or python?
>
> "However if your rule is really like "must match column A and B and
> then closest value in column C then just ordering everything by A, B,
> C lets you pretty much read off the answer from the result set
> directly. Everything is closest to one of its two neighbors."
>
> This is interesting since we can use Lead/Lag Windowing function if we have
> only one continuous column. However,
> our rule is "must match column A and B and then closest values in column C
> and D - for any ID with column E = 0, and the closest ID with Column E = 1".
> The distance metric between ID1 (with Column E =0) and ID2 (with Column E
> =1) is defined as
> abs( C1/C1 - C2/C1 ) + abs (D1/D1 - D2/D1)
> One cannot do
> abs( (C1/C1 + D1/D1) - (C2/C1 + D2/ D1) )
>
>
> Any further tips?
>
> Best,
> Rex
>
>
>
> On Tue, Sep 13, 2016 at 11:09 AM, Sean Owen  wrote:
>>
>> The key is really to specify the distance metric that defines
>> "closeness" for you. You have features that aren't on the same scale,
>> and some that aren't continuous. You might look to clustering for
>> ideas here, though mostly you just want to normalize the scale of
>> dimensions to make them comparable.
>>
>> You can find nearest neighbors by brute force. If speed really matters
>> you can consider locality sensitive hashing, which isn't that hard to
>> implement and can give a lot of speed for a small cost in accuracy.
>>
>> However if your rule is really like "must match column A and B and
>> then closest value in column C then just ordering everything by A, B,
>> C lets you pretty much read off the answer from the result set
>> directly. Everything is closest to one of its two neighbors.
>>
>> On Tue, Sep 13, 2016 at 6:18 PM, Mobius ReX  wrote:
>> > Given a table
>> >
>> >> $cat data.csv
>> >>
>> >> ID,State,City,Price,Number,Flag
>> >> 1,CA,A,100,1000,0
>> >> 2,CA,A,96,1010,1
>> >> 3,CA,A,195,1010,1
>> >> 4,NY,B,124,2000,0
>> >> 5,NY,B,128,2001,1
>> >> 6,NY,C,24,3,0
>> >> 7,NY,C,27,30100,1
>> >> 8,NY,C,29,30200,0
>> >> 9,NY,C,39,33000,1
>> >
>> >
>> > Expected Result:
>> >
>> > ID0, ID1
>> > 1,2
>> > 4,5
>> > 6,7
>> > 8,7
>> >
>> > for each ID with Flag=0 above, we want to find another ID from Flag=1,
>> > with
>> > the same "State" and "City", and the nearest Price and Number normalized
>> > by
>> > the corresponding values of that ID with Flag=0.
>> >
>> > For example, ID = 1 and ID=2, has the same State and City, but different
>> > FLAG.
>> > After normalized the Price and Number (Price divided by 100, Number
>> > divided
>> > by 1000), the distance between ID=1 and ID=2 is defined as :
>> > abs(100/100 - 96/100) + abs(1000/1000 - 1010/1000) = 0.04 + 0.01 = 0.05
>> >
>> >
>> > What's the best way to find such nearest neighbor? Any valuable tips
>> > will be
>> > greatly appreciated!
>> >
>> >
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Fw: Spark + Parquet + IBM Block Storage at Bluemix

2016-09-13 Thread Daniel Lopes
Hi Mario,

Thanks for your help, so I will keep using CSVs.

Best,

*Daniel Lopes*
Chief Data and Analytics Officer | OneMatch
c: +55 (18) 99764-2733 | https://www.linkedin.com/in/dslopes

www.onematch.com.br


On Mon, Sep 12, 2016 at 3:39 PM, Mario Ds Briggs 
wrote:

> Daniel,
>
> I believe it is related to https://issues.apache.org/jira/browse/SPARK-13979
> and happens only when a task fails in an executor (probably for some other
> reason you hit the latter in Parquet and not CSV).
>
> The PR in there should be available shortly in IBM's Analytics for Spark.
>
>
> thanks
> Mario
>
> Adam Roberts---12/09/2016 09:37:21 pm---Mario, in case you've not seen this...
>
> From: Adam Roberts/UK/IBM
> To: Mario Ds Briggs/India/IBM@IBMIN
> Date: 12/09/2016 09:37 pm
> Subject: Fw: Spark + Parquet + IBM Block Storage at Bluemix
> --
>
>
> Mario, in case you've not seen this...
>
> --
> *Adam Roberts*
> IBM Spark Team Lead
> Runtime Technologies - Hursley
> - Forwarded by Adam Roberts/UK/IBM on 12/09/2016 17:06 -
>
> From: Daniel Lopes 
> To: Steve Loughran 
> Cc: user 
> Date: 12/09/2016 13:05
> Subject: Re: Spark + Parquet + IBM Block Storage at Bluemix
> --
>
>
>
> Thanks Steve,
>
> But this error occurs only with parquet files, CSVs works.
>
> Best,
>
> *Daniel Lopes*
> Chief Data and Analytics Officer | OneMatch
> c: +55 (18) 99764-2733 | *https://www.linkedin.com/in/dslopes*
> 
>
> *www.onematch.com.br*
> 
>
> On Sun, Sep 11, 2016 at 3:28 PM, Steve Loughran <*ste...@hortonworks.com*
> > wrote:
>
>On 9 Sep 2016, at 17:56, Daniel Lopes <*dan...@onematch.com.br*
>  > wrote:
>
>  Hi, someone can help
>
>  I'm trying to use parquet in IBM Block Storage at Spark but when
>  I try to load get this error:
>
>  using this config
>
>  credentials = {
>"name": "keystone",
>*"auth_url": "**https://identity.open.softlayer.com*
>  *",*
>"project": "object_storage_23f274c1_d11XXXe634",
>"projectId": "XXd9c4aa39b7c7eb",
>"region": "dallas",
>"userId": "X64087180b40X2b909",
>"username": "admin_9dd810f8901d48778XX",
>"password": "chX6_",
>"domainId": "c1ddad17cfcX41",
>"domainName": "10XX",
>"role": "admin"
>  }
>
>  def set_hadoop_config(credentials):
>  """This function sets the Hadoop configuration with given
>  credentials,
>  so it is possible to access data using SparkContext"""
>
>  prefix = "fs.swift.service." + credentials['name']
>  hconf = sc._jsc.hadoopConfiguration()
>  *hconf.set(prefix + ".auth.url",
>  credentials['auth_url']+'/v3/auth/tokens')*
>  hconf.set(prefix + ".auth.endpoint.prefix", "endpoints")
>  hconf.set(prefix + ".tenant", credentials['projectId'])
>  hconf.set(prefix + ".username", credentials['userId'])
>  hconf.set(prefix + ".password", credentials['password'])
>  hconf.setInt(prefix + ".http.port", 8080)
>  hconf.set(prefix + ".region", credentials['region'])
>  hconf.setBoolean(prefix + ".public", True)
>
>  set_hadoop_config(credentials)
>
>  -
>
>  Py4JJavaErrorTraceback (most recent call last)
>   in ()
>  > 1 train.groupby('Acordo').count().show()
>
>  *Py4JJavaError: An error occurred while calling* o406.showString.
>  : org.apache.spark.SparkException: Job aborted due to stage
>  failure: Task 60 in stage 30.0 failed 10 times, most recent failure: 
> Lost
>  task 60.9 in stage 30.0 (TID 2556, yp-spark-dal09-env5-0039):
>  org.apache.hadoop.fs.swift.exceptions.
>  SwiftConfigurationException:* Missing mandatory configuration
>  option: fs.swift.service.keystone.auth.url*
>
>
>In my own code, I'd assume that the value of credentials['name']
>didn't match that of the URL, assuming you have something like
>swift://bucket.keystone . Failing that: the options were set too late.
>
>Instead of asking for the hadoop config and editing that, set the
>option in your spark context, before it is launched, with the prefix
>"hadoop"
>
>at 
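
A minimal sketch of the approach described in that last quoted paragraph:
SparkConf keys prefixed with "spark.hadoop." are copied into the Hadoop
configuration when the context starts, so the Swift settings are in place
before any filesystem access. The service name and values below are
placeholders loosely based on the snippet above:

import org.apache.spark.SparkConf

// Hedged sketch (placeholder values): set the swift options on the SparkConf,
// before the SparkContext is created, via the "spark.hadoop." prefix.
val conf = new SparkConf()
  .setAppName("swift-parquet")
  .set("spark.hadoop.fs.swift.service.keystone.auth.url",
       "https://identity.open.softlayer.com/v3/auth/tokens")
  .set("spark.hadoop.fs.swift.service.keystone.http.port", "8080")
  .set("spark.hadoop.fs.swift.service.keystone.region", "dallas")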

Re: What's the best way to find the nearest neighbor in Spark? Any windowing function?

2016-09-13 Thread Mobius ReX
Hi Sean,

Great!

Is there any sample code implementing Locality Sensitive Hashing with
Spark, in either scala or python?

"However if your rule is really like "must match column A and B and
then closest value in column C then just ordering everything by A, B,
C lets you pretty much read off the answer from the result set
directly. Everything is closest to one of its two neighbors."

This is interesting since we can use Lead/Lag Windowing function if we have
only one continuous column. However,
our rule is "must match column A and B and then closest values in column C
and D - for any ID with column E = 0, and the closest ID with Column E = 1".
The distance metric between ID1 (with Column E =0) and ID2 (with Column E
=1) is defined as
abs( C1/C1 - C2/C1 ) + abs (D1/D1 - D2/D1)
One cannot do
abs( (C1/C1 + D1/D1) - (C2/C1 + D2/ D1) )


Any further tips?

Best,
Rex



On Tue, Sep 13, 2016 at 11:09 AM, Sean Owen  wrote:

> The key is really to specify the distance metric that defines
> "closeness" for you. You have features that aren't on the same scale,
> and some that aren't continuous. You might look to clustering for
> ideas here, though mostly you just want to normalize the scale of
> dimensions to make them comparable.
>
> You can find nearest neighbors by brute force. If speed really matters
> you can consider locality sensitive hashing, which isn't that hard to
> implement and can give a lot of speed for a small cost in accuracy.
>
> However if your rule is really like "must match column A and B and
> then closest value in column C then just ordering everything by A, B,
> C lets you pretty much read off the answer from the result set
> directly. Everything is closest to one of its two neighbors.
>
> On Tue, Sep 13, 2016 at 6:18 PM, Mobius ReX  wrote:
> > Given a table
> >
> >> $cat data.csv
> >>
> >> ID,State,City,Price,Number,Flag
> >> 1,CA,A,100,1000,0
> >> 2,CA,A,96,1010,1
> >> 3,CA,A,195,1010,1
> >> 4,NY,B,124,2000,0
> >> 5,NY,B,128,2001,1
> >> 6,NY,C,24,3,0
> >> 7,NY,C,27,30100,1
> >> 8,NY,C,29,30200,0
> >> 9,NY,C,39,33000,1
> >
> >
> > Expected Result:
> >
> > ID0, ID1
> > 1,2
> > 4,5
> > 6,7
> > 8,7
> >
> > for each ID with Flag=0 above, we want to find another ID from Flag=1,
> with
> > the same "State" and "City", and the nearest Price and Number normalized
> by
> > the corresponding values of that ID with Flag=0.
> >
> > For example, ID = 1 and ID=2, has the same State and City, but different
> > FLAG.
> > After normalized the Price and Number (Price divided by 100, Number
> divided
> > by 1000), the distance between ID=1 and ID=2 is defined as :
> > abs(100/100 - 96/100) + abs(1000/1000 - 1010/1000) = 0.04 + 0.01 = 0.05
> >
> >
> > What's the best way to find such nearest neighbor? Any valuable tips
> will be
> > greatly appreciated!
> >
> >
>


Re: Character encoding corruption in Spark JDBC connector

2016-09-13 Thread Sean Owen
Based on your description, this isn't a problem in Spark. It means
your JDBC connector isn't interpreting bytes from the database
according to the encoding in which they were written. It could be
Latin1, sure.

But if "new String(ResultSet.getBytes())" works, it's only because
your platform's default JVM encoding is Latin1 too. Really you need to
specify the encoding directly in that constructor, or else this will
not in general work on other platforms, no.

That's not the solution though; ideally you find the setting that lets
the JDBC connector read the data as intended.
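
A minimal sketch of that last point, with the charset named explicitly
(ISO-8859-1 standing in for MySQL's latin1; the column name is a placeholder):

import java.nio.charset.StandardCharsets
import java.sql.ResultSet

// Hedged sketch: decode the raw bytes with an explicit charset instead of
// relying on the platform default.
def readLatin1Column(rs: ResultSet, column: String): String = {
  val raw = rs.getBytes(column)
  if (raw == null) null else new String(raw, StandardCharsets.ISO_8859_1)
}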

On Tue, Sep 13, 2016 at 8:02 PM, Mark Bittmann  wrote:
> Hello Spark community,
>
> I'm reading from a MySQL database into a Spark dataframe using the JDBC
> connector functionality, and I'm experiencing some character encoding
> issues. The default encoding for MySQL strings is latin1, but the mysql JDBC
> connector implementation of "ResultSet.getString()" will return an mangled
> unicode encoding of the data for certain characters such as the "all rights
> reserved" char. Instead, you can use "new String(ResultSet.getBytes())"
> which will return the correctly encoded string. I've confirmed this behavior
> with the mysql connector classes (i.e., without using the Spark wrapper).
>
> I can see here that the Spark JDBC connector uses getString(), though there
> is a note to move to getBytes() for performance reasons:
>
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L389
>
> For some special chars, I can reverse the behavior with a UDF that applies
> new String(badString.getBytes("Cp1252") , "UTF-8"), however for some foreign
> characters the underlying byte array is irreversibly changed and the data is
> corrupted.
>
> I can submit an issue/PR to fix it going forward if "new
> String(ResultSet.getBytes())" is the correct approach.
>
> Meanwhile, can anyone offer any recommendations on how to correct this
> behavior prior to it getting to a dataframe? I've tried every permutation of
> the settings in the JDBC connection url (characterSetResults,
> characterEncoding).
>
> I'm on Spark 1.6.
>
> Thanks!

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Character encoding corruption in Spark JDBC connector

2016-09-13 Thread Mark Bittmann
Hello Spark community,

I'm reading from a MySQL database into a Spark dataframe using the JDBC
connector functionality, and I'm experiencing some character encoding
issues. The default encoding for MySQL strings is latin1, but the mysql
JDBC connector implementation of "ResultSet.getString()" will return an
mangled unicode encoding of the data for certain characters such as the
"all rights reserved" char. Instead, you can use "new
String(ResultSet.getBytes())" which will return the correctly encoded
string. I've confirmed this behavior with the mysql connector classes
(i.e., without using the Spark wrapper).

I can see here that the Spark JDBC connector uses getString(), though there
is a note to move to getBytes() for performance reasons:

https://github.com/apache/spark/blob/master/sql/core/
src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.
scala#L389

For some special chars, I can reverse the behavior with a UDF that applies
new String(badString.getBytes("Cp1252") , "UTF-8"), however for some
foreign characters the underlying byte array is irreversibly changed and
the data is corrupted.
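
A minimal sketch of that UDF work-around (DataFrame API; the column name in the
usage comment is hypothetical, and as noted it only helps for characters that
survive the Cp1252 round trip):

import org.apache.spark.sql.functions.{col, udf}

// Hedged sketch of the Cp1252 round trip described above, as a UDF.
val fixEncoding = udf { (s: String) =>
  if (s == null) null else new String(s.getBytes("Cp1252"), "UTF-8")
}
// usage, with a hypothetical column name:
// val fixed = df.withColumn("title", fixEncoding(col("title")))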

I can submit an issue/PR to fix it going forward if "new
String(ResultSet.getBytes())" is the correct approach.

Meanwhile, can anyone offer any recommendations on how to correct this
behavior prior to it getting to a dataframe? I've tried every permutation
of the settings in the JDBC connection url (characterSetResults,
characterEncoding).

I'm on Spark 1.6.

Thanks!


Re: Spark 2.0.0 won't let you create a new SparkContext?

2016-09-13 Thread Mark Hamstra
It sounds like you should be writing an application and not trying to force
the spark-shell to do more than what it was intended for.
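
A minimal sketch of that route; the ES-related keys are illustrative
assumptions, not settings taken from this thread:

import org.apache.spark.sql.SparkSession

object EsIndexJob {
  // Hedged sketch: in an application, every custom setting goes on the builder
  // before the single SparkSession (and its SparkContext) is created.
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("es-index-job")
      .config("es.nodes", "localhost:9200")             // assumed ES setting
      .config("es.read.field.as.array.include", "tags") // assumed ES setting
      .getOrCreate()
    try {
      // job logic goes here, via spark and spark.sparkContext
    } finally {
      spark.stop()
    }
  }
}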

On Tue, Sep 13, 2016 at 11:53 AM, Kevin Burton  wrote:

> I sort of agree but the problem is that some of this should be code.
>
> Some of our ES indexes have 100-200 columns.
>
> Defining which ones are arrays on the command line is going to get ugly
> fast.
>
>
>
> On Tue, Sep 13, 2016 at 11:50 AM, Sean Owen  wrote:
>
>> You would generally use --conf to set this on the command line if using
>> the shell.
>>
>>
>> On Tue, Sep 13, 2016, 19:22 Kevin Burton  wrote:
>>
>>> The problem is that without a new spark context, with a custom conf,
>>> elasticsearch-hadoop is refusing to read in settings about the ES setup...
>>>
>>> if I do a sc.stop() , then create a new one, it seems to work fine.
>>>
>>> But it isn't really documented anywhere and all the existing
>>> documentation is now invalid because you get an exception when you try to
>>> create a new spark context.
>>>
>>> On Tue, Sep 13, 2016 at 11:13 AM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 I think this works in a shell but you need to allow multiple spark
 contexts

 Spark context Web UI available at http://50.140.197.217:5
 Spark context available as 'sc' (master = local, app id =
 local-1473789661846).
 Spark session available as 'spark'.
 Welcome to
     __
  / __/__  ___ _/ /__
 _\ \/ _ \/ _ `/ __/  '_/
/___/ .__/\_,_/_/ /_/\_\   version 2.0.0
   /_/
 Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java
 1.8.0_77)
 Type in expressions to have them evaluated.
 Type :help for more information.

 scala> import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext
 scala>  val conf = new SparkConf().setMaster("local[2
 ]").setAppName("CountingSheep").
 *set("spark.driver.allowMultipleContexts", "true")*conf:
 org.apache.spark.SparkConf = org.apache.spark.SparkConf@bb5f9d
 scala> val sc = new SparkContext(conf)
 sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@
 4888425d


 HTH


 Dr Mich Talebzadeh



 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *



 http://talebzadehmich.wordpress.com


 *Disclaimer:* Use it at your own risk. Any and all responsibility for
 any loss, damage or destruction of data or any other property which may
 arise from relying on this email's technical content is explicitly
 disclaimed. The author will in no case be liable for any monetary damages
 arising from such loss, damage or destruction.



 On 13 September 2016 at 18:57, Sean Owen  wrote:

> But you're in the shell there, which already has a SparkContext for
> you as sc.
>
> On Tue, Sep 13, 2016 at 6:49 PM, Kevin Burton 
> wrote:
>
>> I'm rather confused here as to what to do about creating a new
>> SparkContext.
>>
>> Spark 2.0 prevents it... (exception included below)
>>
>> yet a TON of examples I've seen basically tell you to create a new
>> SparkContext as standard practice:
>>
>> http://spark.apache.org/docs/latest/configuration.html#dynam
>> ically-loading-spark-properties
>>
>> val conf = new SparkConf()
>>  .setMaster("local[2]")
>>  .setAppName("CountingSheep")val sc = new SparkContext(conf)
>>
>>
>> I'm specifically running into a problem in that ES hadoop won't work
>> with its settings and I think its related to this problme.
>>
>> Do we have to call sc.stop() first and THEN create a new spark
>> context?
>>
>> That works,, but I can't find any documentation anywhere telling us
>> the right course of action.
>>
>>
>>
>> scala> val sc = new SparkContext();
>> org.apache.spark.SparkException: Only one SparkContext may be
>> running in this JVM (see SPARK-2243). To ignore this error, set
>> spark.driver.allowMultipleContexts = true. The currently running
>> SparkContext was created at:
>> org.apache.spark.sql.SparkSession$Builder.getOrCreate(
>> SparkSession.scala:823)
>> org.apache.spark.repl.Main$.createSparkSession(Main.scala:95)
>> (:15)
>> (:31)
>> (:33)
>> .(:37)
>> .()
>> .$print$lzycompute(:7)
>> .$print(:6)
>> $print()
>> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>> ssorImpl.java:62)
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>> 

Re: Spark 2.0.0 won't let you create a new SparkContext?

2016-09-13 Thread Kevin Burton
I sort of agree but the problem is that some of this should be code.

Some of our ES indexes have 100-200 columns.

Defining which ones are arrays on the command line is going to get ugly
fast.



On Tue, Sep 13, 2016 at 11:50 AM, Sean Owen  wrote:

> You would generally use --conf to set this on the command line if using
> the shell.
>
>
> On Tue, Sep 13, 2016, 19:22 Kevin Burton  wrote:
>
>> The problem is that without a new spark context, with a custom conf,
>> elasticsearch-hadoop is refusing to read in settings about the ES setup...
>>
>> if I do a sc.stop() , then create a new one, it seems to work fine.
>>
>> But it isn't really documented anywhere and all the existing
>> documentation is now invalid because you get an exception when you try to
>> create a new spark context.
>>
>> On Tue, Sep 13, 2016 at 11:13 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> I think this works in a shell but you need to allow multiple spark
>>> contexts
>>>
>>> Spark context Web UI available at http://50.140.197.217:5
>>> Spark context available as 'sc' (master = local, app id =
>>> local-1473789661846).
>>> Spark session available as 'spark'.
>>> Welcome to
>>>     __
>>>  / __/__  ___ _/ /__
>>> _\ \/ _ \/ _ `/ __/  '_/
>>>/___/ .__/\_,_/_/ /_/\_\   version 2.0.0
>>>   /_/
>>> Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java
>>> 1.8.0_77)
>>> Type in expressions to have them evaluated.
>>> Type :help for more information.
>>>
>>> scala> import org.apache.spark.SparkContext
>>> import org.apache.spark.SparkContext
>>> scala>  val conf = new SparkConf().setMaster("local[2]").setAppName("
>>> CountingSheep").
>>> *set("spark.driver.allowMultipleContexts", "true")*conf:
>>> org.apache.spark.SparkConf = org.apache.spark.SparkConf@bb5f9d
>>> scala> val sc = new SparkContext(conf)
>>> sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@
>>> 4888425d
>>>
>>>
>>> HTH
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>> On 13 September 2016 at 18:57, Sean Owen  wrote:
>>>
 But you're in the shell there, which already has a SparkContext for you
 as sc.

 On Tue, Sep 13, 2016 at 6:49 PM, Kevin Burton 
 wrote:

> I'm rather confused here as to what to do about creating a new
> SparkContext.
>
> Spark 2.0 prevents it... (exception included below)
>
> yet a TON of examples I've seen basically tell you to create a new
> SparkContext as standard practice:
>
> http://spark.apache.org/docs/latest/configuration.html#
> dynamically-loading-spark-properties
>
> val conf = new SparkConf()
>  .setMaster("local[2]")
>  .setAppName("CountingSheep")val sc = new SparkContext(conf)
>
>
> I'm specifically running into a problem in that ES hadoop won't work
> with its settings and I think its related to this problme.
>
> Do we have to call sc.stop() first and THEN create a new spark context?
>
> That works,, but I can't find any documentation anywhere telling us
> the right course of action.
>
>
>
> scala> val sc = new SparkContext();
> org.apache.spark.SparkException: Only one SparkContext may be running
> in this JVM (see SPARK-2243). To ignore this error, set 
> spark.driver.allowMultipleContexts
> = true. The currently running SparkContext was created at:
> org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.
> scala:823)
> org.apache.spark.repl.Main$.createSparkSession(Main.scala:95)
> (:15)
> (:31)
> (:33)
> .(:37)
> .()
> .$print$lzycompute(:7)
> .$print(:6)
> $print()
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> java.lang.reflect.Method.invoke(Method.java:497)
> scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
> scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
> scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$
> loadAndRunReq$1.apply(IMain.scala:638)
> 

Re: Spark 2.0.0 won't let you create a new SparkContext?

2016-09-13 Thread Sean Owen
You would generally use --conf to set this on the command line if using the
shell.

On Tue, Sep 13, 2016, 19:22 Kevin Burton  wrote:

> The problem is that without a new spark context, with a custom conf,
> elasticsearch-hadoop is refusing to read in settings about the ES setup...
>
> if I do a sc.stop() , then create a new one, it seems to work fine.
>
> But it isn't really documented anywhere and all the existing documentation
> is now invalid because you get an exception when you try to create a new
> spark context.
>
> On Tue, Sep 13, 2016 at 11:13 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> I think this works in a shell but you need to allow multiple spark
>> contexts
>>
>> Spark context Web UI available at http://50.140.197.217:5
>> Spark context available as 'sc' (master = local, app id =
>> local-1473789661846).
>> Spark session available as 'spark'.
>> Welcome to
>>     __
>>  / __/__  ___ _/ /__
>> _\ \/ _ \/ _ `/ __/  '_/
>>/___/ .__/\_,_/_/ /_/\_\   version 2.0.0
>>   /_/
>> Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java
>> 1.8.0_77)
>> Type in expressions to have them evaluated.
>> Type :help for more information.
>>
>> scala> import org.apache.spark.SparkContext
>> import org.apache.spark.SparkContext
>> scala>  val conf = new
>> SparkConf().setMaster("local[2]").setAppName("CountingSheep").
>> *set("spark.driver.allowMultipleContexts", "true")*conf:
>> org.apache.spark.SparkConf = org.apache.spark.SparkConf@bb5f9d
>> scala> val sc = new SparkContext(conf)
>> sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@4888425d
>>
>>
>> HTH
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 13 September 2016 at 18:57, Sean Owen  wrote:
>>
>>> But you're in the shell there, which already has a SparkContext for you
>>> as sc.
>>>
>>> On Tue, Sep 13, 2016 at 6:49 PM, Kevin Burton 
>>> wrote:
>>>
 I'm rather confused here as to what to do about creating a new
 SparkContext.

 Spark 2.0 prevents it... (exception included below)

 yet a TON of examples I've seen basically tell you to create a new
 SparkContext as standard practice:


 http://spark.apache.org/docs/latest/configuration.html#dynamically-loading-spark-properties

 val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("CountingSheep")val sc = new SparkContext(conf)


 I'm specifically running into a problem in that ES hadoop won't work
 with its settings and I think its related to this problme.

 Do we have to call sc.stop() first and THEN create a new spark context?

 That works,, but I can't find any documentation anywhere telling us the
 right course of action.



 scala> val sc = new SparkContext();
 org.apache.spark.SparkException: Only one SparkContext may be running
 in this JVM (see SPARK-2243). To ignore this error, set
 spark.driver.allowMultipleContexts = true. The currently running
 SparkContext was created at:

 org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:823)
 org.apache.spark.repl.Main$.createSparkSession(Main.scala:95)
 (:15)
 (:31)
 (:33)
 .(:37)
 .()
 .$print$lzycompute(:7)
 .$print(:6)
 $print()
 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 java.lang.reflect.Method.invoke(Method.java:497)
 scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
 scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
 scala.tools.nsc.interpreter.IM
 ain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
 scala.tools.nsc.interpreter.IM
 ain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
 scala.reflect.internal.util.Sc
 alaClassLoader$class.asContext(ScalaClassLoader.scala:31)

 scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
   at
 org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2221)

Re: Check if a nested column exists in DataFrame

2016-09-13 Thread Arun Patel
Is there a way to check whether a nested column exists from the schema in PySpark?

http://stackoverflow.com/questions/37471346/automatically-and-elegantly-flatten-dataframe-in-spark-sql
shows how to get the list of nested columns in Scala.  But, can this be
done in PySpark?

Please help.
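
One way to approach it, as a hedged sketch in Scala; PySpark exposes the same
StructType through df.schema, so the equivalent walk over schema.fields and
field.dataType can be written in Python:

import org.apache.spark.sql.types.{DataType, StructType}

// Hedged sketch: test whether a dot-separated path such as "emp.mgr.col"
// resolves to a field in the schema.
def hasNestedColumn(schema: StructType, path: String): Boolean = {
  def walk(dt: DataType, parts: List[String]): Boolean = (dt, parts) match {
    case (_, Nil) => true
    case (st: StructType, head :: tail) =>
      st.fields.find(_.name == head).exists(f => walk(f.dataType, tail))
    case _ => false
  }
  walk(schema, path.split('.').toList)
}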

On Mon, Sep 12, 2016 at 5:28 PM, Arun Patel  wrote:

> I'm trying to analyze XML documents using spark-xml package.  Since all
> XML columns are optional, some columns may or may not exist. When I
> register the Dataframe as a table, how do I check if a nested column is
> existing or not? My column name is "emp" which is already exploded and I am
> trying to check if the nested column "emp.mgr.col" exists or not.  If it
> exists, I need to use it.  If it does not exist, I should set it to null.
> Is there a way to achieve this?
>
> Please note I am not able to use .columns method because it does not show
> the nested columns.
>
> Also, note that I  cannot manually specify the schema because of my
> requirement.
>
> I'm trying this in Pyspark.
>
> Thank you.
>


Re: Spark 2.0.0 won't let you create a new SparkContext?

2016-09-13 Thread Kevin Burton
The problem is that without a new SparkContext with a custom conf,
elasticsearch-hadoop refuses to read in settings about the ES setup...

If I do a sc.stop() and then create a new one, it seems to work fine.

But it isn't really documented anywhere, and all the existing documentation
is now invalid because you get an exception when you try to create a new
SparkContext.
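
Spelled out as a minimal sketch for the 2.0 shell, with the ES key as a
placeholder rather than a verified setting:

import org.apache.spark.{SparkConf, SparkContext}

// Hedged sketch: sc is the context the shell created; stop it, then build a
// replacement carrying the custom configuration.
sc.stop()
val conf = new SparkConf()
  .setAppName("es-shell")
  .set("es.nodes", "localhost:9200") // assumed elasticsearch-hadoop setting
val sc2 = new SparkContext(conf)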

On Tue, Sep 13, 2016 at 11:13 AM, Mich Talebzadeh  wrote:

> I think this works in a shell but you need to allow multiple spark contexts
>
> Spark context Web UI available at http://50.140.197.217:5
> Spark context available as 'sc' (master = local, app id =
> local-1473789661846).
> Spark session available as 'spark'.
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/___/ .__/\_,_/_/ /_/\_\   version 2.0.0
>   /_/
> Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java
> 1.8.0_77)
> Type in expressions to have them evaluated.
> Type :help for more information.
>
> scala> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext
> scala>  val conf = new SparkConf().setMaster("local[2]").setAppName("
> CountingSheep").
> *set("spark.driver.allowMultipleContexts", "true")*conf:
> org.apache.spark.SparkConf = org.apache.spark.SparkConf@bb5f9d
> scala> val sc = new SparkContext(conf)
> sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@4888425d
>
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 13 September 2016 at 18:57, Sean Owen  wrote:
>
>> But you're in the shell there, which already has a SparkContext for you
>> as sc.
>>
>> On Tue, Sep 13, 2016 at 6:49 PM, Kevin Burton  wrote:
>>
>>> I'm rather confused here as to what to do about creating a new
>>> SparkContext.
>>>
>>> Spark 2.0 prevents it... (exception included below)
>>>
>>> yet a TON of examples I've seen basically tell you to create a new
>>> SparkContext as standard practice:
>>>
>>> http://spark.apache.org/docs/latest/configuration.html#dynam
>>> ically-loading-spark-properties
>>>
>>> val conf = new SparkConf()
>>>  .setMaster("local[2]")
>>>  .setAppName("CountingSheep")val sc = new SparkContext(conf)
>>>
>>>
>>> I'm specifically running into a problem in that ES hadoop won't work
>>> with its settings and I think its related to this problme.
>>>
>>> Do we have to call sc.stop() first and THEN create a new spark context?
>>>
>>> That works,, but I can't find any documentation anywhere telling us the
>>> right course of action.
>>>
>>>
>>>
>>> scala> val sc = new SparkContext();
>>> org.apache.spark.SparkException: Only one SparkContext may be running
>>> in this JVM (see SPARK-2243). To ignore this error, set
>>> spark.driver.allowMultipleContexts = true. The currently running
>>> SparkContext was created at:
>>> org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkS
>>> ession.scala:823)
>>> org.apache.spark.repl.Main$.createSparkSession(Main.scala:95)
>>> (:15)
>>> (:31)
>>> (:33)
>>> .(:37)
>>> .()
>>> .$print$lzycompute(:7)
>>> .$print(:6)
>>> $print()
>>> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>>> ssorImpl.java:62)
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>>> thodAccessorImpl.java:43)
>>> java.lang.reflect.Method.invoke(Method.java:497)
>>> scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
>>> scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
>>> scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$lo
>>> adAndRunReq$1.apply(IMain.scala:638)
>>> scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$lo
>>> adAndRunReq$1.apply(IMain.scala:637)
>>> scala.reflect.internal.util.ScalaClassLoader$class.asContext
>>> (ScalaClassLoader.scala:31)
>>> scala.reflect.internal.util.AbstractFileClassLoader.asContex
>>> t(AbstractFileClassLoader.scala:19)
>>>   at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextI
>>> sRunning$2.apply(SparkContext.scala:2221)
>>>   at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextI
>>> sRunning$2.apply(SparkContext.scala:2217)
>>>   at scala.Option.foreach(Option.scala:257)
>>>   at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning
>>> (SparkContext.scala:2217)
>>>   at 

Re: Spark 2.0.0 won't let you create a new SparkContext?

2016-09-13 Thread Mich Talebzadeh
I think this works in a shell but you need to allow multiple spark contexts

Spark context Web UI available at http://50.140.197.217:5
Spark context available as 'sc' (master = local, app id =
local-1473789661846).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.0
      /_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java
1.8.0_77)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark.SparkContext
import org.apache.spark.SparkContext
scala>  val conf = new
SparkConf().setMaster("local[2]").setAppName("CountingSheep").
*set("spark.driver.allowMultipleContexts", "true")*conf:
org.apache.spark.SparkConf = org.apache.spark.SparkConf@bb5f9d
scala> val sc = new SparkContext(conf)
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@4888425d


HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 13 September 2016 at 18:57, Sean Owen  wrote:

> But you're in the shell there, which already has a SparkContext for you as
> sc.
>
> On Tue, Sep 13, 2016 at 6:49 PM, Kevin Burton  wrote:
>
>> I'm rather confused here as to what to do about creating a new
>> SparkContext.
>>
>> Spark 2.0 prevents it... (exception included below)
>>
>> yet a TON of examples I've seen basically tell you to create a new
>> SparkContext as standard practice:
>>
>> http://spark.apache.org/docs/latest/configuration.html#dynam
>> ically-loading-spark-properties
>>
>> val conf = new SparkConf()
>>  .setMaster("local[2]")
>>  .setAppName("CountingSheep")
>> val sc = new SparkContext(conf)
>>
>>
>> I'm specifically running into a problem in that ES-Hadoop won't work with
>> its settings and I think it's related to this problem.
>>
>> Do we have to call sc.stop() first and THEN create a new spark context?
>>
>> That works, but I can't find any documentation anywhere telling us the
>> right course of action.
>>
>>
>>
>> scala> val sc = new SparkContext();
>> org.apache.spark.SparkException: Only one SparkContext may be running in
>> this JVM (see SPARK-2243). To ignore this error, set
>> spark.driver.allowMultipleContexts = true. The currently running
>> SparkContext was created at:
>> org.apache.spark.sql.SparkSession$Builder.getOrCreate(
>> SparkSession.scala:823)
>> org.apache.spark.repl.Main$.createSparkSession(Main.scala:95)
>> (:15)
>> (:31)
>> (:33)
>> .(:37)
>> .()
>> .$print$lzycompute(:7)
>> .$print(:6)
>> $print()
>> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>> ssorImpl.java:62)
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>> thodAccessorImpl.java:43)
>> java.lang.reflect.Method.invoke(Method.java:497)
>> scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
>> scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
>> scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$lo
>> adAndRunReq$1.apply(IMain.scala:638)
>> scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$lo
>> adAndRunReq$1.apply(IMain.scala:637)
>> scala.reflect.internal.util.ScalaClassLoader$class.asContext
>> (ScalaClassLoader.scala:31)
>> scala.reflect.internal.util.AbstractFileClassLoader.asContex
>> t(AbstractFileClassLoader.scala:19)
>>   at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextI
>> sRunning$2.apply(SparkContext.scala:2221)
>>   at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextI
>> sRunning$2.apply(SparkContext.scala:2217)
>>   at scala.Option.foreach(Option.scala:257)
>>   at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning
>> (SparkContext.scala:2217)
>>   at org.apache.spark.SparkContext$.markPartiallyConstructed(Spar
>> kContext.scala:2290)
>>   at org.apache.spark.SparkContext.(SparkContext.scala:89)
>>   at org.apache.spark.SparkContext.(SparkContext.scala:121)
>>   ... 48 elided
>>
>>
>> --
>>
>> We’re hiring if you know of any awesome Java Devops or Linux Operations
>> Engineers!
>>
>> Founder/CEO Spinn3r.com
>> Location: *San Francisco, CA*
>> blog: http://burtonator.wordpress.com
>> … or check out my Google+ profile
>> 
>>
>>
>


Re: Unable to compare SparkSQL Date columns

2016-09-13 Thread Praseetha
Hi Mich,

Even I'm getting similar output.
The dates that are passed as input are different from the ones in the output.
Since it's an inner join, the expected result is
[2015-12-31,2015-12-31,1,105]
[2016-01-27,2016-01-27,5,101]

Thanks & Regds,
--Praseetha
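
A side note on the output quoted below: every joined row coming back as
2015-12-27 is the classic symptom of SimpleDateFormat's week-year ("YYYY") and
day-of-year ("DD") pattern letters. A minimal check in the shell (a sketch only,
not code from this thread; the exact fallback date can vary with the locale):

import java.text.SimpleDateFormat

val weekYear = new SimpleDateFormat("YYYY-MM-DD")   // week year + day of year
val calYear  = new SimpleDateFormat("yyyy-MM-dd")   // calendar year + day of month

// With no week or day-of-week in the input, lenient parsing of the week-year
// pattern falls back to the first day of week-year 2016.
new java.sql.Date(weekYear.parse("2016-01-31").getTime)  // e.g. 2015-12-27
new java.sql.Date(calYear.parse("2016-01-31").getTime)   // 2016-01-31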

On Tue, Sep 13, 2016 at 11:21 PM, Mich Talebzadeh  wrote:

> Hi  Praseetha,
>
> This is how I have written this.
>
> case class TestDate (id: String, loginTime: java.sql.Date)
> val formate = new SimpleDateFormat("YYYY-MM-DD")
> val TestDateData = sc.parallelize(List(
> ("1",  new java.sql.Date(formate.parse("2016-01-31").getTime)),
> ("2", new java.sql.Date(formate.parse("2106-01-30").getTime)),
> ("3", new java.sql.Date(formate.parse("2016-01-29").getTime)),
> ("4", new java.sql.Date(formate.parse("2016-01-28").getTime)),
> ("5", new java.sql.Date(formate.parse("2016-01-27").getTime))
> ))
>  val firstPair = TestDateData.map(x => ( new TestDate(x._1, x._2)))
>  val fp = firstPair.toDF
> case class TestDate2 (id2: String, loginTime2: java.sql.Date)
> val TestDateData1 = sc.parallelize(List(
> ("101", new java.sql.Date(formate.parse("2016-01-27").getTime)),
> ("102", new java.sql.Date(formate.parse("2016-01-26").getTime)),
> ("103", new java.sql.Date(formate.parse("2016-01-25").getTime)),
> ("104", new java.sql.Date(formate.parse("2016-01-24").getTime)),
> ("105", new java.sql.Date(formate.parse("2016-01-31").getTime))
> ))
> val secondPair = TestDateData1.map(x => ( new TestDate2(x._1, x._2)))
> val sp = secondPair.toDF
> val rs = 
> fp.join(sp,fp("loginTime")===sp("loginTime2"),"inner").select('loginTime,'loginTime2,
> 'id,'id2).show
>
> This is what I get
>
> [2015-12-27,2015-12-27,1,101]
> [2015-12-27,2015-12-27,1,102]
> [2015-12-27,2015-12-27,1,103]
> [2015-12-27,2015-12-27,1,104]
> [2015-12-27,2015-12-27,1,105]
> [2015-12-27,2015-12-27,3,101]
> [2015-12-27,2015-12-27,3,102]
> [2015-12-27,2015-12-27,3,103]
> [2015-12-27,2015-12-27,3,104]
> [2015-12-27,2015-12-27,3,105]
> [2015-12-27,2015-12-27,4,101]
> [2015-12-27,2015-12-27,4,102]
> [2015-12-27,2015-12-27,4,103]
> [2015-12-27,2015-12-27,4,104]
> [2015-12-27,2015-12-27,4,105]
> [2015-12-27,2015-12-27,5,101]
> [2015-12-27,2015-12-27,5,102]
> [2015-12-27,2015-12-27,5,103]
> [2015-12-27,2015-12-27,5,104]
> [2015-12-27,2015-12-27,5,105]
> rs: Unit = ()
>
>
> Is this what you are expecting?
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 13 September 2016 at 16:46, Praseetha  wrote:
>
>> Hi Mich,
>>
>> val formate = new SimpleDateFormat("YYYY-MM-DD")
>>
>> Thanks & Regds,
>> --Praseetha
>>
>> On Tue, Sep 13, 2016 at 8:50 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi Praseetha.
>>>
>>> <console>:32: error: not found: value formate
>>> Error occurred in an application involving default arguments.
>>>("1",  new java.sql.Date(formate.parse("2
>>> 016-01-31").getTime)),
>>>
>>> What is that formate?
>>>
>>> Thanks
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>> On 13 September 2016 at 16:12, Praseetha  wrote:
>>>
 Hi Mich,

 Thanks a lot for your reply.

 Here is the sample

 case class TestDate (id: String, loginTime: java.sql.Date)

 val formate = new SimpleDateFormat("YYYY-MM-DD")

 val TestDateData = sc.parallelize(List(
 ("1",  new java.sql.Date(formate.parse("2016-01-31").getTime)),
 ("2", new java.sql.Date(formate.parse("2106-01-30").getTime)),
 ("3", new java.sql.Date(formate.parse("2016-01-29").getTime)),
 ("4", new java.sql.Date(formate.parse("2016-01-28").getTime)),
 ("5", new java.sql.Date(formate.parse("2016-01-27").getTime))
   

Re: What's the best way to find the nearest neighbor in Spark? Any windowing function?

2016-09-13 Thread Sean Owen
The key is really to specify the distance metric that defines
"closeness" for you. You have features that aren't on the same scale,
and some that aren't continuous. You might look to clustering for
ideas here, though mostly you just want to normalize the scale of
dimensions to make them comparable.

You can find nearest neighbors by brute force. If speed really matters
you can consider locality sensitive hashing, which isn't that hard to
implement and can give a lot of speed for a small cost in accuracy.

However if your rule is really like "must match column A and B and
then closest value in column C" then just ordering everything by A, B,
C lets you pretty much read off the answer from the result set
directly. Everything is closest to one of its two neighbors.
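
If the locality sensitive hashing route sounds appealing, a rough spark-shell
sketch of the bucketing idea looks like this (an illustration only, not
something posted on this thread; it assumes the data.csv from the question
below and an existing sc, and because it uses a single coarse hash it can miss
a neighbour that falls just across a bucket boundary):

case class Rec(id: Int, state: String, city: String, price: Double, nbr: Double, flag: Int)

val recs = sc.textFile("data.csv")
  .filter(!_.startsWith("ID"))                 // drop the header line
  .map(_.split(","))
  .map(a => Rec(a(0).toInt, a(1), a(2), a(3).toDouble, a(4).toDouble, a(5).toInt))

// Bucket key: exact match on State/City plus a coarse grid on the scaled columns.
def key(r: Rec) = (r.state, r.city, (r.price / 100).round, (r.nbr / 1000).round)

val flag0 = recs.filter(_.flag == 0).map(r => (key(r), r))
val flag1 = recs.filter(_.flag == 1).map(r => (key(r), r))

// Only rows sharing a bucket are compared; distance uses the normalized columns.
val nearest = flag0.join(flag1)
  .map { case (_, (a, b)) =>
    (a.id, (b.id, math.abs(a.price - b.price) / 100 + math.abs(a.nbr - b.nbr) / 1000))
  }
  .reduceByKey((x, y) => if (x._2 <= y._2) x else y)

nearest.collect().foreach(println)             // (id0, (nearest id1, distance))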

On Tue, Sep 13, 2016 at 6:18 PM, Mobius ReX  wrote:
> Given a table
>
>> $cat data.csv
>>
>> ID,State,City,Price,Number,Flag
>> 1,CA,A,100,1000,0
>> 2,CA,A,96,1010,1
>> 3,CA,A,195,1010,1
>> 4,NY,B,124,2000,0
>> 5,NY,B,128,2001,1
>> 6,NY,C,24,3,0
>> 7,NY,C,27,30100,1
>> 8,NY,C,29,30200,0
>> 9,NY,C,39,33000,1
>
>
> Expected Result:
>
> ID0, ID1
> 1,2
> 4,5
> 6,7
> 8,7
>
> For each ID with Flag=0 above, we want to find another ID with Flag=1, with
> the same "State" and "City", and the nearest Price and Number normalized by
> the corresponding values of that ID with Flag=0.
>
> For example, ID=1 and ID=2 have the same State and City, but a different
> Flag.
> After normalizing the Price and Number (Price divided by 100, Number divided
> by 1000), the distance between ID=1 and ID=2 is defined as:
> abs(100/100 - 96/100) + abs(1000/1000 - 1010/1000) = 0.04 + 0.01 = 0.05
>
>
> What's the best way to find such nearest neighbor? Any valuable tips will be
> greatly appreciated!
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Access HDFS within Spark Map Operation

2016-09-13 Thread Saliya Ekanayake
Just wondering if this is possible with Spark?

On Mon, Sep 12, 2016 at 12:14 AM, Saliya Ekanayake 
wrote:

> Hi,
>
> I've got a text file where each line is a record. For each record, I need
> to process a file in HDFS.
>
> So if I represent these records as an RDD and invoke a map() operation on
> them how can I access the HDFS within that map()? Do I have to create a
> Spark context within map() or is there a better solution to that?
>
> Thank you,
> Saliya
>
>
>
> --
> Saliya Ekanayake
> Ph.D. Candidate | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
>
>


-- 
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
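
One common way to do what the question above asks, sketched only (this is not a
reply from the list, and the paths and the per-record "processing" are
placeholders): rather than trying to use the SparkContext inside map(), open the
files through the Hadoop FileSystem API on the executors, e.g. from
mapPartitions:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import scala.io.Source

val records = sc.textFile("hdfs:///user/me/records.txt")   // one record per line

val processed = records.mapPartitions { iter =>
  val conf = new Configuration()                  // built on the executor, not the driver
  iter.map { rec =>
    val path = new Path(s"/user/me/data/$rec")    // hypothetical file named after the record
    val fs = path.getFileSystem(conf)             // FileSystem handles are cached, so this is cheap
    val in = fs.open(path)
    val contents = Source.fromInputStream(in).mkString
    in.close()
    (rec, contents.length)                        // stand-in for the real processing
  }
}

processed.take(5).foreach(println)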


Re: Spark_JDBC_Partitions

2016-09-13 Thread Suresh Thalamati
There is also another jdbc method in the DataFrame reader API to specify your 
own predicates for each partition. Using this you can control what is included 
in each partition.

val jdbcPartitionWhereClause = Array[String]("id < 100", "id >= 100 and id < 200")
val df = spark.read.jdbc(
  urlWithUserAndPass,
  "TEST.PEOPLE",
  predicates = jdbcPartitionWhereClause,
  connectionProperties = new Properties())


Hope that helps. 
-suresh
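
To connect this with the NTILE/ROWID ideas further down the thread, one way to
generate such predicates on the Oracle side is sketched below (an illustration
only, not tested against the poster's table; it assumes ORA_HASH over ROWID is
acceptable, and the connection details are placeholders):

import java.util.Properties

val numPartitions = 11
// ORA_HASH(rowid, n - 1) yields buckets 0 .. n-1, one WHERE clause per bucket.
val predicates = (0 until numPartitions)
  .map(b => s"ORA_HASH(rowid, ${numPartitions - 1}) = $b")
  .toArray

val props = new Properties()
props.setProperty("user", "scott")          // placeholder credentials
props.setProperty("password", "tiger")

val df = spark.read.jdbc(
  url = "jdbc:oracle:thin:@//dbhost:1521/ORCL",   // placeholder URL
  table = "TEST.PEOPLE",
  predicates = predicates,
  connectionProperties = props)

df.rdd.getNumPartitions                     // one Spark partition per predicate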


> On Sep 13, 2016, at 9:44 AM, Rabin Banerjee  
> wrote:
> 
> Trust me, the only thing that can help you in your situation is the Sqoop Oracle 
> direct connector, which is known as ORAOOP. Spark cannot do everything; you 
> need an OOZIE workflow which will trigger a Sqoop job with the Oracle direct 
> connector to pull the data, and then a Spark batch job to process it.
> 
> Hope it helps !!
> 
> On Tue, Sep 13, 2016 at 6:10 PM, Igor Racic  > wrote:
> Hi, 
> 
> One way can be to use NTILE function to partition data. 
> Example:
> 
> REM Creating test table
> create table Test_part as select * from ( select rownum rn from all_tables t1 
> ) where rn <= 1000;
> 
> REM Partition lines by Oracle block number, 11 partitions in this example. 
> select ntile(11) over( order by dbms_rowid.ROWID_BLOCK_NUMBER( rowid ) ) nt 
> from Test_part
> 
> 
> Let's see distribution: 
> 
> select nt, count(*) from ( select ntile(11) over( order by 
> dbms_rowid.ROWID_BLOCK_NUMBER( rowid ) ) nt from Test_part) group by nt;
> 
> NT   COUNT(*)
> -- --
>  1 10
>  6 10
> 11  9
>  2 10
>  4 10
>  5 10
>  8 10
>  3 10
>  7 10
>  9  9
> 10  9
> 
> 11 rows selected.
> ^^ It looks good. Sure, feel free to choose any other condition to order your 
> lines as best suits your case.
> 
> So you can 
> 1) have one session reading and then decide where line goes (1 reader )
> 2) Or do multiple reads by specifying the partition number. Note that in this 
> case you read the whole table n times (in parallel), which is more intensive on 
> the read part. (multiple readers)
> 
> Regards, 
> Igor
> 
> 
> 
> 2016-09-11 0:46 GMT+02:00 Mich Talebzadeh  >:
> Good points
> 
> Unfortunately Data Pump, exp and imp use binary formats for export and import 
> that cannot be used to load data into HDFS in a suitable way.
> 
> One can use what is known as a flat-file shell script to get data out tab- or 
> comma-separated etc.
> 
> ROWNUM is a pseudocolumn (not a real column) that is available in a query. 
> The issue is that in a table of 280 million rows, to get the position of the 
> row it will have to do a table scan since no index can be built on it 
> (assuming there is no other suitable index). Not ideal but can be done.
> 
> I think a better alternative is to use datapump to take that table to 
> DEV/TEST, add a sequence (like an IDENTITY column in Sybase), build a unique 
> index on the sequence column and do the partitioning there.
> 
> HTH
> 
> 
> 
> 
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> On 10 September 2016 at 22:37, ayan guha  > wrote:
> In Oracle something called ROWNUM is present in every row. You can create 
> an even distribution using that column. If it is one-time work, try using 
> sqoop. Are you using Oracle's own appliance? Then you can use data pump format
> 
> On 11 Sep 2016 01:59, "Mich Talebzadeh"  > wrote:
> creating an Oracle sequence for a table of 200million is not going to be that 
> easy without changing the schema. It is possible to export that table from 
> prod and import it to DEV/TEST and create the sequence there.
> 
> If it is a FACT table then the foreign keys from the Dimension tables will be 
> bitmap indexes on the FACT table so they can be potentially used.
> 
> HTH
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
> 
> Disclaimer: Use it at your own risk. Any and 

Re: Spark 2.0.0 won't let you create a new SparkContext?

2016-09-13 Thread Marcelo Vanzin
You're running spark-shell. It already creates a SparkContext for you and
makes it available in a variable called "sc".

If you want to change the config of spark-shell's context, you need to use a
command-line option. (Or stop the existing context first, although I'm not
sure how well that will work.)
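
For completeness, the "stop it first" route looks roughly like this in the 2.0
shell (a sketch only, with the same caveat as above that it is not guaranteed
to behave well; the es.* setting is just a stand-in for whatever ES-Hadoop
config is needed):

import org.apache.spark.sql.SparkSession

sc.stop()                                        // stop the shell's existing context

val spark2 = SparkSession.builder
  .master("local[2]")
  .appName("CountingSheep")
  .config("es.nodes", "eshost:9200")             // illustrative ES-Hadoop setting
  .getOrCreate()

val sc2 = spark2.sparkContext                    // the rebuilt context

// The command-line alternative avoids all of this:
//   spark-shell --conf es.nodes=eshost:9200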

On Tue, Sep 13, 2016 at 10:49 AM, Kevin Burton  wrote:

> I'm rather confused here as to what to do about creating a new
> SparkContext.
>
> Spark 2.0 prevents it... (exception included below)
>
> yet a TON of examples I've seen basically tell you to create a new
> SparkContext as standard practice:
>
> http://spark.apache.org/docs/latest/configuration.html#
> dynamically-loading-spark-properties
>
> val conf = new SparkConf()
>  .setMaster("local[2]")
>  .setAppName("CountingSheep")
> val sc = new SparkContext(conf)
>
>
> I'm specifically running into a problem in that ES-Hadoop won't work with
> its settings and I think it's related to this problem.
>
> Do we have to call sc.stop() first and THEN create a new spark context?
>
> That works, but I can't find any documentation anywhere telling us the
> right course of action.
>
>
>
> scala> val sc = new SparkContext();
> org.apache.spark.SparkException: Only one SparkContext may be running in
> this JVM (see SPARK-2243). To ignore this error, set 
> spark.driver.allowMultipleContexts
> = true. The currently running SparkContext was created at:
> org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.
> scala:823)
> org.apache.spark.repl.Main$.createSparkSession(Main.scala:95)
> (:15)
> (:31)
> (:33)
> .(:37)
> .()
> .$print$lzycompute(:7)
> .$print(:6)
> $print()
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:
> 62)
> sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> java.lang.reflect.Method.invoke(Method.java:497)
> scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
> scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
> scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$
> loadAndRunReq$1.apply(IMain.scala:638)
> scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$
> loadAndRunReq$1.apply(IMain.scala:637)
> scala.reflect.internal.util.ScalaClassLoader$class.
> asContext(ScalaClassLoader.scala:31)
> scala.reflect.internal.util.AbstractFileClassLoader.asContext(
> AbstractFileClassLoader.scala:19)
>   at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$
> 2.apply(SparkContext.scala:2221)
>   at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$
> 2.apply(SparkContext.scala:2217)
>   at scala.Option.foreach(Option.scala:257)
>   at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(
> SparkContext.scala:2217)
>   at org.apache.spark.SparkContext$.markPartiallyConstructed(
> SparkContext.scala:2290)
>   at org.apache.spark.SparkContext.(SparkContext.scala:89)
>   at org.apache.spark.SparkContext.(SparkContext.scala:121)
>   ... 48 elided
>
>
> --
>
> We’re hiring if you know of any awesome Java Devops or Linux Operations
> Engineers!
>
> Founder/CEO Spinn3r.com
> Location: *San Francisco, CA*
> blog: http://burtonator.wordpress.com
> … or check out my Google+ profile
> 
>
>


-- 
Marcelo


Re: Spark 2.0.0 won't let you create a new SparkContext?

2016-09-13 Thread Sean Owen
But you're in the shell there, which already has a SparkContext for you as
sc.

On Tue, Sep 13, 2016 at 6:49 PM, Kevin Burton  wrote:

> I'm rather confused here as to what to do about creating a new
> SparkContext.
>
> Spark 2.0 prevents it... (exception included below)
>
> yet a TON of examples I've seen basically tell you to create a new
> SparkContext as standard practice:
>
> http://spark.apache.org/docs/latest/configuration.html#
> dynamically-loading-spark-properties
>
> val conf = new SparkConf()
>  .setMaster("local[2]")
>  .setAppName("CountingSheep")
> val sc = new SparkContext(conf)
>
>
> I'm specifically running into a problem in that ES-Hadoop won't work with
> its settings and I think it's related to this problem.
>
> Do we have to call sc.stop() first and THEN create a new spark context?
>
> That works, but I can't find any documentation anywhere telling us the
> right course of action.
>
>
>
> scala> val sc = new SparkContext();
> org.apache.spark.SparkException: Only one SparkContext may be running in
> this JVM (see SPARK-2243). To ignore this error, set 
> spark.driver.allowMultipleContexts
> = true. The currently running SparkContext was created at:
> org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.
> scala:823)
> org.apache.spark.repl.Main$.createSparkSession(Main.scala:95)
> (:15)
> (:31)
> (:33)
> .(:37)
> .()
> .$print$lzycompute(:7)
> .$print(:6)
> $print()
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:
> 62)
> sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> java.lang.reflect.Method.invoke(Method.java:497)
> scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
> scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
> scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$
> loadAndRunReq$1.apply(IMain.scala:638)
> scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$
> loadAndRunReq$1.apply(IMain.scala:637)
> scala.reflect.internal.util.ScalaClassLoader$class.
> asContext(ScalaClassLoader.scala:31)
> scala.reflect.internal.util.AbstractFileClassLoader.asContext(
> AbstractFileClassLoader.scala:19)
>   at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$
> 2.apply(SparkContext.scala:2221)
>   at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$
> 2.apply(SparkContext.scala:2217)
>   at scala.Option.foreach(Option.scala:257)
>   at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(
> SparkContext.scala:2217)
>   at org.apache.spark.SparkContext$.markPartiallyConstructed(
> SparkContext.scala:2290)
>   at org.apache.spark.SparkContext.(SparkContext.scala:89)
>   at org.apache.spark.SparkContext.(SparkContext.scala:121)
>   ... 48 elided
>
>
> --
>
> We’re hiring if you know of any awesome Java Devops or Linux Operations
> Engineers!
>
> Founder/CEO Spinn3r.com
> Location: *San Francisco, CA*
> blog: http://burtonator.wordpress.com
> … or check out my Google+ profile
> 
>
>


Re: Unable to compare SparkSQL Date columns

2016-09-13 Thread Mich Talebzadeh
Hi  Praseetha,

This is how I have written this.

case class TestDate (id: String, loginTime: java.sql.Date)
val formate = new SimpleDateFormat("YYYY-MM-DD")
val TestDateData = sc.parallelize(List(
("1",  new java.sql.Date(formate.parse("2016-01-31").getTime)),
("2", new java.sql.Date(formate.parse("2106-01-30").getTime)),
("3", new java.sql.Date(formate.parse("2016-01-29").getTime)),
("4", new java.sql.Date(formate.parse("2016-01-28").getTime)),
("5", new java.sql.Date(formate.parse("2016-01-27").getTime))
))
 val firstPair = TestDateData.map(x => ( new TestDate(x._1, x._2)))
 val fp = firstPair.toDF
case class TestDate2 (id2: String, loginTime2: java.sql.Date)
val TestDateData1 = sc.parallelize(List(
("101", new java.sql.Date(formate.parse("2016-01-27").getTime)),
("102", new java.sql.Date(formate.parse("2016-01-26").getTime)),
("103", new java.sql.Date(formate.parse("2016-01-25").getTime)),
("104", new java.sql.Date(formate.parse("2016-01-24").getTime)),
("105", new java.sql.Date(formate.parse("2016-01-31").getTime))
))
val secondPair = TestDateData1.map(x => ( new TestDate2(x._1, x._2)))
val sp = secondPair.toDF
val rs = fp.join(sp,fp("loginTime")===
sp("loginTime2"),"inner").select('loginTime,'loginTime2, 'id,'id2).show

This is what I get

[2015-12-27,2015-12-27,1,101]
[2015-12-27,2015-12-27,1,102]
[2015-12-27,2015-12-27,1,103]
[2015-12-27,2015-12-27,1,104]
[2015-12-27,2015-12-27,1,105]
[2015-12-27,2015-12-27,3,101]
[2015-12-27,2015-12-27,3,102]
[2015-12-27,2015-12-27,3,103]
[2015-12-27,2015-12-27,3,104]
[2015-12-27,2015-12-27,3,105]
[2015-12-27,2015-12-27,4,101]
[2015-12-27,2015-12-27,4,102]
[2015-12-27,2015-12-27,4,103]
[2015-12-27,2015-12-27,4,104]
[2015-12-27,2015-12-27,4,105]
[2015-12-27,2015-12-27,5,101]
[2015-12-27,2015-12-27,5,102]
[2015-12-27,2015-12-27,5,103]
[2015-12-27,2015-12-27,5,104]
[2015-12-27,2015-12-27,5,105]
rs: Unit = ()


Is this what you are expecting?

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 13 September 2016 at 16:46, Praseetha  wrote:

> Hi Mich,
>
> val formate = new SimpleDateFormat("YYYY-MM-DD")
>
> Thanks & Regds,
> --Praseetha
>
> On Tue, Sep 13, 2016 at 8:50 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi Praseetha.
>>
>> <console>:32: error: not found: value formate
>> Error occurred in an application involving default arguments.
>>("1",  new java.sql.Date(formate.parse("2
>> 016-01-31").getTime)),
>>
>> What is that formate?
>>
>> Thanks
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 13 September 2016 at 16:12, Praseetha  wrote:
>>
>>> Hi Mich,
>>>
>>> Thanks a lot for your reply.
>>>
>>> Here is the sample
>>>
>>> case class TestDate (id: String, loginTime: java.sql.Date)
>>>
>>> val formate = new SimpleDateFormat("YYYY-MM-DD")
>>>
>>> val TestDateData = sc.parallelize(List(
>>> ("1",  new java.sql.Date(formate.parse("2016-01-31").getTime)),
>>> ("2", new java.sql.Date(formate.parse("2106-01-30").getTime)),
>>> ("3", new java.sql.Date(formate.parse("2016-01-29").getTime)),
>>> ("4", new java.sql.Date(formate.parse("2016-01-28").getTime)),
>>> ("5", new java.sql.Date(formate.parse("2016-01-27").getTime))
>>> ))
>>> val firstPair = TestDateData.map(x => ( new TestDate(x._1, x._2)))
>>>
>>> val TestDateData1 = sc.parallelize(List(
>>> ("101", new java.sql.Date(formate.parse("2016-01-27").getTime)),
>>> ("102", new java.sql.Date(formate.parse("2016-01-26").getTime)),
>>> ("103", new java.sql.Date(formate.parse("2016-01-25").getTime)),
>>> ("104", new java.sql.Date(formate.parse("2016-01-24").getTime)),
>>> ("105", new java.sql.Date(formate.parse("2016-01-31").getTime))
>>> ))
>>> val 

Spark 2.0.0 won't let you create a new SparkContext?

2016-09-13 Thread Kevin Burton
I'm rather confused here as to what to do about creating a new SparkContext.

Spark 2.0 prevents it... (exception included below)

yet a TON of examples I've seen basically tell you to create a new
SparkContext as standard practice:

http://spark.apache.org/docs/latest/configuration.html#dynamically-loading-spark-properties

val conf = new SparkConf()
 .setMaster("local[2]")
 .setAppName("CountingSheep")
val sc = new SparkContext(conf)


I'm specifically running into a problem in that ES-Hadoop won't work with
its settings and I think it's related to this problem.

Do we have to call sc.stop() first and THEN create a new spark context?

That works, but I can't find any documentation anywhere telling us the
right course of action.



scala> val sc = new SparkContext();
org.apache.spark.SparkException: Only one SparkContext may be running in
this JVM (see SPARK-2243). To ignore this error, set
spark.driver.allowMultipleContexts = true. The currently running
SparkContext was created at:
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:823)
org.apache.spark.repl.Main$.createSparkSession(Main.scala:95)
(:15)
(:31)
(:33)
.(:37)
.()
.$print$lzycompute(:7)
.$print(:6)
$print()
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:497)
scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
  at
org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2221)
  at
org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2217)
  at scala.Option.foreach(Option.scala:257)
  at
org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:2217)
  at
org.apache.spark.SparkContext$.markPartiallyConstructed(SparkContext.scala:2290)
  at org.apache.spark.SparkContext.(SparkContext.scala:89)
  at org.apache.spark.SparkContext.(SparkContext.scala:121)
  ... 48 elided


-- 

We’re hiring if you know of any awesome Java Devops or Linux Operations
Engineers!

Founder/CEO Spinn3r.com
Location: *San Francisco, CA*
blog: http://burtonator.wordpress.com
… or check out my Google+ profile



Re: Spark Java Heap Error

2016-09-13 Thread Baktaawar
The data set is not big. It is 56K x 9K. It does have long strings as column
names.

It fits very easily in Pandas, which is also an in-memory thing, so I am not
sure if memory is the issue here. If Pandas can fit it very easily and work
on it very fast, then Spark shouldn't have problems either, right?

On Tue, Sep 13, 2016 at 10:24 AM, neil90 [via Apache Spark User List] <
ml-node+s1001560n27707...@n3.nabble.com> wrote:

> I'm assuming the dataset you're dealing with is big, hence why you wanted to
> allocate your full 16 GB of RAM to it.
>
> I suggest running the Python spark shell as such: "pyspark --driver-memory
> 16g".
>
> Also, if you cache your data and it doesn't fully fit in memory, you can do
> df.persist(StorageLevel.MEMORY_AND_DISK).
>
> --
> If you reply to this email, your message will be added to the discussion
> below:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-
> Java-Heap-Error-tp27669p27707.html
> To unsubscribe from Spark Java Heap Error, click here
> 
> .
> NAML
> 
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Java-Heap-Error-tp27669p27708.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Spark Java Heap Error

2016-09-13 Thread neil90
I'm assuming the dataset you're dealing with is big, hence why you wanted to
allocate your full 16 GB of RAM to it.

I suggest running the Python spark shell as such: "pyspark --driver-memory
16g".

Also, if you cache your data and it doesn't fully fit in memory, you can do
df.persist(StorageLevel.MEMORY_AND_DISK).
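
In code form, the two suggestions look roughly like this (a Scala sketch with a
placeholder input path, assuming a 2.0 shell; the thread itself is using
PySpark, where the same calls and flag exist):

// started with: spark-shell --driver-memory 16g    (pyspark takes the same flag)
import org.apache.spark.storage.StorageLevel

val df = spark.read.parquet("/path/to/data")        // placeholder input
df.persist(StorageLevel.MEMORY_AND_DISK)            // spill to disk if it does not fit in memory
df.count()                                          // materialize the cache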



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Java-Heap-Error-tp27669p27707.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



What's the best way to find the nearest neighbor in Spark? Any windowing function?

2016-09-13 Thread Mobius ReX
Given a table

> $cat data.csv
>
> ID,State,City,Price,Number,Flag
> 1,CA,A,100,1000,0
> 2,CA,A,96,1010,1
> 3,CA,A,195,1010,1
> 4,NY,B,124,2000,0
> 5,NY,B,128,2001,1
> 6,NY,C,24,3,0
> 7,NY,C,27,30100,1
> 8,NY,C,29,30200,0
> 9,NY,C,39,33000,1


Expected Result:

ID0, ID1
1,2
4,5
6,7
8,7

For each ID with Flag=0 above, we want to find another ID with Flag=1, with
the same "State" and "City", and the nearest Price and Number normalized by
the corresponding values of that ID with Flag=0.

For example, ID=1 and ID=2 have the same State and City, but a different
Flag.
After normalizing the Price and Number (Price divided by 100, Number divided
by 1000), the distance between ID=1 and ID=2 is defined as:
abs(100/100 - 96/100) + abs(1000/1000 - 1010/1000) = 0.04 + 0.01 = 0.05


What's the best way to find such nearest neighbor? Any valuable tips will
be greatly appreciated!


Re: Master OOM in "master-rebuild-ui-thread" while running stream app

2016-09-13 Thread Mariano Semelman
Thanks, I would go with disabling the logging.
BTW, the master crashed while the application was still running.

--

*Mariano Semelman*
P13N - IT
Av. Corrientes Nº 746 - piso 13 - C.A.B.A. (C1043AAU)
Teléfono (54) 11- *4894-3500*


*Despegar.com*
The best price for your trip.

This message is confidential and may contain information protected by
professional secrecy. If you have received this e-mail in error, please
notify us immediately by replying to this e-mail and then delete it from
your system. The contents of this message must not be copied or disclosed
to any person.

On 13 September 2016 at 12:52, Bryan Cutler  wrote:

> It looks like you have logging enabled and your application event log is
> too large for the master to build a web UI from it.  In spark 1.6.2 and
> earlier, when an application completes, the master rebuilds a web UI to
> view events after the fact.  This functionality was removed in spark 2.0
> and the history server should be used instead.  If you are unable to
> upgrade could you try disabling logging?
>
> On Sep 13, 2016 7:18 AM, "Mariano Semelman" 
> wrote:
>
>> Hello everybody,
>>
>> I am running a spark streaming app and I am planning to use it as a long
>> running service. However while trying the app in a rc environment I got
>> this exception in the master daemon after 1 hour of running:
>>
>> Exception in thread "master-rebuild-ui-thread"
>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>> at java.util.regex.Pattern.compile(Pattern.java:1667)
>> at java.util.regex.Pattern.(Pattern.java:1351)
>> at java.util.regex.Pattern.compile(Pattern.java:1054)
>> at java.lang.String.replace(String.java:2239)
>> at org.apache.spark.util.Utils$.getFormattedClassName(Utils.sca
>> la:1632)
>> at org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonP
>> rotocol.scala:486)
>> at org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayLi
>> stenerBus.scala:58)
>> at org.apache.spark.deploy.master.Master$$anonfun$17.apply(
>> Master.scala:972)
>> at org.apache.spark.deploy.master.Master$$anonfun$17.apply(
>> Master.scala:952)
>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.lifte
>> dTree1$1(Future.scala:24)
>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(F
>> uture.scala:24)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
>> Executor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
>> lExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>> As a palliative measure I've increased the master memory to 1.5gb.
>> My job is running with a batch interval of 5 seconds.
>> I'm using spark version 1.6.2.
>>
>> I think it might be related to this issues:
>>
>> https://issues.apache.org/jira/browse/SPARK-6270
>> https://issues.apache.org/jira/browse/SPARK-12062
>> https://issues.apache.org/jira/browse/SPARK-12299
>>
>> But I don't see a clear road to solve this apart from upgrading spark.
>> What would you recommend?
>>
>>
>> Thanks in advance
>> Mariano
>>
>>


Re: Spark_JDBC_Partitions

2016-09-13 Thread Rabin Banerjee
Trust me, the only thing that can help you in your situation is the Sqoop Oracle
direct connector, which is known as ORAOOP. Spark cannot do everything;
you need an OOZIE workflow which will trigger a Sqoop job with the Oracle direct
connector to pull the data, and then a Spark batch job to process it.

Hope it helps !!

On Tue, Sep 13, 2016 at 6:10 PM, Igor Racic  wrote:

> Hi,
>
> One way can be to use NTILE function to partition data.
> Example:
>
> REM Creating test table
> create table Test_part as select * from ( select rownum rn from all_tables
> t1 ) where rn <= 1000;
>
> REM Partition lines by Oracle block number, 11 partitions in this example.
> select ntile(11) over( order by dbms_rowid.ROWID_BLOCK_NUMBER( rowid ) )
> nt from Test_part
>
>
> Let's see distribution:
>
> select nt, count(*) from ( select ntile(11) over( order by
> dbms_rowid.ROWID_BLOCK_NUMBER( rowid ) ) nt from Test_part) group by nt;
>
> NT   COUNT(*)
> -- --
>  1 10
>  6 10
> 11  9
>  2 10
>  4 10
>  5 10
>  8 10
>  3 10
>  7 10
>  9  9
> 10  9
>
> 11 rows selected.
> ^^ It looks good. Sure, feel free to choose any other condition to order
> your lines as best suits your case.
>
> So you can
> 1) have one session reading and then decide where line goes (1 reader )
> 2) Or do multiple reads by specifying the partition number. Note that in this
> case you read the whole table n times (in parallel), which is more intensive on
> the read part. (multiple readers)
>
> Regards,
> Igor
>
>
>
> 2016-09-11 0:46 GMT+02:00 Mich Talebzadeh :
>
>> Good points
>>
>> Unfortunately Data Pump, exp and imp use binary formats for export and
>> import that cannot be used to load data into HDFS in a suitable way.
>>
>> One can use what is known as a flat-file shell script to get data out tab- or
>> comma-separated etc.
>>
>> ROWNUM is a pseudocolumn (not a real column) that is available in a
>> query. The issue is that in a table of 280 million rows, to get the position
>> of the row it will have to do a table scan since no index can be built
>> on it (assuming there is no other suitable index). Not ideal but can be
>> done.
>>
>> I think a better alternative is to use datapump to take that table to
>> DEV/TEST, add a sequence (like an IDENTITY column in Sybase), build a
>> unique index on the sequence column and do the partitioning there.
>>
>> HTH
>>
>>
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 10 September 2016 at 22:37, ayan guha  wrote:
>>
>>> In Oracle something called ROWNUM is present in every row. You can
>>> create an even distribution using that column. If it is one-time work,
>>> try using sqoop. Are you using Oracle's own appliance? Then you can use
>>> data pump format
>>> On 11 Sep 2016 01:59, "Mich Talebzadeh" 
>>> wrote:
>>>
 creating an Oracle sequence for a table of 200million is not going to
 be that easy without changing the schema. It is possible to export that
 table from prod and import it to DEV/TEST and create the sequence there.

 If it is a FACT table then the foreign keys from the Dimension tables
 will be bitmap indexes on the FACT table so they can be potentially used.

 HTH

 Dr Mich Talebzadeh



 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *



 http://talebzadehmich.wordpress.com


 *Disclaimer:* Use it at your own risk. Any and all responsibility for
 any loss, damage or destruction of data or any other property which may
 arise from relying on this email's technical content is explicitly
 disclaimed. The author will in no case be liable for any monetary damages
 arising from such loss, damage or destruction.



 On 10 September 2016 at 16:42, Takeshi Yamamuro 
 wrote:

> Hi,
>
> Yea, spark does not have the same functionality with sqoop.
> I think one of simple solutions is to assign unique ids on the oracle
> table by yourself.
> Thought?
>
> // maropu
>
>
> On 

Re: Spark Java Heap Error

2016-09-13 Thread Baktaawar
I put the driver memory at 6 GB instead of 8 (half of 16). But does 2 GB make
that much of a difference?

On Tuesday, September 13, 2016, neil90 [via Apache Spark User List] <
ml-node+s1001560n27704...@n3.nabble.com> wrote:

> Double-check your driver memory in your Spark Web UI; make sure the driver
> memory is close to half of the 16 GB available.
>
> --
> If you reply to this email, your message will be added to the discussion
> below:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-
> Java-Heap-Error-tp27669p27704.html
> To unsubscribe from Spark Java Heap Error, click here
> 
> .
> NAML
> 
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Java-Heap-Error-tp27669p27705.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Spark Java Heap Error

2016-09-13 Thread neil90
Double-check your driver memory in your Spark Web UI; make sure the driver
memory is close to half of the 16 GB available.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Java-Heap-Error-tp27669p27704.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Master OOM in "master-rebuild-ui-thread" while running stream app

2016-09-13 Thread Bryan Cutler
It looks like you have logging enabled and your application event log is
too large for the master to build a web UI from it.  In spark 1.6.2 and
earlier, when an application completes, the master rebuilds a web UI to
view events after the fact.  This functionality was removed in spark 2.0
and the history server should be used instead.  If you are unable to
upgrade could you try disabling logging?
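
Acting on that advice amounts to turning off event logging for the long-running
app, roughly like this (a sketch; only the spark.eventLog.enabled key matters
here, the rest is illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("long-running-stream")              // illustrative app name
  .set("spark.eventLog.enabled", "false")         // the master never rebuilds a UI from a huge event log

// equivalently on submit:  spark-submit --conf spark.eventLog.enabled=false ...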

On Sep 13, 2016 7:18 AM, "Mariano Semelman" 
wrote:

> Hello everybody,
>
> I am running a spark streaming app and I am planning to use it as a long
> running service. However while trying the app in a rc environment I got
> this exception in the master daemon after 1 hour of running:
>
> Exception in thread "master-rebuild-ui-thread"
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at java.util.regex.Pattern.compile(Pattern.java:1667)
> at java.util.regex.Pattern.(Pattern.java:1351)
> at java.util.regex.Pattern.compile(Pattern.java:1054)
> at java.lang.String.replace(String.java:2239)
> at org.apache.spark.util.Utils$.getFormattedClassName(Utils.
> scala:1632)
> at org.apache.spark.util.JsonProtocol$.sparkEventFromJson(
> JsonProtocol.scala:486)
> at org.apache.spark.scheduler.ReplayListenerBus.replay(
> ReplayListenerBus.scala:58)
> at org.apache.spark.deploy.master.Master$$anonfun$17.
> apply(Master.scala:972)
> at org.apache.spark.deploy.master.Master$$anonfun$17.
> apply(Master.scala:952)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.
> liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(
> Future.scala:24)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> As a palliative measure I've increased the master memory to 1.5gb.
> My job is running with a batch interval of 5 seconds.
> I'm using spark version 1.6.2.
>
> I think it might be related to this issues:
>
> https://issues.apache.org/jira/browse/SPARK-6270
> https://issues.apache.org/jira/browse/SPARK-12062
> https://issues.apache.org/jira/browse/SPARK-12299
>
> But I don't see a clear road to solve this apart from upgrading spark.
> What would you recommend?
>
>
> Thanks in advance
> Mariano
>
>


Re: Unable to compare SparkSQL Date columns

2016-09-13 Thread Mich Talebzadeh
Hi Praseetha.

<console>:32: error: not found: value formate
Error occurred in an application involving default arguments.
   ("1",  new
java.sql.Date(formate.parse("2016-01-31").getTime)),

What is that formate?

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 13 September 2016 at 16:12, Praseetha  wrote:

> Hi Mich,
>
> Thanks a lot for your reply.
>
> Here is the sample
>
> case class TestDate (id: String, loginTime: java.sql.Date)
>
> val formate = new SimpleDateFormat("YYYY-MM-DD")
>
> val TestDateData = sc.parallelize(List(
> ("1",  new java.sql.Date(formate.parse("2016-01-31").getTime)),
> ("2", new java.sql.Date(formate.parse("2106-01-30").getTime)),
> ("3", new java.sql.Date(formate.parse("2016-01-29").getTime)),
> ("4", new java.sql.Date(formate.parse("2016-01-28").getTime)),
> ("5", new java.sql.Date(formate.parse("2016-01-27").getTime))
> ))
> val firstPair = TestDateData.map(x => ( new TestDate(x._1, x._2)))
>
> val TestDateData1 = sc.parallelize(List(
> ("101", new java.sql.Date(formate.parse("2016-01-27").getTime)),
> ("102", new java.sql.Date(formate.parse("2016-01-26").getTime)),
> ("103", new java.sql.Date(formate.parse("2016-01-25").getTime)),
> ("104", new java.sql.Date(formate.parse("2016-01-24").getTime)),
> ("105", new java.sql.Date(formate.parse("2016-01-31").getTime))
> ))
> val secondPair = TestDateData1.map(x => ( new TestDate(x._1, x._2)))
>
>firstPair.toDF.registerTempTable("firstTable")
>secondPair.toDF.registerTempTable("secondTable")
>
>val res = sqlContext.sql("select * from firstTable INNER JOIN
> secondTable on firstTable.loginTime = secondTable.loginTime")
>
>
> I tried the following query,
> sqlContext.sql("select loginTime from firstTable")
> Even this query gives the wrong dates.
>
> Regds,
> --Praseetha
>
> On Tue, Sep 13, 2016 at 6:33 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Can you send the rdds that just creates those two dates?
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 13 September 2016 at 13:54, Praseetha  wrote:
>>
>>>
>>> Hi All,
>>>
>>> I have a case class in scala case class TestDate (id: String, loginTime:
>>> java.sql.Date)
>>>
>>> I created 2 RDD's of type TestDate
>>>
>>> I wanted to do an inner join on two rdd's where the values of loginTime
>>> column is equal. Please find the code snippet below,
>>>
>>> firstRDD.toDF.registerTempTable("firstTable")
>>> secondRDD.toDF.registerTempTable("secondTable")
>>> val res = sqlContext.sql("select * from firstTable INNER JOIN secondTable 
>>> on to_date(firstTable.loginTime) = to_date(secondTable.loginTime)")
>>>
>>> I'm not getting any exception. But i'm not getting correct answer too.
>>> It does a cartesian and some random dates are generated in the result.
>>>
>>>
>>> Regds,
>>> --Praseetha
>>>
>>
>>
>


Re: Unable to compare SparkSQL Date columns

2016-09-13 Thread Praseetha
Hi Mich,

Thanks a lot for your reply.

Here is the sample

case class TestDate (id: String, loginTime: java.sql.Date)

val formate = new SimpleDateFormat("YYYY-MM-DD")

val TestDateData = sc.parallelize(List(
("1",  new java.sql.Date(formate.parse("2016-01-31").getTime)),
("2", new java.sql.Date(formate.parse("2106-01-30").getTime)),
("3", new java.sql.Date(formate.parse("2016-01-29").getTime)),
("4", new java.sql.Date(formate.parse("2016-01-28").getTime)),
("5", new java.sql.Date(formate.parse("2016-01-27").getTime))
))
val firstPair = TestDateData.map(x => ( new TestDate(x._1, x._2)))

val TestDateData1 = sc.parallelize(List(
("101", new java.sql.Date(formate.parse("2016-01-27").getTime)),
("102", new java.sql.Date(formate.parse("2016-01-26").getTime)),
("103", new java.sql.Date(formate.parse("2016-01-25").getTime)),
("104", new java.sql.Date(formate.parse("2016-01-24").getTime)),
("105", new java.sql.Date(formate.parse("2016-01-31").getTime))
))
val secondPair = TestDateData1.map(x => ( new TestDate(x._1, x._2)))

   firstPair.toDF.registerTempTable("firstTable")
   secondPair.toDF.registerTempTable("secondTable")

   val res = sqlContext.sql("select * from firstTable INNER JOIN
secondTable on firstTable.loginTime = secondTable.loginTime")


I tried the following query,
sqlContext.sql("select loginTime from firstTable")
Even this query gives the wrong dates.

Regds,
--Praseetha

On Tue, Sep 13, 2016 at 6:33 PM, Mich Talebzadeh 
wrote:

> Can you send the rdds that just creates those two dates?
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 13 September 2016 at 13:54, Praseetha  wrote:
>
>>
>> Hi All,
>>
>> I have a case class in scala case class TestDate (id: String, loginTime:
>> java.sql.Date)
>>
>> I created 2 RDD's of type TestDate
>>
>> I wanted to do an inner join on two rdd's where the values of loginTime
>> column is equal. Please find the code snippet below,
>>
>> firstRDD.toDF.registerTempTable("firstTable")
>> secondRDD.toDF.registerTempTable("secondTable")
>> val res = sqlContext.sql("select * from firstTable INNER JOIN secondTable on 
>> to_date(firstTable.loginTime) = to_date(secondTable.loginTime)")
>>
>> I'm not getting any exception. But i'm not getting correct answer too. It
>> does a cartesian and some random dates are generated in the result.
>>
>>
>> Regds,
>> --Praseetha
>>
>
>


Spark SQL - Applying transformation on a struct inside an array

2016-09-13 Thread Olivier Girardot
Hi everyone,
I'm currently trying to create a generic transformation mechanism on a
DataFrame to modify an arbitrary column regardless of the underlying schema.
It's "relatively" straightforward for complex types like struct<...> to
apply an arbitrary UDF on the column and replace the data "inside" the struct,
however I'm struggling to make it work for complex types containing arrays along
the way, like struct<array<...>>.
Michael Armbrust seemed to allude on the mailing list/forum to a way of using
Encoders to do that. I'd be interested in any pointers, especially considering
that it's not possible to output any Row or GenericRowWithSchema from a UDF
(thanks to
https://github.com/apache/spark/blob/v2.0.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala#L657
it seems).
To sum up, I'd like to find a way to apply a transformation to complex nested
datatypes (arrays and structs) on a DataFrame, updating the value itself.
Regards,
Olivier Girardot
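
For the struct-only case described as "relatively" straightforward above, the
usual trick is to rebuild the struct column field by field around the UDF; a
minimal sketch in the 2.0 shell (column names made up, and it does not address
the array case being asked about):

import org.apache.spark.sql.functions._

val toUpper = udf((v: String) => v.toUpperCase)

val df = Seq((1, ("foo", 10))).toDF("id", "s")    // s: struct<_1: string, _2: int>

// Rebuild the struct column, applying the UDF to just one of its fields.
val out = df.withColumn("s", struct(toUpper($"s._1").as("_1"), $"s._2".as("_2")))
out.printSchema()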

Re: Spark Streaming - dividing DStream into mini batches

2016-09-13 Thread Daan Debie
Ah, that makes it much clearer, thanks!

It also brings up an additional question: who/what decides on the
partitioning? Does Spark Streaming decide to divide a micro batch/RDD into
more than 1 partition based on size? Or is it something that the "source"
(SocketStream, KafkaStream etc.) decides?

On Tue, Sep 13, 2016 at 4:26 PM, Cody Koeninger  wrote:

> A micro batch is an RDD.
>
> An RDD has partitions, so different executors can work on different
> partitions concurrently.
>
> Don't think of that as multiple micro-batches within a time slot.
> It's one RDD within a time slot, with multiple partitions.
>
> On Tue, Sep 13, 2016 at 9:01 AM, Daan Debie  wrote:
> > Thanks, but that thread does not answer my questions, which are about the
> > distributed nature of RDDs vs the small nature of "micro batches" and on
> how
> > Spark Streaming distributes work.
> >
> > On Tue, Sep 13, 2016 at 3:34 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com>
> > wrote:
> >>
> >> Hi Daan,
> >>
> >> You may find this link Re: Is "spark streaming" streaming or mini-batch?
> >> helpful. This was a thread in this forum not long ago.
> >>
> >> HTH
> >>
> >> Dr Mich Talebzadeh
> >>
> >>
> >>
> >> LinkedIn
> >> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCd
> OABUrV8Pw
> >>
> >>
> >>
> >> http://talebzadehmich.wordpress.com
> >>
> >>
> >> Disclaimer: Use it at your own risk. Any and all responsibility for any
> >> loss, damage or destruction of data or any other property which may
> arise
> >> from relying on this email's technical content is explicitly
> disclaimed. The
> >> author will in no case be liable for any monetary damages arising from
> such
> >> loss, damage or destruction.
> >>
> >>
> >>
> >>
> >> On 13 September 2016 at 14:25, DandyDev  wrote:
> >>>
> >>> Hi all!
> >>>
> >>> When reading about Spark Streaming and its execution model, I see
> >>> diagrams
> >>> like this a lot:
> >>>
> >>>
> >>>  file/n27699/lambda-architecture-with-spark-spark-
> streaming-kafka-cassandra-akka-and-scala-31-638.jpg>
> >>>
> >>> It does a fine job explaining how DStreams consist of micro batches
> that
> >>> are
> >>> basically RDDs. There are however some things I don't understand:
> >>>
> >>> - RDDs are distributed by design, but micro batches are conceptually
> >>> small.
> >>> How/why are these micro batches distributed so that they need to be
> >>> implemented as RDD?
> >>> - The above image doesn't explain how Spark Streaming parallelizes
> data.
> >>> According to the image, a stream of events get broken into micro
> batches
> >>> over the axis of time (time 0 to 1 is a micro batch, time 1 to 2 is a
> >>> micro
> >>> batch, etc.). How does parallelism come into play here? Is it that even
> >>> within a "time slot" (eg. time 0 to 1) there can be so many events,
> that
> >>> multiple micro batches for that time slot will be created and
> distributed
> >>> across the executors?
> >>>
> >>> Clarification would be helpful!
> >>>
> >>> Daan
> >>>
> >>>
> >>>
> >>> --
> >>> View this message in context:
> >>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-
> Streaming-dividing-DStream-into-mini-batches-tp27699.html
> >>> Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
> >>>
> >>> -
> >>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >>>
> >>
> >
>


Re: Spark Streaming - dividing DStream into mini batches

2016-09-13 Thread Cody Koeninger
The DStream implementation decides how to produce an RDD for a time
(this is the compute method)

The RDD implementation decides how to partition things (this is the
getPartitions method)

You can look at those methods in DirectKafkaInputDStream and KafkaRDD
respectively if you want to see an example
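
A tiny way to see both points from the shell (a sketch; the socket source and
the batch interval are arbitrary):

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(5))           // one RDD per 5-second interval
val lines = ssc.socketTextStream("localhost", 9999)      // placeholder source

lines.foreachRDD { (rdd, time) =>
  // A single RDD per interval; its partitions are what the executors work on.
  println(s"$time -> ${rdd.getNumPartitions} partitions")
}

ssc.start()
// ssc.awaitTermination()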

On Tue, Sep 13, 2016 at 9:37 AM, Daan Debie  wrote:
> Ah, that makes it much clearer, thanks!
>
> It also brings up an additional question: who/what decides on the
> partitioning? Does Spark Streaming decide to divide a micro batch/RDD into
> more than 1 partition based on size? Or is it something that the "source"
> (SocketStream, KafkaStream etc.) decides?
>
> On Tue, Sep 13, 2016 at 4:26 PM, Cody Koeninger  wrote:
>>
>> A micro batch is an RDD.
>>
>> An RDD has partitions, so different executors can work on different
>> partitions concurrently.
>>
>> Don't think of that as multiple micro-batches within a time slot.
>> It's one RDD within a time slot, with multiple partitions.
>>
>> On Tue, Sep 13, 2016 at 9:01 AM, Daan Debie  wrote:
>> > Thanks, but that thread does not answer my questions, which are about
>> > the
>> > distributed nature of RDDs vs the small nature of "micro batches" and on
>> > how
>> > Spark Streaming distributes work.
>> >
>> > On Tue, Sep 13, 2016 at 3:34 PM, Mich Talebzadeh
>> > 
>> > wrote:
>> >>
>> >> Hi Daan,
>> >>
>> >> You may find this link Re: Is "spark streaming" streaming or
>> >> mini-batch?
>> >> helpful. This was a thread in this forum not long ago.
>> >>
>> >> HTH
>> >>
>> >> Dr Mich Talebzadeh
>> >>
>> >>
>> >>
>> >> LinkedIn
>> >>
>> >> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> >>
>> >>
>> >>
>> >> http://talebzadehmich.wordpress.com
>> >>
>> >>
>> >> Disclaimer: Use it at your own risk. Any and all responsibility for any
>> >> loss, damage or destruction of data or any other property which may
>> >> arise
>> >> from relying on this email's technical content is explicitly
>> >> disclaimed. The
>> >> author will in no case be liable for any monetary damages arising from
>> >> such
>> >> loss, damage or destruction.
>> >>
>> >>
>> >>
>> >>
>> >> On 13 September 2016 at 14:25, DandyDev  wrote:
>> >>>
>> >>> Hi all!
>> >>>
>> >>> When reading about Spark Streaming and its execution model, I see
>> >>> diagrams
>> >>> like this a lot:
>> >>>
>> >>>
>> >>>
>> >>> 
>> >>>
>> >>> It does a fine job explaining how DStreams consist of micro batches
>> >>> that
>> >>> are
>> >>> basically RDDs. There are however some things I don't understand:
>> >>>
>> >>> - RDDs are distributed by design, but micro batches are conceptually
>> >>> small.
>> >>> How/why are these micro batches distributed so that they need to be
>> >>> implemented as RDD?
>> >>> - The above image doesn't explain how Spark Streaming parallelizes
>> >>> data.
>> >>> According to the image, a stream of events get broken into micro
>> >>> batches
>> >>> over the axis of time (time 0 to 1 is a micro batch, time 1 to 2 is a
>> >>> micro
>> >>> batch, etc.). How does parallelism come into play here? Is it that
>> >>> even
>> >>> within a "time slot" (eg. time 0 to 1) there can be so many events,
>> >>> that
>> >>> multiple micro batches for that time slot will be created and
>> >>> distributed
>> >>> across the executors?
>> >>>
>> >>> Clarification would be helpful!
>> >>>
>> >>> Daan
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> View this message in context:
>> >>>
>> >>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-dividing-DStream-into-mini-batches-tp27699.html
>> >>> Sent from the Apache Spark User List mailing list archive at
>> >>> Nabble.com.
>> >>>
>> >>> -
>> >>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >>>
>> >>
>> >
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming - dividing DStream into mini batches

2016-09-13 Thread Cody Koeninger
A micro batch is an RDD.

An RDD has partitions, so different executors can work on different
partitions concurrently.

Don't think of that as multiple micro-batches within a time slot.
It's one RDD within a time slot, with multiple partitions.

On Tue, Sep 13, 2016 at 9:01 AM, Daan Debie  wrote:
> Thanks, but that thread does not answer my questions, which are about the
> distributed nature of RDDs vs the small nature of "micro batches" and on how
> Spark Streaming distributes work.
>
> On Tue, Sep 13, 2016 at 3:34 PM, Mich Talebzadeh 
> wrote:
>>
>> Hi Daan,
>>
>> You may find this link Re: Is "spark streaming" streaming or mini-batch?
>> helpful. This was a thread in this forum not long ago.
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> Disclaimer: Use it at your own risk. Any and all responsibility for any
>> loss, damage or destruction of data or any other property which may arise
>> from relying on this email's technical content is explicitly disclaimed. The
>> author will in no case be liable for any monetary damages arising from such
>> loss, damage or destruction.
>>
>>
>>
>>
>> On 13 September 2016 at 14:25, DandyDev  wrote:
>>>
>>> Hi all!
>>>
>>> When reading about Spark Streaming and its execution model, I see
>>> diagrams
>>> like this a lot:
>>>
>>>
>>> 
>>>
>>> It does a fine job explaining how DStreams consist of micro batches that
>>> are
>>> basically RDDs. There are however some things I don't understand:
>>>
>>> - RDDs are distributed by design, but micro batches are conceptually
>>> small.
>>> How/why are these micro batches distributed so that they need to be
>>> implemented as RDD?
>>> - The above image doesn't explain how Spark Streaming parallelizes data.
>>> According to the image, a stream of events get broken into micro batches
>>> over the axis of time (time 0 to 1 is a micro batch, time 1 to 2 is a
>>> micro
>>> batch, etc.). How does parallelism come into play here? Is it that even
>>> within a "time slot" (eg. time 0 to 1) there can be so many events, that
>>> multiple micro batches for that time slot will be created and distributed
>>> across the executors?
>>>
>>> Clarification would be helpful!
>>>
>>> Daan
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-dividing-DStream-into-mini-batches-tp27699.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Master OOM in "master-rebuild-ui-thread" while running stream app

2016-09-13 Thread Mariano Semelman
Hello everybody,

I am running a Spark Streaming app and I am planning to use it as a long-running
service. However, while trying the app in an RC environment, I got this exception
in the master daemon after 1 hour of running:

Exception in thread "master-rebuild-ui-thread"
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.regex.Pattern.compile(Pattern.java:1667)
    at java.util.regex.Pattern.<init>(Pattern.java:1351)
    at java.util.regex.Pattern.compile(Pattern.java:1054)
    at java.lang.String.replace(String.java:2239)
    at org.apache.spark.util.Utils$.getFormattedClassName(Utils.scala:1632)
    at org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:486)
    at org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:58)
    at org.apache.spark.deploy.master.Master$$anonfun$17.apply(Master.scala:972)
    at org.apache.spark.deploy.master.Master$$anonfun$17.apply(Master.scala:952)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)


As a palliative measure I've increased the master memory to 1.5gb.
My job is running with a batch interval of 5 seconds.
I'm using spark version 1.6.2.
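
A further stop-gap I am considering (an assumption on my side: the OOM happens while the
master replays event logs to rebuild the UI of finished applications, so giving it nothing
to replay should avoid it, at the cost of losing that history view):

# spark-defaults.conf of the streaming application
spark.eventLog.enabled    false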

I think it might be related to these issues:

https://issues.apache.org/jira/browse/SPARK-6270
https://issues.apache.org/jira/browse/SPARK-12062
https://issues.apache.org/jira/browse/SPARK-12299

But I don't see a clear road to solve this apart from upgrading spark.
What would you recommend?


Thanks in advance
Mariano


Re: Strings not converted when calling Scala code from a PySpark app

2016-09-13 Thread Alexis Seigneurin
Makes sense. Thanks Holden.

Alexis

On Mon, Sep 12, 2016 at 5:28 PM, Holden Karau  wrote:

> Ah yes so the Py4J conversions only apply on the driver program - your
> DStream however is RDDs of pickled objects. If you want to with a transform
> function use Spark SQL transferring DataFrames back and forth between
> Python and Scala spark can be much easier.
>
>
> On Monday, September 12, 2016, Alexis Seigneurin 
> wrote:
>
>> Hi,
>>
>>
>> *TL;DR - I have what looks like a DStream of Strings in a PySpark
>> application. I want to send it as a DStream[String] to a Scala library.
>> Strings are not converted by Py4j, though.*
>>
>>
>> I'm working on a PySpark application that pulls data from Kafka using
>> Spark Streaming. My messages are strings and I would like to call a method
>> in Scala code, passing it a DStream[String] instance. However, I'm unable
>> to receive proper JVM strings in the Scala code. It looks to me like the
>> Python strings are not converted into Java strings but, instead, are
>> serialized.
>>
>> My question would be: how to get Java strings out of the DStream object?
>>
>>
>> Here is the simplest Python code I came up with:
>>
>> from pyspark.streaming import StreamingContext
>> ssc = StreamingContext(sparkContext=sc, batchDuration=int(1))
>>
>> from pyspark.streaming.kafka import KafkaUtils
>> stream = KafkaUtils.createDirectStream(ssc, ["IN"],
>> {"metadata.broker.list": "localhost:9092"})
>> values = stream.map(lambda tuple: tuple[1])
>>
>> ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream)
>>
>> ssc.start()
>>
>>
>> I'm running this code in PySpark, passing it the path to my JAR:
>>
>> pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar
>>
>>
>> On the Scala side, I have:
>>
>> package com.seigneurin
>>
>> import org.apache.spark.streaming.api.java.JavaDStream
>>
>> object MyPythonHelper {
>>   def doSomething(jdstream: JavaDStream[String]) = {
>> val dstream = jdstream.dstream
>> dstream.foreachRDD(rdd => {
>>   rdd.foreach(println)
>> })
>>   }
>> }
>>
>>
>> Now, let's say I send some data into Kafka:
>>
>> echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list
>> localhost:9092 --topic IN
>>
>>
>> The println statement in the Scala code prints something that looks like:
>>
>> [B@758aa4d9
>>
>>
>> I expected to get foo bar instead.
>>
>> Now, if I replace the simple println statement in the Scala code with the
>> following:
>>
>> rdd.foreach(v => println(v.getClass.getCanonicalName))
>>
>>
>> I get:
>>
>> java.lang.ClassCastException: [B cannot be cast to java.lang.String
>>
>>
>> This suggests that the strings are actually passed as arrays of bytes.
>>
>> If I simply try to convert this array of bytes into a string (I know I'm
>> not even specifying the encoding):
>>
>>   def doSomething(jdstream: JavaDStream[Array[Byte]]) = {
>> val dstream = jdstream.dstream
>> dstream.foreachRDD(rdd => {
>>   rdd.foreach(bytes => println(new String(bytes)))
>> })
>>   }
>>
>>
>> I get something that looks like (special characters might be stripped
>> off):
>>
>> �]qXfoo barqa.
>>
>>
>> This suggests the Python string was serialized (pickled?). How could I
>> retrieve a proper Java string instead?
>>
>>
>> Thanks,
>> Alexis
>>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>
>


-- 

*Alexis Seigneurin*
*Managing Consultant*
(202) 459-1591 <202%20459.1591> - LinkedIn



Rate our service 


Re: Why there is no top method in dataset api

2016-09-13 Thread Jakub Dubovsky
Thanks Sean,

The important part of your answer for me is that orderBy + limit does only a
"partial sort" because of the optimizer. That's what I was missing. I will
give it a try...
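
Something along these lines, I guess (a sketch; "value" is just an example column, as in
your message below):

import org.apache.spark.sql.functions.desc

// sort + limit: the planner can run this as a per-partition top-N and merge the winners,
// rather than a full global sort
val top10 = ds.orderBy(desc("value")).limit(10)
top10.show()

// the RDD route, for comparison: ds.rdd.top(10) with a suitable Ordering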

J.D.

On Mon, Sep 5, 2016 at 2:26 PM, Sean Owen  wrote:

> No, I'm not advising you to use .rdd, just saying it is possible.
> Although I'd only use RDDs if you had a good reason to, given Datasets
> now, they are not gone or even deprecated.
>
> You do not need to order the whole data set to get the top element. That
> isn't what top does though. You might be interested to look at the source
> code. Nor is it what orderBy does if the optimizer is any good.
>
> Computing .rdd doesn't materialize an RDD. It involves some non-zero
> overhead in creating a plan, which should be minor compared to execution.
> So would any computation of "top N" on a Dataset, so I don't think this is
> relevant.
>
>
> orderBy + take is already the way to accomplish "Dataset.top". It works
> on Datasets, and therefore DataFrames too, for the reason you give. I'm not
> sure what you're asking there.
>
>
> On Mon, Sep 5, 2016, 13:01 Jakub Dubovsky 
> wrote:
>
>> Thanks Sean,
>>
>> I was under impression that spark creators are trying to persuade user
>> community not to use RDD api directly. Spark summit I attended was full of
>> this. So I am a bit surprised that I hear use-rdd-api as an advice from
>> you. But if this is a way then I have a second question. For conversion
>> from dataset to rdd I would use Dataset.rdd lazy val. Since it is a lazy
>> val it suggests there is some computation going on to create rdd as a copy.
>> The question is: how computationally expensive is this conversion? If
>> there is a significant overhead then it is clear why one would want to have
>> top method directly on Dataset class.
>>
>> Ordering whole dataset only to take first 10 or so top records is not
>> really an acceptable option for us. The comparison function can be expensive
>> and the size of dataset is (unsurprisingly) big.
>>
>> To be honest I do not really understand what do you mean by b). Since
>> DataFrame is now only an alias for Dataset[Row] what do you mean by
>> "DataFrame-like counterpart"?
>>
>> Thanks
>>
>> On Thu, Sep 1, 2016 at 2:31 PM, Sean Owen  wrote:
>>
>>> You can always call .rdd.top(n) of course. Although it's slightly
>>> clunky, you can also .orderBy($"value".desc).take(n). Maybe there's an
>>> easier way.
>>>
>>> I don't think if there's a strong reason other than it wasn't worth it
>>> to write this and many other utility wrappers that a) already exist on
>>> the underlying RDD API if you want them, and b) have a DataFrame-like
>>> counterpart already that doesn't really need wrapping in a different
>>> API.
>>>
>>> On Thu, Sep 1, 2016 at 12:53 PM, Jakub Dubovsky
>>>  wrote:
>>> > Hey all,
>>> >
>>> > in RDD api there is very usefull method called top. It finds top n
>>> records
>>> > in according to certain ordering without sorting all records. Very
>>> usefull!
>>> >
>>> > There is no top method nor similar functionality in Dataset api. Has
>>> anybody
>>> > any clue why? Is there any specific reason for this?
>>> >
>>> > Any thoughts?
>>> >
>>> > thanks
>>> >
>>> > Jakub D.
>>>
>>
>>


Re: Spark Streaming - dividing DStream into mini batches

2016-09-13 Thread Daan Debie
Thanks, but that thread does not answer my questions, which are about the
distributed nature of RDDs vs the small nature of "micro batches" and on
how Spark Streaming distributes work.

On Tue, Sep 13, 2016 at 3:34 PM, Mich Talebzadeh 
wrote:

> Hi Daan,
>
> You may find this link Re: Is "spark streaming" streaming or mini-batch?
> 
> helpful. This was a thread in this forum not long ago.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 13 September 2016 at 14:25, DandyDev  wrote:
>
>> Hi all!
>>
>> When reading about Spark Streaming and its execution model, I see diagrams
>> like this a lot:
>>
>> [image: lambda-architecture-with-spark-spark-streaming-kafka-cassandra-akka-and-scala-31-638.jpg]
>>
>> It does a fine job explaining how DStreams consist of micro batches that
>> are
>> basically RDDs. There are however some things I don't understand:
>>
>> - RDDs are distributed by design, but micro batches are conceptually
>> small.
>> How/why are these micro batches distributed so that they need to be
>> implemented as RDD?
>> - The above image doesn't explain how Spark Streaming parallelizes data.
>> According to the image, a stream of events get broken into micro batches
>> over the axis of time (time 0 to 1 is a micro batch, time 1 to 2 is a
>> micro
>> batch, etc.). How does parallelism come into play here? Is it that even
>> within a "time slot" (eg. time 0 to 1) there can be so many events, that
>> multiple micro batches for that time slot will be created and distributed
>> across the executors?
>>
>> Clarification would be helpful!
>>
>> Daan
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Spark-Streaming-dividing-DStream-into-
>> mini-batches-tp27699.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Fetching Hive table data from external cluster

2016-09-13 Thread Satish Chandra J
Hi All,
Currently using Spark version 1.4.2.

Please provide inputs if anyone has encountered the scenario mentioned below:

Fetching Hive table data from an external Hadoop cluster into a DataFrame via a
Spark job.

I am interested in having the data directly in a DataFrame and applying
transformations on top of it before loading into the target table.
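
A sketch of the kind of access I mean (the metastore URI and table name are placeholders;
in practice the URI usually comes from a hive-site.xml on the Spark classpath of the job):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
// point at the external cluster's Hive metastore (placeholder host/port)
hiveContext.setConf("hive.metastore.uris", "thrift://remote-metastore-host:9083")

val df = hiveContext.table("some_db.some_table")
val transformed = df.filter(df("some_col").isNotNull)   // example transformation before the load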

Thanks in advance for all your support


Regards,
Satish Chandra


Re: Spark Streaming - dividing DStream into mini batches

2016-09-13 Thread Mich Talebzadeh
Hi Daan,

You may find this link Re: Is "spark streaming" streaming or mini-batch?

helpful. This was a thread in this forum not long ago.

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 13 September 2016 at 14:25, DandyDev  wrote:

> Hi all!
>
> When reading about Spark Streaming and its execution model, I see diagrams
> like this a lot:
>
> [image: lambda-architecture-with-spark-spark-streaming-kafka-cassandra-akka-and-scala-31-638.jpg]
>
> It does a fine job explaining how DStreams consist of micro batches that
> are
> basically RDDs. There are however some things I don't understand:
>
> - RDDs are distributed by design, but micro batches are conceptually small.
> How/why are these micro batches distributed so that they need to be
> implemented as RDD?
> - The above image doesn't explain how Spark Streaming parallelizes data.
> According to the image, a stream of events get broken into micro batches
> over the axis of time (time 0 to 1 is a micro batch, time 1 to 2 is a micro
> batch, etc.). How does parallelism come into play here? Is it that even
> within a "time slot" (eg. time 0 to 1) there can be so many events, that
> multiple micro batches for that time slot will be created and distributed
> across the executors?
>
> Clarification would be helpful!
>
> Daan
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Spark-Streaming-dividing-DStream-
> into-mini-batches-tp27699.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Spark Streaming - dividing DStream into mini batches

2016-09-13 Thread DandyDev
Hi all!

When reading about Spark Streaming and its execution model, I see diagrams
like this a lot:

[image: lambda-architecture-with-spark-spark-streaming-kafka-cassandra-akka-and-scala-31-638.jpg]

It does a fine job explaining how DStreams consist of micro batches that are
basically RDDs. There are however some things I don't understand:

- RDDs are distributed by design, but micro batches are conceptually small.
How/why are these micro batches distributed so that they need to be
implemented as RDD?
- The above image doesn't explain how Spark Streaming parallelizes data.
According to the image, a stream of events gets broken into micro batches
over the axis of time (time 0 to 1 is a micro batch, time 1 to 2 is a micro
batch, etc.). How does parallelism come into play here? Is it that even
within a "time slot" (e.g. time 0 to 1) there can be so many events that
multiple micro batches for that time slot will be created and distributed
across the executors?

Clarification would be helpful!

Daan



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-dividing-DStream-into-mini-batches-tp27699.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: LDA spark ML visualization

2016-09-13 Thread janardhan shetty
Any help is appreciated to proceed in this problem.
On Sep 12, 2016 11:45 AM, "janardhan shetty"  wrote:

> Hi,
>
> I am trying to visualize the LDA model developed in spark scala (2.0 ML)
> in LDAvis.
>
> Is there any links to convert the spark model parameters to the following
> 5 params to visualize ?
>
> 1. φ, the K × W matrix containing the estimated probability mass function
> over the W terms in the vocabulary for each of the K topics in the model.
> Note that φkw > 0 for all k ∈ 1...K and all w ∈ 1...W, because of the
> priors. (Although our software allows values of zero due to rounding). Each
> of the K rows of φ must sum to one.
> 2. θ, the D × K matrix containing the estimated probability mass function
> over the K topics in the model for each of the D documents in the corpus.
> Note that θdk > 0 for all d ∈ 1...D and all k ∈ 1...K, because of the
> priors (although, as above, our software accepts zeroes due to rounding).
> Each of the D rows of θ must sum to one.
> 3. nd, the number of tokens observed in document d, where nd is required
> to be an integer greater than zero, for documents d = 1...D. Denoted
> doc.length in our code.
> 4. vocab, the length-W character vector containing the terms in the
> vocabulary (listed in the same order as the columns of φ).
> 5. Mw, the frequency of term w across the entire corpus, where Mw is
> required to be an integer greater than zero for each term w = 1...W.
> Denoted term.frequency in our code.
>


Character encoding corruption in Spark JDBC connector

2016-09-13 Thread Mark Bittmann
Hello Spark community,

I'm reading from a MySQL database into a Spark dataframe using the JDBC
connector functionality, and I'm experiencing some character encoding
issues. The default encoding for MySQL strings is latin1, but the MySQL JDBC
connector implementation of "ResultSet.getString()" will return an
incorrect encoding of the data for certain characters such as the "all
rights reserved" char. Instead, you can use "new
String(ResultSet.getBytes())" which will return the correctly encoded
string. I've confirmed this behavior with the mysql connector classes
(i.e., without using the Spark wrapper).

I can see here that the Spark JDBC connector uses getString(), though there
is a note to move to getBytes() for performance reasons:

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L389

For some special chars, I can reverse the behavior with a UDF that applies
new String(badString.getBytes("Cp1252") , "UTF-8"), however for some
languages the underlying byte array is irreversibly changed and the data is
corrupted.
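
For reference, the UDF looks roughly like this (Spark 1.6 style; the column name is just an
example, and as noted it only helps where the re-encoding is actually reversible):

import org.apache.spark.sql.functions.udf

// re-decode strings that were decoded with the wrong charset on the way in
val fixEncoding = udf { (s: String) =>
  if (s == null) null else new String(s.getBytes("Cp1252"), "UTF-8")
}

val fixed = df.withColumn("description", fixEncoding(df("description")))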

I can submit an issue/PR to fix it going forward if "new
String(ResultSet.getBytes())" is the correct approach.

Meanwhile, can anyone offer any recommendations on how to correct this
behavior prior to it getting to a dataframe? I've tried every permutation
of the settings in the JDBC connection url (characterSetResults,
characterEncoding).

I'm on Spark 1.6.

Thanks!


Re: Unable to compare SparkSQL Date columns

2016-09-13 Thread Mich Talebzadeh
Can you send the rdds that just creates those two dates?

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 13 September 2016 at 13:54, Praseetha  wrote:

>
> Hi All,
>
> I have a case class in scala case class TestDate (id: String, loginTime:
> java.sql.Date)
>
> I created 2 RDD's of type TestDate
>
> I wanted to do an inner join on two rdd's where the values of loginTime
> column is equal. Please find the code snippet below,
>
> firstRDD.toDF.registerTempTable("firstTable")
> secondRDD.toDF.registerTempTable("secondTable")
> val res = sqlContext.sql("select * from firstTable INNER JOIN secondTable on 
> to_date(firstTable.loginTime) = to_date(secondTable.loginTime)")
>
> I'm not getting any exception. But i'm not getting correct answer too. It
> does a cartesian and some random dates are generated in the result.
>
>
> Regds,
> --Praseetha
>


Unable to compare SparkSQL Date columns

2016-09-13 Thread Praseetha
Hi All,

I have a case class in Scala:

case class TestDate(id: String, loginTime: java.sql.Date)

I created 2 RDD's of type TestDate

I wanted to do an inner join on two rdd's where the values of loginTime
column is equal. Please find the code snippet below,

firstRDD.toDF.registerTempTable("firstTable")
secondRDD.toDF.registerTempTable("secondTable")
val res = sqlContext.sql("select * from firstTable INNER JOIN
secondTable on to_date(firstTable.loginTime) =
to_date(secondTable.loginTime)")

I'm not getting any exception, but I'm not getting the correct answer either. It
does a cartesian join and some random dates are generated in the result.


Regds,
--Praseetha


Re: Spark_JDBC_Partitions

2016-09-13 Thread Igor Racic
Hi,

One way is to use the NTILE function to partition the data.
Example:

REM Creating test table
create table Test_part as select * from ( select rownum rn from all_tables
t1 ) where rn <= 1000;

REM Partition lines by Oracle block number, 11 partitions in this example.
select ntile(11) over( order by dbms_rowid.ROWID_BLOCK_NUMBER( rowid ) ) nt
from Test_part


Let's see distribution:

select nt, count(*) from ( select ntile(11) over( order by
dbms_rowid.ROWID_BLOCK_NUMBER( rowid ) ) nt from Test_part) group by nt;

        NT   COUNT(*)
---------- ----------
         1         10
         6         10
        11          9
         2         10
         4         10
         5         10
         8         10
         3         10
         7         10
         9          9
        10          9

11 rows selected.
^^ It looks good. Feel free to choose any other condition to order your
lines, whatever best suits your case.

So you can:
1) have one session reading and then decide where each line goes (1 reader)
2) or do multiple reads by specifying the partition number. Note that in this
case you read the whole table n times (in parallel), which is more intensive on
the read side (multiple readers).
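
On the Spark side, those buckets map naturally onto the predicates variant of the JDBC
reader, one predicate per partition (a sketch; the URL, credentials and inline view are
placeholders):

import java.util.Properties

val props = new Properties()
props.setProperty("user", "scott")       // placeholder
props.setProperty("password", "tiger")   // placeholder

// one predicate per NTILE bucket => one Spark partition per bucket
val predicates = (1 to 11).map(n => s"nt = $n").toArray

val df = sqlContext.read.jdbc(
  "jdbc:oracle:thin:@//dbhost:1521/ORCL",
  "(select t.*, ntile(11) over (order by dbms_rowid.ROWID_BLOCK_NUMBER(rowid)) nt from Test_part t)",
  predicates,
  props)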

Regards,
Igor



2016-09-11 0:46 GMT+02:00 Mich Talebzadeh :

> Good points
>
> Unfortunately datapump, exp and imp use binary formats for export and import
> that cannot be used to import data into HDFS in a suitable way.
>
> One can use what is known as a flat-file extract script to get data out tab- or
> comma-separated etc.
>
> ROWNUM is a pseudocolumn (not a real column) that is available in a query.
> The issue is that in a table of 280Million rows to get the position of the
> row it will have to do a table scan since no index cannot be built on it
> (assuming there is no other suitable index). Not ideal but can be done.
>
> I think a better alternative is to use datapump to take that table to
> DEV/TEST, add a sequence (like an IDENTITY column in Sybase), build a
> unique index on the sequence column and do the partitioning there.
>
> HTH
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 10 September 2016 at 22:37, ayan guha  wrote:
>
>> In oracle something called row num is present in every row.  You can
>> create an evenly distribution using that column. If it is one time work,
>> try using sqoop. Are you using Oracle's own appliance? Then you can use
>> data pump format
>> On 11 Sep 2016 01:59, "Mich Talebzadeh" 
>> wrote:
>>
>>> creating an Oracle sequence for a table of 200million is not going to be
>>> that easy without changing the schema. It is possible to export that table
>>> from prod and import it to DEV/TEST and create the sequence there.
>>>
>>> If it is a FACT table then the foreign keys from the Dimension tables
>>> will be bitmap indexes on the FACT table so they can be potentially used.
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>> On 10 September 2016 at 16:42, Takeshi Yamamuro 
>>> wrote:
>>>
 Hi,

 Yea, spark does not have the same functionality with sqoop.
 I think one of simple solutions is to assign unique ids on the oracle
 table by yourself.
 Thought?

 // maropu


 On Sun, Sep 11, 2016 at 12:37 AM, Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> Strange that Oracle table of 200Million plus rows has not been
> partitioned.
>
> What matters here is to have parallel connections from JDBC to Oracle,
> each reading a sub-set of table. Any parallel fetch is going to be better
> than reading with one connection from Oracle.
>
> Surely among 404 columns there must be one with high cardinality to
> satisfy this work.
>
> May be you should just create table  as 

Spark SQL - Actions and Transformations

2016-09-13 Thread brccosta
Dear all,

We're performing some tests with cache and persist on Datasets. With RDDs, we
know that the transformations are lazy, being executed only when an action
occurs. So, for example, we put a .cache() on an RDD after an action, which
in turn is executed as the last operation of a sequence of transformations.

However, what are the lazy operations in Datasets and Dataframes? For
example, the following code (fragment):

from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, HashingTF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import PipelineModel

(df_train, df_test) = df.randomSplit([0.8, 0.2])

r_tokenizer = RegexTokenizer(inputCol="review", outputCol="words_all",
gaps=False, pattern="\\p{L}+")
df_words_all = r_tokenizer.transform(df_train)

remover = StopWordsRemover(inputCol="words_all", outputCol="words_filtered")
df_filtered = remover.transform(df_words_all)
df_filtered = df_filtered.drop('words_all')

hashingTF = HashingTF(inputCol="words_filtered", outputCol="features")
df_features = hashingTF.transform(df_filtered)
df_features = df_features.drop('words_filtered')

lr = LogisticRegression(maxIter=iteractions, regParam=0.01)
model1 = lr.fit(df_features)

evaluator = BinaryClassificationEvaluator()
pipelineModel_features  = PipelineModel (stages=[r_tokenizer, remover,
hashingTF])
df_test_features = pipelineModel_features.transform(df_test)
predictions = model1.transform(df_test_features)
eval_test = evaluator.evaluate(predictions)

All transformations of df_train and df_test will only occur when the
operations fit() and evaluate() are executed?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Actions-and-Transformations-tp27698.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Any viable DATEDIFF function in Spark/Scala

2016-09-13 Thread Mich Talebzadeh
Hi,

This is a tricky bit.

I use the following to get the current date and time:

scala> val date = java.time.LocalDate.now.toString
date: String = 2016-09-13
scala> val hour = java.time.LocalTime.now.toString
hour: String = 11:49:13.577

I store a column called TIMECREATED as String in hdfs. For now these values
look like this

scala> val df3 = df2.filter('security > " " && 'price > "10" &&
'TIMECREATED >  current_date()).select('TIMECREATED, current_date(),
datediff(current_date(), 'TIMECREATED).as("datediff")).show(2)
+-------------------+--------------+--------+
|        TIMECREATED|current_date()|datediff|
+-------------------+--------------+--------+
|2016-09-13 08:49:31|    2016-09-13|       0|
|2016-09-13 08:49:54|    2016-09-13|       0|
+-------------------+--------------+--------+
Which shows rows for today

Now I want to find all the rows that were created in the past 15 minutes.

In other words something similar to this

*DATEDIFF* ( *date-part*, *date-expression1*, *date-expression2* )

Any available implementation
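
One way I can think of to express "created in the last 15 minutes" without a
DATEDIFF(minute, ...) equivalent is to compare Unix timestamps (a sketch, assuming
TIMECREATED keeps the "yyyy-MM-dd HH:mm:ss" format shown above):

import org.apache.spark.sql.functions._

val recent = df2.filter(
  unix_timestamp() - unix_timestamp(col("TIMECREATED"), "yyyy-MM-dd HH:mm:ss") <= 15 * 60)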


Thanks






Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: [Erorr:]vieiwng Web UI on EMR cluster

2016-09-13 Thread Natu Lauchande
Hi,

I think the Spark UI will be accessible whenever you launch a Spark app in
the cluster; it should be the Application Tracker link.


Regards,
Natu

On Tue, Sep 13, 2016 at 9:37 AM, Divya Gehlot 
wrote:

> Hi ,
> Thank you all..
> Hurray ...I am able to view the hadoop web UI now  @ 8088 . even Spark
> Hisroty server Web UI @ 18080
> But unable to figure out the Spark UI web port ...
> Tried with 4044,4040.. ..
> getting below error
> This site can’t be reached
> How can I find out the Spark port ?
>
> Would really appreciate the help.
>
> Thanks,
> Divya
>
>
> On 13 September 2016 at 15:09, Divya Gehlot 
> wrote:
>
>> Hi,
>> Thanks all for your prompt response.
>> I followed the instruction in the docs EMR SSH tunnel
>> 
>> shared by Jonathan.
>> I am on MAC and set up foxy proxy in my chrome browser
>>
>> Divyas-MacBook-Pro:.ssh divyag$ ssh  -N -D 8157
>> had...@ec2-xx-xxx-xxx-xx.ap-southeast-1.compute.amazonaws.com
>>
>> channel 3: open failed: connect failed: Connection refused
>>
>> channel 3: open failed: connect failed: Connection refused
>>
>> channel 4: open failed: connect failed: Connection refused
>>
>> channel 3: open failed: connect failed: Connection refused
>>
>> channel 4: open failed: connect failed: Connection refused
>>
>> channel 3: open failed: connect failed: Connection refused
>>
>> channel 3: open failed: connect failed: Connection refused
>>
>> channel 4: open failed: connect failed: Connection refused
>>
>> channel 5: open failed: connect failed: Connection refused
>>
>> channel 22: open failed: connect failed: Connection refused
>>
>> channel 23: open failed: connect failed: Connection refused
>>
>> channel 22: open failed: connect failed: Connection refused
>>
>> channel 23: open failed: connect failed: Connection refused
>>
>> channel 22: open failed: connect failed: Connection refused
>>
>> channel 8: open failed: administratively prohibited: open failed
>>
>>
>> What am I missing now ?
>>
>>
>> Thanks,
>>
>> Divya
>>
>> On 13 September 2016 at 14:23, Jonathan Kelly 
>> wrote:
>>
>>> I would not recommend opening port 50070 on your cluster, as that would
>>> give the entire world access to your data on HDFS. Instead, you should
>>> follow the instructions found here to create a secure tunnel to the
>>> cluster, through which you can proxy requests to the UIs using a browser
>>> plugin like FoxyProxy: https://docs.aws.amazon.com/ElasticMapReduce/late
>>> st/ManagementGuide/emr-ssh-tunnel.html
>>>
>>> ~ Jonathan
>>>
>>> On Mon, Sep 12, 2016 at 10:40 PM Mohammad Tariq 
>>> wrote:
>>>
 Hi Divya,

 Do you you have inbounds enabled on port 50070 of your NN machine.
 Also, it's a good idea to have the public DNS in your /etc/hosts for proper
 name resolution.


 Tariq, Mohammad
 about.me/mti

 On Tue, Sep 13, 2016 at 9:28 AM, Divya Gehlot 
 wrote:

> Hi,
> I am on EMR 4.7 with Spark 1.6.1   and Hadoop 2.7.2
> When I am trying to view Any of the web UI of the cluster either
> hadoop or Spark ,I am getting below error
> "
> This site can’t be reached
>
> "
> Has anybody using EMR and able to view WebUI .
> Could you please share the steps.
>
> Would really appreciate the help.
>
> Thanks,
> Divya
>


>>
>


Re: Spark with S3 DirectOutputCommitter

2016-09-13 Thread Steve Loughran

On 12 Sep 2016, at 19:58, Srikanth 
> wrote:

Thanks Steve!

We are already using HDFS as an intermediate store. This is for the last stage 
of processing which has to put data in S3.
The output is partitioned by 3 fields, like 
.../field1=111/field2=999/date=-MM-DD/*
Given that there are 100s for folders and 1000s of subfolder and part files, 
rename from _temporary is just not practical in S3.
I guess we have to add another stage with S3Distcp??

Afraid so

Srikanth

On Sun, Sep 11, 2016 at 2:34 PM, Steve Loughran 
> wrote:

> On 9 Sep 2016, at 21:54, Srikanth 
> > wrote:
>
> Hello,
>
> I'm trying to use DirectOutputCommitter for s3a in Spark 2.0. I've tried a 
> few configs and none of them seem to work.
> Output always creates _temporary directory. Rename is killing performance.

> I read some notes about DirectOutputcommitter causing problems with 
> speculation turned on. Was this option removed entirely?

Spark turns off any committer with the word "direct' in its name if 
speculation==true . Concurrency, see.

even on on-speculative execution, the trouble with the direct options is that 
executor/job failures can leave incomplete/inconsistent work around —and the 
things downstream wouldn't even notice

There's work underway to address things, work which requires a consistent 
metadata store alongside S3 ( HADOOP-13345 : S3Guard).

For now: stay with the file output committer

hadoop.mapreduce.fileoutputcommitter.algorithm.version=2
hadoop.mapreduce.fileoutputcommitter.cleanup.skipped=true
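
(When set from Spark rather than in core-site.xml, the usual form is the spark.hadoop.
prefix on the Hadoop property name, e.g.

  --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2

on spark-submit.)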

Even better: use HDFS as the intermediate store for work, only do a bulk upload 
at the end.

>
>   val spark = SparkSession.builder()
> .appName("MergeEntities")
> .config("spark.sql.warehouse.dir", 
> mergeConfig.getString("sparkSqlWarehouseDir"))
> .config("fs.s3a.buffer.dir", "/tmp")
> .config("spark.hadoop.mapred.output.committer.class", 
> classOf[DirectOutputCommitter].getCanonicalName)
> .config("mapred.output.committer.class", 
> classOf[DirectOutputCommitter].getCanonicalName)
> .config("mapreduce.use.directfileoutputcommitter", "true")
> //.config("spark.sql.sources.outputCommitterClass", 
> classOf[DirectOutputCommitter].getCanonicalName)
> .getOrCreate()
>
> Srikanth





Re: Spark + Parquet + IBM Block Storage at Bluemix

2016-09-13 Thread Steve Loughran

On 12 Sep 2016, at 13:04, Daniel Lopes 
> wrote:

Thanks Steve,

But this error occurs only with parquet files, CSVs works.


Out of my depth then, I'm afraid. Sorry.


Best,

Daniel Lopes
Chief Data and Analytics Officer | OneMatch
c: +55 (18) 99764-2733 | https://www.linkedin.com/in/dslopes

www.onematch.com.br

On Sun, Sep 11, 2016 at 3:28 PM, Steve Loughran 
> wrote:

On 9 Sep 2016, at 17:56, Daniel Lopes 
> wrote:

Hi, someone can help

I'm trying to use parquet in IBM Block Storage at Spark but when I try to load 
get this error:

using this config

credentials = {
  "name": "keystone",
  "auth_url": 
"https://identity.open.softlayer.com",
  "project": "object_storage_23f274c1_d11XXXe634",
  "projectId": "XXd9c4aa39b7c7eb",
  "region": "dallas",
  "userId": "X64087180b40X2b909",
  "username": "admin_9dd810f8901d48778XX",
  "password": "chX6_",
  "domainId": "c1ddad17cfcX41",
  "domainName": "10XX",
  "role": "admin"
}

def set_hadoop_config(credentials):
"""This function sets the Hadoop configuration with given credentials,
so it is possible to access data using SparkContext"""

prefix = "fs.swift.service." + credentials['name']
hconf = sc._jsc.hadoopConfiguration()
hconf.set(prefix + ".auth.url", credentials['auth_url']+'/v3/auth/tokens')
hconf.set(prefix + ".auth.endpoint.prefix", "endpoints")
hconf.set(prefix + ".tenant", credentials['projectId'])
hconf.set(prefix + ".username", credentials['userId'])
hconf.set(prefix + ".password", credentials['password'])
hconf.setInt(prefix + ".http.port", 8080)
hconf.set(prefix + ".region", credentials['region'])
hconf.setBoolean(prefix + ".public", True)

set_hadoop_config(credentials)

-

Py4JJavaErrorTraceback (most recent call last)
 in ()
> 1 train.groupby('Acordo').count().show()

Py4JJavaError: An error occurred while calling o406.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 60 in 
stage 30.0 failed 10 times, most recent failure: Lost task 60.9 in stage 30.0 
(TID 2556, yp-spark-dal09-env5-0039): 
org.apache.hadoop.fs.swift.exceptions.SwiftConfigurationException: Missing 
mandatory configuration option: fs.swift.service.keystone.auth.url


In my own code, I'd assume that the value of credentials['name'] didn't match 
that of the URL, assuming you have something like swift://bucket.keystone . 
Failing that: the options were set too late.

Instead of asking for the hadoop config and editing that, set the option in 
your spark context, before it is launched, with the prefix "hadoop"
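
A sketch of what that looks like, reusing the property names from the snippet above with
the spark.hadoop. prefix (values are placeholders; set them before the context is created,
e.g. in spark-defaults.conf or via --conf at launch):

spark.hadoop.fs.swift.service.keystone.auth.url              https://identity.open.softlayer.com/v3/auth/tokens
spark.hadoop.fs.swift.service.keystone.auth.endpoint.prefix  endpoints
spark.hadoop.fs.swift.service.keystone.tenant                <projectId>
spark.hadoop.fs.swift.service.keystone.username              <userId>
spark.hadoop.fs.swift.service.keystone.password              <password>
spark.hadoop.fs.swift.service.keystone.region                dallas
spark.hadoop.fs.swift.service.keystone.http.port             8080
spark.hadoop.fs.swift.service.keystone.public                true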


at 
org.apache.hadoop.fs.swift.http.RestClientBindings.copy(RestClientBindings.java:223)
at 
org.apache.hadoop.fs.swift.http.RestClientBindings.bind(RestClientBindings.java:147)


Daniel Lopes
Chief Data and Analytics Officer | OneMatch
c: +55 (18) 99764-2733 | 
https://www.linkedin.com/in/dslopes

www.onematch.com.br





Re: Zeppelin patterns with the streaming data

2016-09-13 Thread Mich Talebzadeh
Hi Chanh,

Yes indeed. Apparently it is implemented through a class of its own. I have
specified a refresh of every 15 seconds.

Obviously if there is an issue then the cron will not be able to refresh
but you cannot sort out that problem from the web page anyway

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 13 September 2016 at 09:00, Chanh Le  wrote:

> Hi Mich,
> I think it can http://www.quartz-scheduler.org/documentation/
> quartz-2.1.x/tutorials/crontrigger
>
>
>
>
> On Sep 13, 2016, at 1:57 PM, Mich Talebzadeh 
> wrote:
>
> Thanks Sachin.
>
> The cron gives the granularity of 1 min. On normal one can use wait 10 and
> loop in the cron to run the job every 10 seconds. I am not sure that is
> possible with Zeppelin?
>
> cheers
>
> Dr Mich Talebzadeh
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
> http://talebzadehmich.wordpress.com
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 13 September 2016 at 05:05, Sachin Janani 
> wrote:
>
>> Good to see that you are enjoying zeppelin.You can schedule the paragraph
>> running after every x seconds.You can find this option on top of the
>> notebook just beside delete notebook button.
>>
>>
>> Regards,
>> Sachin Janani
>> http://snappydatainc.github.io/snappydata/
>>
>>
>> On Tue, Sep 13, 2016 at 3:13 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> The latest version of Zeppelin 0.6.1 looks pretty impressive with Spark
>>> 2 and also with Spark Thrift server (it runs on Hive Thrift server) and
>>> uses Hive execution engine. Make sure that you do not use MapReduce as
>>> Hive's execution engine.
>>>
>>> Now for streaming data (in this case some test generated data using
>>> Kafka topic), I store them as  text file on HDFS. To my surprise text files
>>> (all created every two seconds under some HDFS some directory) with
>>> timestamp added to the file name seem to be pretty fast. I am sceptical
>>> what benefit one gets if I decide to store them as Hbase file? Anyone can
>>> shed some light on it?
>>>
>>> Also I use Zeppelin to do some plots on the stored security prices.
>>> These are all  Fictitious (the data belonging to these securities are all
>>> fictitious and randomly generated). Now I don't think there is anyway one
>>> can automate Zeppelin to run the same code say every x seconds? Thinking
>>> loud the other alternative is to add new data as comes in and tail off the
>>> old data? Has anyone done any work of this type?
>>>
>>> Anyway I show you a sample graph below. Appreciate any ideas. My view is
>>> that Zeppelin is not designed for real time dashboard but increasingly
>>> looking good and may be with some change one can use it near real time?
>>>
>>>
>>> 
>>>
>>> Thanks
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>
>>
>
>


Re: Zeppelin patterns with the streaming data

2016-09-13 Thread Chanh Le
Hi Mich,
I think it can 
http://www.quartz-scheduler.org/documentation/quartz-2.1.x/tutorials/crontrigger
 





> On Sep 13, 2016, at 1:57 PM, Mich Talebzadeh  
> wrote:
> 
> Thanks Sachin.
> 
> The cron gives the granularity of 1 min. On normal one can use wait 10 and 
> loop in the cron to run the job every 10 seconds. I am not sure that is 
> possible with Zeppelin?
> 
> cheers
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> On 13 September 2016 at 05:05, Sachin Janani  > wrote:
> Good to see that you are enjoying zeppelin.You can schedule the paragraph 
> running after every x seconds.You can find this option on top of the notebook 
> just beside delete notebook button.
> 
> 
> Regards,
> Sachin Janani
> http://snappydatainc.github.io/snappydata/ 
> 
> 
> 
> On Tue, Sep 13, 2016 at 3:13 AM, Mich Talebzadeh  > wrote:
> The latest version of Zeppelin 0.6.1 looks pretty impressive with Spark 2 and 
> also with Spark Thrift server (it runs on Hive Thrift server) and uses Hive 
> execution engine. Make sure that you do not use MapReduce as Hive's execution 
> engine.
> 
> Now for streaming data (in this case some test generated data using Kafka 
> topic), I store them as  text file on HDFS. To my surprise text files (all 
> created every two seconds under some HDFS some directory) with timestamp 
> added to the file name seem to be pretty fast. I am sceptical what benefit 
> one gets if I decide to store them as Hbase file? Anyone can shed some light 
> on it?
> 
> Also I use Zeppelin to do some plots on the stored security prices. These are 
> all  Fictitious (the data belonging to these securities are all fictitious 
> and randomly generated). Now I don't think there is anyway one can automate 
> Zeppelin to run the same code say every x seconds? Thinking loud the other 
> alternative is to add new data as comes in and tail off the old data? Has 
> anyone done any work of this type?
> 
> Anyway I show you a sample graph below. Appreciate any ideas. My view is that 
> Zeppelin is not designed for real time dashboard but increasingly looking 
> good and may be with some change one can use it near real time?
> 
>   
> 
> 
> Thanks
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> 



Re: Ways to check Spark submit running

2016-09-13 Thread Deepak Sharma
Use yarn-client mode and you can see the logs in the console after you submit.

On Tue, Sep 13, 2016 at 11:47 AM, Divya Gehlot 
wrote:

> Hi,
>
> Some how for time being  I am unable to view Spark Web UI and Hadoop Web
> UI.
> Looking for other ways ,I can check my job is running fine apart from keep
> checking current yarn logs .
>
>
> Thanks,
> Divya
>



-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Re: Partition n keys into exacly n partitions

2016-09-13 Thread Christophe Préaud
Hi,

A custom partitioner is indeed the solution.

Here is a sample code:
import org.apache.spark.Partitioner

class KeyPartitioner(keyList: Seq[Any]) extends Partitioner {

  def numPartitions: Int = keyList.size + 1

  def getPartition(key: Any): Int = keyList.indexOf(key) + 1

  override def equals(other: Any): Boolean = other match {
case h: KeyPartitioner =>
  h.numPartitions == numPartitions
case _ =>
  false
  }

  override def hashCode: Int = numPartitions
}


It allows you to repartition an RDD[(K, V)] so that all lines with the same
key value (and only those lines) will be on the same partition.

You need to pass as parameter to the constructor a Seq[K] keyList containing 
all the possible values for the keys in the RDD[(K, V)], e.g.:
val rdd = sc.parallelize(
  Seq((1,'a),(2,'a),(3,'a),(1,'b),(2,'b),(1,'c),(3,'c),(4,'d))
)
rdd.partitionBy(new KeyPartitioner(Seq(1,2,3,4)))

will put:
- (1,'a) (1,'b) and (1,'c) in partition 1
- (2,'a) and (2,'b) in partition 2
- (3,'a) and (3,'c) in partition 3
- (4,'d) in partition 4
and nothing in partition 0

If a key is not defined in the keyList, it will be put in partition 0:
rdd.partitionBy(new KeyPartitioner(Seq(1,2,3)))
will put:
- (1,'a) (1,'b) and (1,'c) in partition 1
- (2,'a) and (2,'b) in partition 2
- (3,'a) and (3,'c) in partition 3
- (4,'d) in partition 0


Please let me know if it fits your needs.

Regards,
Christophe.

On 12/09/16 19:03, Denis Bolshakov wrote:
Just provide own partitioner.

Once I wrote a partitioner which keeps similar keys together in one partition.

Best regards,
Denis

On 12 September 2016 at 19:44, sujeet jog 
> wrote:
Hi,

Is there a way to partition set of data with n keys into exactly n partitions.

For ex : -

tuple of 1008 rows with key as x
tuple of 1008 rows with key as y   and so on  total 10 keys ( x, y etc )

Total records = 10080
NumOfKeys = 10

i want to partition the 10080 elements into exactly 10 partitions with each 
partition having elements with unique key

Is there a way to make this happen ?.. any ideas on implementing custom 
partitioner.


The current partitioner i'm using is HashPartitioner from which there are cases 
where key.hascode() % numPartitions  for keys of x & y become same.

 hence many elements with different keys fall into single partition at times.



Thanks,
Sujeet



--
//with Best Regards
--Denis Bolshakov
e-mail: bolshakov.de...@gmail.com



Kelkoo SAS
Société par Actions Simplifiée
Au capital de € 4.168.964,30
Siège social : 158 Ter Rue du Temple 75003 Paris
425 093 069 RCS Paris

Ce message et les pièces jointes sont confidentiels et établis à l'attention 
exclusive de leurs destinataires. Si vous n'êtes pas le destinataire de ce 
message, merci de le détruire et d'en avertir l'expéditeur.


Re: [Erorr:]vieiwng Web UI on EMR cluster

2016-09-13 Thread Divya Gehlot
Hi ,
Thank you all..
Hurray ... I am able to view the Hadoop web UI now @ 8088, and even the Spark
History Server web UI @ 18080.
But I am unable to figure out the Spark UI web port ...
Tried with 4044, 4040 ...
getting below error
This site can’t be reached
How can I find out the Spark port ?

Would really appreciate the help.

Thanks,
Divya


On 13 September 2016 at 15:09, Divya Gehlot  wrote:

> Hi,
> Thanks all for your prompt response.
> I followed the instruction in the docs EMR SSH tunnel
> 
> shared by Jonathan.
> I am on MAC and set up foxy proxy in my chrome browser
>
> Divyas-MacBook-Pro:.ssh divyag$ ssh  -N -D 8157
> had...@ec2-xx-xxx-xxx-xx.ap-southeast-1.compute.amazonaws.com
>
> channel 3: open failed: connect failed: Connection refused
>
> channel 3: open failed: connect failed: Connection refused
>
> channel 4: open failed: connect failed: Connection refused
>
> channel 3: open failed: connect failed: Connection refused
>
> channel 4: open failed: connect failed: Connection refused
>
> channel 3: open failed: connect failed: Connection refused
>
> channel 3: open failed: connect failed: Connection refused
>
> channel 4: open failed: connect failed: Connection refused
>
> channel 5: open failed: connect failed: Connection refused
>
> channel 22: open failed: connect failed: Connection refused
>
> channel 23: open failed: connect failed: Connection refused
>
> channel 22: open failed: connect failed: Connection refused
>
> channel 23: open failed: connect failed: Connection refused
>
> channel 22: open failed: connect failed: Connection refused
>
> channel 8: open failed: administratively prohibited: open failed
>
>
> What am I missing now ?
>
>
> Thanks,
>
> Divya
>
> On 13 September 2016 at 14:23, Jonathan Kelly 
> wrote:
>
>> I would not recommend opening port 50070 on your cluster, as that would
>> give the entire world access to your data on HDFS. Instead, you should
>> follow the instructions found here to create a secure tunnel to the
>> cluster, through which you can proxy requests to the UIs using a browser
>> plugin like FoxyProxy: https://docs.aws.amazon.com/ElasticMapReduce/late
>> st/ManagementGuide/emr-ssh-tunnel.html
>>
>> ~ Jonathan
>>
>> On Mon, Sep 12, 2016 at 10:40 PM Mohammad Tariq 
>> wrote:
>>
>>> Hi Divya,
>>>
>>> Do you you have inbounds enabled on port 50070 of your NN machine. Also,
>>> it's a good idea to have the public DNS in your /etc/hosts for proper name
>>> resolution.
>>>
>>>
>>> Tariq, Mohammad
>>> about.me/mti
>>>
>>> On Tue, Sep 13, 2016 at 9:28 AM, Divya Gehlot 
>>> wrote:
>>>
 Hi,
 I am on EMR 4.7 with Spark 1.6.1   and Hadoop 2.7.2
 When I am trying to view Any of the web UI of the cluster either hadoop
 or Spark ,I am getting below error
 "
 This site can’t be reached

 "
 Has anybody using EMR and able to view WebUI .
 Could you please share the steps.

 Would really appreciate the help.

 Thanks,
 Divya

>>>
>>>
>


Re: [Erorr:]vieiwng Web UI on EMR cluster

2016-09-13 Thread Jonathan Kelly
I would not recommend opening port 50070 on your cluster, as that would
give the entire world access to your data on HDFS. Instead, you should
follow the instructions found here to create a secure tunnel to the
cluster, through which you can proxy requests to the UIs using a browser
plugin like FoxyProxy:
https://docs.aws.amazon.com/ElasticMapReduce/latest/ManagementGuide/emr-ssh-tunnel.html

~ Jonathan

On Mon, Sep 12, 2016 at 10:40 PM Mohammad Tariq  wrote:

> Hi Divya,
>
> Do you you have inbounds enabled on port 50070 of your NN machine. Also,
> it's a good idea to have the public DNS in your /etc/hosts for proper name
> resolution.
>
>
> Tariq, Mohammad
> about.me/mti
>
> On Tue, Sep 13, 2016 at 9:28 AM, Divya Gehlot 
> wrote:
>
>> Hi,
>> I am on EMR 4.7 with Spark 1.6.1   and Hadoop 2.7.2
>> When I am trying to view Any of the web UI of the cluster either hadoop
>> or Spark ,I am getting below error
>> "
>> This site can’t be reached
>>
>> "
>> Has anybody using EMR and able to view WebUI .
>> Could you please share the steps.
>>
>> Would really appreciate the help.
>>
>> Thanks,
>> Divya
>>
>
>


Ways to check Spark submit running

2016-09-13 Thread Divya Gehlot
Hi,

Somehow, for the time being, I am unable to view the Spark Web UI and Hadoop Web UI.
I am looking for other ways to check that my job is running fine, apart from
repeatedly checking the current YARN logs.


Thanks,
Divya