Re: Pasting into spark-shell doesn't work for Databricks example

2016-11-22 Thread Denis Bolshakov
Hello Zhu,

Thank you very much for such detailed explanation and providing workaround,
it works fine.

But since the problem is related to a Scala issue, can we expect the fix in
Spark 2.0? Or is it not a good idea to update such an important dependency as
Scala in a minor maintenance release?

Kind regards,
Denis

On 22 November 2016 at 22:13, Shixiong(Ryan) Zhu 
wrote:

> The workaround is defining the imports and class together using ":paste".
>
> On Tue, Nov 22, 2016 at 11:12 AM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> This relates to a known issue: https://issues.apache.org/jira/browse/SPARK-14146
>> and https://issues.scala-lang.org/browse/SI-9799
>>
>> On Tue, Nov 22, 2016 at 6:37 AM, dbolshak 
>> wrote:
>>
>>> Hello,
>>>
>>> We have the same issue,
>>>
>>> We use latest release 2.0.2.
>>>
>>> Setup with 1.6.1 works fine.
>>>
>>> Could somebody provide a workaround how to fix that?
>>>
>>> Kind regards,
>>> Denis
>>>
>>>
>>>
>>> --
>>> View this message in context: http://apache-spark-user-list.
>>> 1001560.n3.nabble.com/Pasting-into-spark-shell-doesn-t-work-
>>> for-Databricks-example-tp28113p28116.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>


-- 
//with Best Regards
--Denis Bolshakov
e-mail: bolshakov.de...@gmail.com


Re: Pregel Question

2016-11-22 Thread Saliya Ekanayake
Just realized the attached file has text formatting wrong. The github link
to the file is
https://github.com/esaliya/graphxprimer/blob/master/src/main/scala-2.10/org/saliya/graphxprimer/PregelExample2.scala

On Tue, Nov 22, 2016 at 3:08 PM, Saliya Ekanayake  wrote:

> Hi,
>
> I've created a graph with vertex data of the form (Int, Array[Int]). I am
> using the pregel operator to update values of the array for each vertex.
>
> So my vprog has the following signature. Note the message is a map of
> arrays from neighbors
>
> def vprog(vertexId: VertexId, value: (Int, Array[Int]), message:
> scala.collection.mutable.HashMap[Int, Array[Int]])
>
> The full program is attached here. The expectation is vprog() to update
> the array and then sendMsg() to send the updates to the neighbors.
>
> However, this requires cloning the vertex every time in the vprog()
> function. If I don't clone Spark would send the same array that it got
> after the initial call.
>
> Is there a way to turn off this caching effect?
>
> Thank you,
> Saliya
>
>
> --
> Saliya Ekanayake, Ph.D
> Applied Computer Scientist
> Network Dynamics and Simulation Science Laboratory (NDSSL)
> Virginia Tech, Blacksburg
>
>


-- 
Saliya Ekanayake, Ph.D
Applied Computer Scientist
Network Dynamics and Simulation Science Laboratory (NDSSL)
Virginia Tech, Blacksburg
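
A minimal sketch of the defensive-copy approach described in the question above: vprog
clones the vertex array before merging the incoming messages, so the cached array handed
back by Spark is never mutated in place. The merge rule (element-wise max) and the
assumption that every neighbor array has the same length are illustrative only, not from
the original program.

import org.apache.spark.graphx._
import scala.collection.mutable

def vprog(
    vertexId: VertexId,
    value: (Int, Array[Int]),
    message: mutable.HashMap[Int, Array[Int]]): (Int, Array[Int]) = {
  val (tag, oldArray) = value
  // copy instead of mutating the array Spark cached for this vertex
  val newArray = oldArray.clone()
  for ((_, neighborArray) <- message; i <- newArray.indices) {
    newArray(i) = math.max(newArray(i), neighborArray(i))
  }
  (tag, newArray)
}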


[Spark MLlib]: Does Spark MLlib supports nonlinear optimization with nonlinear constraints.

2016-11-22 Thread himanshu.gpt
Hi,

Component: Spark MLlib
Level: Beginner
Scenario: Does Spark MLlib support nonlinear optimization with nonlinear 
constraints?

Our business application supports two types of functional forms, convex and 
S-shaped curves, along with linear and non-linear constraints. These constraints 
can be combined with any one type of functional form at a time.

Example of convex curve - [image attachment, not included in the archive]

Example of S-shaped curve - [image attachment, not included in the archive]

Example of non-linear constraints -

Min Bound (50%) < [non-linear expression, image not included] < Max Bound (150%)

Example of linear constraints -

Min Bound (50%) < [linear expression, image not included] < Max Bound (150%)


At present we are using SAS to solve these business problems. We are looking 
for a SAS replacement that can solve similar kinds of problems with performance 
equivalent to SAS.

Also, please share any benchmarks of its performance: how does it perform as the 
number of variables keeps increasing?




Thanks and regards,
Himanshu Gupta
Accenture Interactive (AI)
Gurgaon, India
Mobile: +91 9910070743
E Mail: himanshu@accenture.com




This message is for the designated recipient only and may contain privileged, 
proprietary, or otherwise confidential information. If you have received it in 
error, please notify the sender immediately and delete the original. Any other 
use of the e-mail by you is prohibited. Where allowed by local law, electronic 
communications with Accenture and its affiliates, including e-mail and instant 
messaging (including content), may be scanned by our systems for the purposes 
of information security and assessment of internal compliance with Accenture 
policy.
__

www.accenture.com


Re: Best practice for preprocessing feature with DataFrame

2016-11-22 Thread Yan Facai
Thanks, White.

On Thu, Nov 17, 2016 at 11:15 PM, Stuart White 
wrote:

> Sorry.  Small typo.  That last part should be:
>
> val modifiedRows = rows
>   .select(
> substring('age, 0, 2) as "age",
> when('gender === 1, "male").otherwise(when('gender === 2,
> "female").otherwise("unknown")) as "gender"
>   )
> modifiedRows.show
>
> +---+---+
> |age| gender|
> +---+---+
> | 90|   male|
> | 80| female|
> | 80|unknown|
> +---+---+
>
> On Thu, Nov 17, 2016 at 8:57 AM, Stuart White 
> wrote:
> > import org.apache.spark.sql.functions._
> >
> > val rows = Seq(("90s", 1), ("80s", 2), ("80s", 3)).toDF("age", "gender")
> > rows.show
> >
> > +---+--+
> > |age|gender|
> > +---+--+
> > |90s| 1|
> > |80s| 2|
> > |80s| 3|
> > +---+--+
> >
> > val modifiedRows
> >   .select(
> > substring('age, 0, 2) as "age",
> > when('gender === 1, "male").otherwise(when('gender === 2,
> > "female").otherwise("unknown")) as "gender"
> >   )
> > modifiedRows.show
> >
> > +---+---+
> > |age| gender|
> > +---+---+
> > | 90|   male|
> > | 80| female|
> > | 80|unknown|
> > +---+---+
> >
> > On Thu, Nov 17, 2016 at 3:37 AM, 颜发才(Yan Facai) 
> wrote:
> >> Could you give me an example, how to use Column function?
> >> Thanks very much.
> >>
> >> On Thu, Nov 17, 2016 at 12:23 PM, Divya Gehlot  >
> >> wrote:
> >>>
> >>> Hi,
> >>>
> >>> You can use the Column functions provided by Spark API
> >>>
> >>>
> >>> https://spark.apache.org/docs/1.6.2/api/java/org/apache/
> spark/sql/functions.html
> >>>
> >>> Hope this helps .
> >>>
> >>> Thanks,
> >>> Divya
> >>>
> >>>
> >>> On 17 November 2016 at 12:08, 颜发才(Yan Facai)  wrote:
> 
>  Hi,
>  I have a sample, like:
>  +---+--++
>  |age|gender| city_id|
>  +---+--++
>  |   | 1|1042015:city_2044...|
>  |90s| 2|1042015:city_2035...|
>  |80s| 2|1042015:city_2061...|
>  +---+--++
> 
>  and expectation is:
>  "age":  90s -> 90, 80s -> 80
>  "gender": 1 -> "male", 2 -> "female"
> 
>  I have two solutions:
>  1. Handle each column separately,  and then join all by index.
>  val age = input.select("age").map(...)
>  val gender = input.select("gender").map(...)
>  val result = ...
> 
>  2. Write utf function for each column, and then use in together:
>   val result = input.select(ageUDF($"age"), genderUDF($"gender"))
> 
>  However, both are awkward,
> 
>  Does anyone have a better work flow?
>  Write some custom Transforms and use pipeline?
> 
>  Thanks.
> 
> 
> 
> >>>
> >>
>
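
Since the original question also asked about writing custom Transformers for a
pipeline, here is a hedged sketch of that approach, assuming the Spark 2.x ML API;
the class name is made up and the cleanup logic simply mirrors the select
expressions shown above.

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.{col, substring, when}
import org.apache.spark.sql.types.{StringType, StructType}

// Hypothetical Transformer wrapping the age/gender cleanup so it can be chained
// in an ML Pipeline together with other stages.
class DemographicsCleaner(override val uid: String) extends Transformer {
  def this() = this(Identifiable.randomUID("demographicsCleaner"))

  override def transform(dataset: Dataset[_]): DataFrame = {
    dataset
      .withColumn("age", substring(col("age"), 0, 2))
      .withColumn("gender",
        when(col("gender") === 1, "male")
          .when(col("gender") === 2, "female")
          .otherwise("unknown"))
  }

  override def transformSchema(schema: StructType): StructType =
    // gender becomes a string after the mapping; the other fields are unchanged
    StructType(schema.map {
      case f if f.name == "gender" => f.copy(dataType = StringType)
      case f => f
    })

  override def copy(extra: ParamMap): DemographicsCleaner = defaultCopy(extra)
}

It could then be used directly with new DemographicsCleaner().transform(rows), or
placed in a Pipeline's stage array alongside feature transformers.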


Re: Re: Re: Multiple streaming aggregations in structured streaming

2016-11-22 Thread Reynold Xin
It's just the "approx_count_distinct" aggregate function.


On Tue, Nov 22, 2016 at 6:51 PM, Xinyu Zhang  wrote:

> Could you please tell me how to use the approximate count distinct? Is
> there any docs?
>
> Thanks
>
>
> At 2016-11-21 15:56:21, "Reynold Xin"  wrote:
>
> Can you use the approximate count distinct?
>
>
> On Sun, Nov 20, 2016 at 11:51 PM, Xinyu Zhang  wrote:
>
>>
>> MapWithState is also very useful.
>> I want to calculate UV in real time, but "distinct count" and "multiple
>> streaming aggregations" are not supported.
>> Is there any method to calculate real-time UV in the current version?
>>
>>
>>
>> At 2016-11-19 06:01:45, "Michael Armbrust" 
>> wrote:
>>
>> Doing this generally is pretty hard.  We will likely support algebraic
>> aggregate eventually, but this is not currently slotted for 2.2.  Instead I
>> think we will add something like mapWithState that lets users compute
>> arbitrary stateful things.  What is your use case?
>>
>>
>> On Wed, Nov 16, 2016 at 6:58 PM, wszxyh  wrote:
>>
>>> Hi
>>>
>>> Multiple streaming aggregations are not yet supported. When will it be
>>> supported? Is it in the plan?
>>>
>>> Thanks
>>>
>>>
>>>
>>>
>>
>>
>>
>>
>>
>
>
>
>
>
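
For reference, a small sketch of the approx_count_distinct aggregate Reynold mentions,
used for an approximate per-window UV count. The input name ("events") and its
"timestamp"/"userId" columns are assumptions for illustration, not from the thread.

import org.apache.spark.sql.functions.{col, expr, window}

// approximate distinct users per 1-minute window on a (streaming) DataFrame
val uv = events
  .groupBy(window(col("timestamp"), "1 minute"))
  .agg(expr("approx_count_distinct(userId)").as("uv"))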


Re:Re: Re: Multiple streaming aggregations in structured streaming

2016-11-22 Thread Xinyu Zhang
Could you please tell me how to use the approximate count distinct? Is there 
any docs?


Thanks



At 2016-11-21 15:56:21, "Reynold Xin"  wrote:

Can you use the approximate count distinct?




On Sun, Nov 20, 2016 at 11:51 PM, Xinyu Zhang  wrote:



MapWithState is also very useful. 
I want to calculate UV in real time, but "distinct count" and "multiple 
streaming aggregations" are not supported.
Is there any method to calculate real-time UV in the current version?




At 2016-11-19 06:01:45, "Michael Armbrust"  wrote:

Doing this generally is pretty hard.  We will likely support algebraic 
aggregate eventually, but this is not currently slotted for 2.2.  Instead I 
think we will add something like mapWithState that lets users compute arbitrary 
stateful things.  What is your use case?




On Wed, Nov 16, 2016 at 6:58 PM, wszxyh  wrote:

Hi


Multiple streaming aggregations are not yet supported. When will it be 
supported? Is it in the plan?


Thanks


Re: RDD Partitions on HDFS file in Hive on Spark Query

2016-11-22 Thread yeshwanth kumar
Hi Ayan,

Thanks for the explanation,
I am aware of compression codecs.

How is the locality level set?
Is it done by Spark or YARN?

Please let me know,



Thanks,
Yesh




On Nov 22, 2016 5:13 PM, "ayan guha"  wrote:

Hi

RACK_LOCAL = Task running on the same rack but not on the same node where
data is
NODE_LOCAL = task and data is co-located. Probably you were looking for
this one?

GZIP - the read goes through the GZIP codec, but because it is non-splittable you
can have at most one task reading a gzip file. Now, the content of the gzip file
may be spread across multiple nodes. Example: a GZIP file of, say, 1 GB with a
block size of 256 MB (i.e. 4 blocks). Assume not all 4 blocks are on the same
data node.

When you start reading the gzip file, one task will be assigned. It will read
local blocks if available, and it will read remote blocks (streaming read).
While reading the stream, the gzip codec will uncompress the data.

This really is not a Spark thing, but a Hadoop input format discussion...

HTH?

On Wed, Nov 23, 2016 at 10:00 AM, yeshwanth kumar 
wrote:

> Hi Ayan,
>
> we have  default rack topology.
>
>
>
> -Yeshwanth
> Can you Imagine what I would do if I could do all I can - Art of War
>
> On Tue, Nov 22, 2016 at 6:37 AM, ayan guha  wrote:
>
>> Because snappy is not splittable, so single task makes sense.
>>
>> Are sure about rack topology? Ie 225 is in a different rack than 227 or
>> 228? What does your topology file says?
>> On 22 Nov 2016 10:14, "yeshwanth kumar"  wrote:
>>
>>> Thanks for your reply,
>>>
>>> i can definitely change the underlying compression format.
>>> but i am trying to understand the Locality Level,
>>> why executor ran on a different node, where the blocks are not present,
>>> when Locality Level is RACK_LOCAL
>>>
>>> can you shed some light on this.
>>>
>>>
>>> Thanks,
>>> Yesh
>>>
>>>
>>> -Yeshwanth
>>> Can you Imagine what I would do if I could do all I can - Art of War
>>>
>>> On Mon, Nov 21, 2016 at 4:59 PM, Jörn Franke 
>>> wrote:
>>>
 Use as a format orc, parquet or avro because they support any
 compression type with parallel processing. Alternatively split your file in
 several smaller ones. Another alternative would be bzip2 (but slower in
 general) or Lzo (usually it is not included by default in many
 distributions).

 On 21 Nov 2016, at 23:17, yeshwanth kumar 
 wrote:

 Hi,

 we are running Hive on Spark, we have an external table over snappy
 compressed csv file of size 917.4 M
 HDFS block size is set to 256 MB

 as per my Understanding, if i run a query over that external table , it
 should launch 4 tasks. one for each block.
 but i am seeing one executor and one task processing all the file.

 trying to understand the reason behind,

 i went one step further to understand the block locality
 when i get the block locations for that file, i found

 [DatanodeInfoWithStorage[10.11.0.226:50010,DS-bf39d33d-48e1-
 4a8f-be48-b0953fdaad37,DISK],
  DatanodeInfoWithStorage[10.11.0.227:50010,DS-a760c1c8-ce0c-
 4eb8-8183-8d8ff5f24115,DISK],
  DatanodeInfoWithStorage[10.11.0.228:50010,DS-0e5427e2-b030-
 43f8-91c9-d8517e68414a,DISK]]

 DatanodeInfoWithStorage[10.11.0.226:50010,DS-f50ddf2f-b827-4
 845-b043-8b91ae4017c0,DISK],
 DatanodeInfoWithStorage[10.11.0.228:50010,DS-e8c9785f-c352-4
 89b-8209-4307f3296211,DISK],
 DatanodeInfoWithStorage[10.11.0.225:50010,DS-6f6a3ffd-334b-4
 5fd-ae0f-cc6eb268b0d2,DISK]]

 DatanodeInfoWithStorage[10.11.0.226:50010,DS-f8bea6a8-a433-4
 601-8070-f6c5da840e09,DISK],
 DatanodeInfoWithStorage[10.11.0.227:50010,DS-8aa3f249-790e-4
 94d-87ee-bcfff2182a96,DISK],
 DatanodeInfoWithStorage[10.11.0.228:50010,DS-d06714f4-2fbb-4
 8d3-b858-a023b5c44e9c,DISK]

 DatanodeInfoWithStorage[10.11.0.226:50010,DS-b3a00781-c6bd-4
 98c-a487-5ce6aaa66f48,DISK],
 DatanodeInfoWithStorage[10.11.0.228:50010,DS-fa5aa339-e266-4
 e20-a360-e7cdad5dacc3,DISK],
 DatanodeInfoWithStorage[10.11.0.225:50010,DS-9d597d3f-cd4f-4
 c8f-8a13-7be37ce769c9,DISK]]

 and in the spark UI i see the Locality Level is  RACK_LOCAL. for that
 task

 if it is RACK_LOCAL then it should run either in node 10.11.0.226 or
 10.11.0.228, because these 2 nodes has all the four blocks needed for
 computation
 but the executor is running in 10.11.0.225

 my theory is not applying anywhere.

 please help me in understanding how spark/yarn calculates number of
 executors/tasks.

 Thanks,
 -Yeshwanth


>>>
>


-- 
Best Regards,
Ayan Guha
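
Following Jörn's suggestion of a splittable format, a one-off conversion of the snappy
CSV into Parquet could look roughly like the sketch below, assuming Spark 2.x and
made-up HDFS paths; the Hive external table would then be pointed at the Parquet
directory instead of the single compressed file.

// read the snappy-compressed CSV (codec is picked from the file extension)
val csv = spark.read
  .option("header", "true")
  .csv("hdfs:///data/input/file.csv.snappy")

// rewrite it as Parquet, which is splittable and columnar
csv.write
  .mode("overwrite")
  .parquet("hdfs:///data/input_parquet/")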


Re: How do I persist the data after I process the data with Structured streaming...

2016-11-22 Thread shyla deshpande
Has anyone written a custom sink to persist data to Cassandra from
structured streaming.

Please provide me any link or reference. Thanks

On Tue, Nov 22, 2016 at 2:40 PM, Michael Armbrust 
wrote:

> Forgot the link: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach
>
> On Tue, Nov 22, 2016 at 2:40 PM, Michael Armbrust 
> wrote:
>
>> We are looking to add a native JDBC sink in Spark 2.2.  Until then you
>> can write your own connector using df.writeStream.foreach.
>>
>> On Tue, Nov 22, 2016 at 12:55 PM, shyla deshpande <
>> deshpandesh...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Structured streaming works great with Kafka source but I need to persist
>>> the data after processing in some database like Cassandra or at least
>>> Postgres.
>>>
>>> Any suggestions, help please.
>>>
>>> Thanks
>>>
>>
>>
>
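
A minimal sketch of the foreach sink Michael refers to, for writing each micro-batch
row out to an external store such as Cassandra. The connection handling and the actual
write call are placeholders rather than a specific connector's API, and streamingDF
stands in for the result of the structured streaming query.

import org.apache.spark.sql.{ForeachWriter, Row}

val externalStoreWriter = new ForeachWriter[Row] {
  override def open(partitionId: Long, version: Long): Boolean = {
    // open a connection / session for this partition; return false to skip it
    true
  }
  override def process(row: Row): Unit = {
    // write the row using your driver of choice (placeholder, not a real API)
    // e.g. session.execute(buildInsertFor(row))
  }
  override def close(errorOrNull: Throwable): Unit = {
    // release the connection / session here
  }
}

val query = streamingDF.writeStream
  .foreach(externalStoreWriter)
  .outputMode("append")
  .start()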


Re: getting error on spark streaming : java.lang.OutOfMemoryError: unable to create new native thread

2016-11-22 Thread Shixiong(Ryan) Zhu
Possibly https://issues.apache.org/jira/browse/SPARK-17396

On Tue, Nov 22, 2016 at 1:42 PM, Mohit Durgapal 
wrote:

> Hi Everyone,
>
>
> I am getting the following error while running a spark streaming example
> on my local machine; the data being ingested is only 506 KB.
>
>
> *16/11/23 03:05:54 INFO MappedDStream: Slicing from 1479850537180 ms to
> 1479850537235 ms (aligned to 1479850537180 ms and 1479850537235 ms)*
>
> *Exception in thread "streaming-job-executor-0"
> java.lang.OutOfMemoryError: unable to create new native thread*
>
>
> I looked it up and found out that it could be related to ulimit, I even
> increased the ulimit to 1 but still the same error.
>
>
> Regards
>
> Mohit
>


Re: RDD Partitions on HDFS file in Hive on Spark Query

2016-11-22 Thread ayan guha
Hi

RACK_LOCAL = Task running on the same rack but not on the same node where
data is
NODE_LOCAL = task and data is co-located. Probably you were looking for
this one?

GZIP - the read goes through the GZIP codec, but because it is non-splittable you
can have at most one task reading a gzip file. Now, the content of the gzip file
may be spread across multiple nodes. Example: a GZIP file of, say, 1 GB with a
block size of 256 MB (i.e. 4 blocks). Assume not all 4 blocks are on the same
data node.

When you start reading the gzip file, one task will be assigned. It will read
local blocks if available, and it will read remote blocks (streaming read).
While reading the stream, the gzip codec will uncompress the data.

This really is not a Spark thing, but a Hadoop input format discussion...

HTH?

On Wed, Nov 23, 2016 at 10:00 AM, yeshwanth kumar 
wrote:

> Hi Ayan,
>
> we have  default rack topology.
>
>
>
> -Yeshwanth
> Can you Imagine what I would do if I could do all I can - Art of War
>
> On Tue, Nov 22, 2016 at 6:37 AM, ayan guha  wrote:
>
>> Because snappy is not splittable, so single task makes sense.
>>
>> Are sure about rack topology? Ie 225 is in a different rack than 227 or
>> 228? What does your topology file says?
>> On 22 Nov 2016 10:14, "yeshwanth kumar"  wrote:
>>
>>> Thanks for your reply,
>>>
>>> i can definitely change the underlying compression format.
>>> but i am trying to understand the Locality Level,
>>> why executor ran on a different node, where the blocks are not present,
>>> when Locality Level is RACK_LOCAL
>>>
>>> can you shed some light on this.
>>>
>>>
>>> Thanks,
>>> Yesh
>>>
>>>
>>> -Yeshwanth
>>> Can you Imagine what I would do if I could do all I can - Art of War
>>>
>>> On Mon, Nov 21, 2016 at 4:59 PM, Jörn Franke 
>>> wrote:
>>>
 Use as a format orc, parquet or avro because they support any
 compression type with parallel processing. Alternatively split your file in
 several smaller ones. Another alternative would be bzip2 (but slower in
 general) or Lzo (usually it is not included by default in many
 distributions).

 On 21 Nov 2016, at 23:17, yeshwanth kumar 
 wrote:

 Hi,

 we are running Hive on Spark, we have an external table over snappy
 compressed csv file of size 917.4 M
 HDFS block size is set to 256 MB

 as per my Understanding, if i run a query over that external table , it
 should launch 4 tasks. one for each block.
 but i am seeing one executor and one task processing all the file.

 trying to understand the reason behind,

 i went one step further to understand the block locality
 when i get the block locations for that file, i found

 [DatanodeInfoWithStorage[10.11.0.226:50010,DS-bf39d33d-48e1-
 4a8f-be48-b0953fdaad37,DISK],
  DatanodeInfoWithStorage[10.11.0.227:50010,DS-a760c1c8-ce0c-
 4eb8-8183-8d8ff5f24115,DISK],
  DatanodeInfoWithStorage[10.11.0.228:50010,DS-0e5427e2-b030-
 43f8-91c9-d8517e68414a,DISK]]

 DatanodeInfoWithStorage[10.11.0.226:50010,DS-f50ddf2f-b827-4
 845-b043-8b91ae4017c0,DISK],
 DatanodeInfoWithStorage[10.11.0.228:50010,DS-e8c9785f-c352-4
 89b-8209-4307f3296211,DISK],
 DatanodeInfoWithStorage[10.11.0.225:50010,DS-6f6a3ffd-334b-4
 5fd-ae0f-cc6eb268b0d2,DISK]]

 DatanodeInfoWithStorage[10.11.0.226:50010,DS-f8bea6a8-a433-4
 601-8070-f6c5da840e09,DISK],
 DatanodeInfoWithStorage[10.11.0.227:50010,DS-8aa3f249-790e-4
 94d-87ee-bcfff2182a96,DISK],
 DatanodeInfoWithStorage[10.11.0.228:50010,DS-d06714f4-2fbb-4
 8d3-b858-a023b5c44e9c,DISK]

 DatanodeInfoWithStorage[10.11.0.226:50010,DS-b3a00781-c6bd-4
 98c-a487-5ce6aaa66f48,DISK],
 DatanodeInfoWithStorage[10.11.0.228:50010,DS-fa5aa339-e266-4
 e20-a360-e7cdad5dacc3,DISK],
 DatanodeInfoWithStorage[10.11.0.225:50010,DS-9d597d3f-cd4f-4
 c8f-8a13-7be37ce769c9,DISK]]

 and in the spark UI i see the Locality Level is  RACK_LOCAL. for that
 task

 if it is RACK_LOCAL then it should run either in node 10.11.0.226 or
 10.11.0.228, because these 2 nodes has all the four blocks needed for
 computation
 but the executor is running in 10.11.0.225

 my theory is not applying anywhere.

 please help me in understanding how spark/yarn calculates number of
 executors/tasks.

 Thanks,
 -Yeshwanth


>>>
>


-- 
Best Regards,
Ayan Guha


Re: RDD Partitions on HDFS file in Hive on Spark Query

2016-11-22 Thread yeshwanth kumar
Hi Ayan,

we have  default rack topology.



-Yeshwanth
Can you Imagine what I would do if I could do all I can - Art of War

On Tue, Nov 22, 2016 at 6:37 AM, ayan guha  wrote:

> Because snappy is not splittable, so single task makes sense.
>
> Are sure about rack topology? Ie 225 is in a different rack than 227 or
> 228? What does your topology file says?
> On 22 Nov 2016 10:14, "yeshwanth kumar"  wrote:
>
>> Thanks for your reply,
>>
>> i can definitely change the underlying compression format.
>> but i am trying to understand the Locality Level,
>> why executor ran on a different node, where the blocks are not present,
>> when Locality Level is RACK_LOCAL
>>
>> can you shed some light on this.
>>
>>
>> Thanks,
>> Yesh
>>
>>
>> -Yeshwanth
>> Can you Imagine what I would do if I could do all I can - Art of War
>>
>> On Mon, Nov 21, 2016 at 4:59 PM, Jörn Franke 
>> wrote:
>>
>>> Use as a format orc, parquet or avro because they support any
>>> compression type with parallel processing. Alternatively split your file in
>>> several smaller ones. Another alternative would be bzip2 (but slower in
>>> general) or Lzo (usually it is not included by default in many
>>> distributions).
>>>
>>> On 21 Nov 2016, at 23:17, yeshwanth kumar  wrote:
>>>
>>> Hi,
>>>
>>> we are running Hive on Spark, we have an external table over snappy
>>> compressed csv file of size 917.4 M
>>> HDFS block size is set to 256 MB
>>>
>>> as per my Understanding, if i run a query over that external table , it
>>> should launch 4 tasks. one for each block.
>>> but i am seeing one executor and one task processing all the file.
>>>
>>> trying to understand the reason behind,
>>>
>>> i went one step further to understand the block locality
>>> when i get the block locations for that file, i found
>>>
>>> [DatanodeInfoWithStorage[10.11.0.226:50010,DS-bf39d33d-48e1-
>>> 4a8f-be48-b0953fdaad37,DISK],
>>>  DatanodeInfoWithStorage[10.11.0.227:50010,DS-a760c1c8-ce0c-
>>> 4eb8-8183-8d8ff5f24115,DISK],
>>>  DatanodeInfoWithStorage[10.11.0.228:50010,DS-0e5427e2-b030-
>>> 43f8-91c9-d8517e68414a,DISK]]
>>>
>>> DatanodeInfoWithStorage[10.11.0.226:50010,DS-f50ddf2f-b827-4
>>> 845-b043-8b91ae4017c0,DISK],
>>> DatanodeInfoWithStorage[10.11.0.228:50010,DS-e8c9785f-c352-4
>>> 89b-8209-4307f3296211,DISK],
>>> DatanodeInfoWithStorage[10.11.0.225:50010,DS-6f6a3ffd-334b-4
>>> 5fd-ae0f-cc6eb268b0d2,DISK]]
>>>
>>> DatanodeInfoWithStorage[10.11.0.226:50010,DS-f8bea6a8-a433-4
>>> 601-8070-f6c5da840e09,DISK],
>>> DatanodeInfoWithStorage[10.11.0.227:50010,DS-8aa3f249-790e-4
>>> 94d-87ee-bcfff2182a96,DISK],
>>> DatanodeInfoWithStorage[10.11.0.228:50010,DS-d06714f4-2fbb-4
>>> 8d3-b858-a023b5c44e9c,DISK]
>>>
>>> DatanodeInfoWithStorage[10.11.0.226:50010,DS-b3a00781-c6bd-4
>>> 98c-a487-5ce6aaa66f48,DISK],
>>> DatanodeInfoWithStorage[10.11.0.228:50010,DS-fa5aa339-e266-4
>>> e20-a360-e7cdad5dacc3,DISK],
>>> DatanodeInfoWithStorage[10.11.0.225:50010,DS-9d597d3f-cd4f-4
>>> c8f-8a13-7be37ce769c9,DISK]]
>>>
>>> and in the spark UI i see the Locality Level is  RACK_LOCAL. for that
>>> task
>>>
>>> if it is RACK_LOCAL then it should run either in node 10.11.0.226 or
>>> 10.11.0.228, because these 2 nodes has all the four blocks needed for
>>> computation
>>> but the executor is running in 10.11.0.225
>>>
>>> my theory is not applying anywhere.
>>>
>>> please help me in understanding how spark/yarn calculates number of
>>> executors/tasks.
>>>
>>> Thanks,
>>> -Yeshwanth
>>>
>>>
>>


Re: Any equivalent method lateral and explore

2016-11-22 Thread Michael Armbrust
Both collect_list and explode are available in the function library.

The following is an example of using it:
df.select($"*", explode($"myArray") as 'arrayItem)

On Tue, Nov 22, 2016 at 2:42 PM, Mahender Sarangam <
mahender.bigd...@outlook.com> wrote:

> Hi,
>
> We are converting our Hive logic which uses lateral view and explode
> functions. Is there any built-in function in Scala for performing lateral
> view explode?
>
>
> Below is our  query in Hive. temparray is temp table with c0 and c1 columns
>
> SELECT id, CONCAT_WS(',', collect_list(LineID)) as LineiD
> FROM (SELECT cast(LineID as STRING) as LineiD, cast(id as STRING) as id
> FROM temparray LATERAL VIEW explode(`_c1`) adTable AS id) T
> GROUP BY id;
>
>
> Can anyone provide a pointer to the string functions available in Scala? We
> would like to perform operations like collect_list and getting the starting
> index of a matching string.
>
>
>
> Nanu
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
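
Putting the two functions together, a rough DataFrame equivalent of the Hive query
above might look like the sketch below, assuming `temparray` is a DataFrame whose
`_c0` column holds the LineID and whose `_c1` column is an array of ids.

import org.apache.spark.sql.functions.{col, collect_list, concat_ws, explode}

// LATERAL VIEW explode(`_c1`) -> one row per (LineID, id) pair
val exploded = temparray
  .select(col("_c0").cast("string").as("LineID"),
          explode(col("_c1")).as("id"))

// GROUP BY id with CONCAT_WS(',', collect_list(LineID))
val result = exploded
  .groupBy("id")
  .agg(concat_ws(",", collect_list(col("LineID"))).as("LineID"))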


Any equivalent method lateral and explore

2016-11-22 Thread Mahender Sarangam
Hi,

We are converting our Hive logic which uses lateral view and explode 
functions. Is there any built-in function in Scala for performing lateral 
view explode?


Below is our  query in Hive. temparray is temp table with c0 and c1 columns

SELECT id, CONCAT_WS(',', collect_list(LineID)) as LineiD
FROM (SELECT cast(LineID as STRING) as LineiD, cast(id as STRING) as id
FROM temparray LATERAL VIEW explode(`_c1`) adTable AS id) T
GROUP BY id;


Can anyone provide a pointer to the string functions available in Scala? We 
would like to perform operations like collect_list and getting the starting 
index of a matching string.



Nanu


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How do I flatten JSON blobs into a Data Frame using Spark/Spark SQL

2016-11-22 Thread Michael Armbrust
The first release candidate should be coming out this week. You can
subscribe to the dev list if you want to follow the release schedule.

On Mon, Nov 21, 2016 at 9:34 PM, kant kodali  wrote:

> Hi Michael,
>
> I only see spark 2.0.2 which is what I am using currently. Any idea on
> when 2.1 will be released?
>
> Thanks,
> kant
>
> On Mon, Nov 21, 2016 at 5:12 PM, Michael Armbrust 
> wrote:
>
>> In Spark 2.1 we've added a from_json function that I think will do what you
>> want.
>>
>> On Fri, Nov 18, 2016 at 2:29 AM, kant kodali  wrote:
>>
>>> This seem to work
>>>
>>> import org.apache.spark.sql._
>>> val rdd = df2.rdd.map { case Row(j: String) => j }
>>> spark.read.json(rdd).show()
>>>
>>> However, I wonder if there is any inefficiency here, since I have to apply
>>> this function for billions of rows.
>>>
>>>
>>
>
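
A short sketch of the from_json route Michael mentions (Spark 2.1+). The input column
name ("value") and the schema are assumptions for illustration; df2 is the
single-string-column DataFrame from the thread.

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{LongType, StringType, StructType}

// assumed shape of the JSON blobs
val schema = new StructType()
  .add("id", LongType)
  .add("name", StringType)

val parsed = df2
  .select(from_json(col("value"), schema).as("data"))  // "value" is an assumed column name
  .select("data.*")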


Re: How do I persist the data after I process the data with Structured streaming...

2016-11-22 Thread Michael Armbrust
We are looking to add a native JDBC sink in Spark 2.2.  Until then you can
write your own connector using df.writeStream.foreach.

On Tue, Nov 22, 2016 at 12:55 PM, shyla deshpande 
wrote:

> Hi,
>
> Structured streaming works great with Kafka source but I need to persist
> the data after processing in some database like Cassandra or at least
> Postgres.
>
> Any suggestions, help please.
>
> Thanks
>


Re: How do I persist the data after I process the data with Structured streaming...

2016-11-22 Thread Michael Armbrust
Forgot the link:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach

On Tue, Nov 22, 2016 at 2:40 PM, Michael Armbrust 
wrote:

> We are looking to add a native JDBC sink in Spark 2.2.  Until then you can
> write your own connector using df.writeStream.foreach.
>
> On Tue, Nov 22, 2016 at 12:55 PM, shyla deshpande <
> deshpandesh...@gmail.com> wrote:
>
>> Hi,
>>
>> Structured streaming works great with Kafka source but I need to persist
>> the data after processing in some database like Cassandra or at least
>> Postgres.
>>
>> Any suggestions, help please.
>>
>> Thanks
>>
>
>


getting error on spark streaming : java.lang.OutOfMemoryError: unable to create new native thread

2016-11-22 Thread Mohit Durgapal
Hi Everyone,


I am getting the following error while running a Spark Streaming example on
my local machine; the data being ingested is only 506 KB.


*16/11/23 03:05:54 INFO MappedDStream: Slicing from 1479850537180 ms to
1479850537235 ms (aligned to 1479850537180 ms and 1479850537235 ms)*

*Exception in thread "streaming-job-executor-0" java.lang.OutOfMemoryError:
unable to create new native thread*


I looked it up and found out that it could be related to ulimit, I even
increased the ulimit to 1 but still the same error.


Regards

Mohit


How do I persist the data after I process the data with Structured streaming...

2016-11-22 Thread shyla deshpande
Hi,

Structured streaming works great with Kafka source but I need to persist
the data after processing in some database like Cassandra or at least
Postgres.

Any suggestions, help please.

Thanks


Re: How to write a custom file system?

2016-11-22 Thread Steve Loughran

On 21 Nov 2016, at 17:26, Samy Dindane 
> wrote:

Hi,

I'd like to extend the file:// file system and add some custom logic to the API 
that lists files.
I think I need to extend FileSystem or LocalFileSystem from 
org.apache.hadoop.fs, but I am not sure how to go about it exactly.


subclass it, then declare it as a

spark.hadoop.fs.samy.impl=SamyFSClass

to use urls like samy://home/files/data/*


You can also rebind file:// to point to your new fs, by overriding fs.file.impl
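
A small sketch of what that registration could look like from the Spark side, assuming
Spark 2.x; "com.example.SamyFileSystem" stands in for the hypothetical SamyFSClass above.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("custom-fs-example")
  // map the samy:// scheme to the custom FileSystem implementation
  .config("spark.hadoop.fs.samy.impl", "com.example.SamyFileSystem")
  .getOrCreate()

// paths using the new scheme are then resolved through the custom class
val lines = spark.read.text("samy://home/files/data/*")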

There's a fairly formal definition of what a filesystem is meant to do

https://hadoop.apache.org/docs/stable2/hadoop-project-dist/hadoop-common/filesystem/filesystem.html

and lots of contract tests on each of the operations; you can find them all in 
the hadoop-common tests source tree... If you are thinking of doing anything 
non-trivial with filesystems, get these tests working before you start changing 
things. But know that, as these tests don't generate load or concurrent 
requests, they aren't sufficient to say that stuff works; they only identify 
when it is broken at a basic level.


GlusterFS comes from Red Hat; they've got a connector which works with Hadoop & 
Spark code. Have you used it?


How to write a custom file system and make it usable by Spark?

Thank you,

Samy

-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org





Fault-tolerant Accumulators in stateful operators.

2016-11-22 Thread Amit Sela
Hi all,

To recover (functionally) Accumulators from Driver failure in a streaming
application, we wrap them in a "getOrCreate" singleton as shown here.
I was wondering how that works if I use the Accumulators inside a
mapWithState/updateStateByKey operator ? Has anyone used Accumulators in
one of the stateful operators ?

Thanks,
Amit
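
For context, a minimal version of that "getOrCreate" singleton wrapper, modelled on the
pattern in the Spark Streaming guide and using the Spark 2.x LongAccumulator; the object
name is made up. Whether updates made from inside mapWithState/updateStateByKey are
counted exactly once is a separate question this sketch does not answer.

import org.apache.spark.SparkContext
import org.apache.spark.util.LongAccumulator

object DroppedRecordsCounter {
  @volatile private var instance: LongAccumulator = null

  // lazily (re)create the accumulator, e.g. after the driver restarts from a checkpoint
  def getInstance(sc: SparkContext): LongAccumulator = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.longAccumulator("DroppedRecordsCounter")
        }
      }
    }
    instance
  }
}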


Pregel Question

2016-11-22 Thread Saliya Ekanayake
Hi,

I've created a graph with vertex data of the form (Int, Array[Int]). I am
using the pregel operator to update values of the array for each vertex.

So my vprog has the following signature. Note the message is a map of
arrays from neighbors

def vprog(vertexId: VertexId, value: (Int, Array[Int]), message:
scala.collection.mutable.HashMap[Int, Array[Int]])

The full program is attached here. The expectation is vprog() to update the
array and then sendMsg() to send the updates to the neighbors.

However, this requires cloning the vertex every time in the vprog()
function. If I don't clone Spark would send the same array that it got
after the initial call.

Is there a way to turn off this caching effect?

Thank you,
Saliya


-- 
Saliya Ekanayake, Ph.D
Applied Computer Scientist
Network Dynamics and Simulation Science Laboratory (NDSSL)
Virginia Tech, Blacksburg


PregelExample2.rtf
Description: RTF file

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: Pasting into spark-shell doesn't work for Databricks example

2016-11-22 Thread Shixiong(Ryan) Zhu
The workaround is defining the imports and class together using ":paste".
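
Concretely, the workaround looks roughly like this in the shell; the class is the same
GeometricMean UDAF from the Databricks example quoted later in this thread, and the
banner lines are what the REPL prints around the pasted block:

scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

class GeometricMean extends UserDefinedAggregateFunction {
  override def inputSchema: StructType =
    StructType(StructField("value", DoubleType) :: Nil)
  override def bufferSchema: StructType = StructType(
    StructField("count", LongType) :: StructField("product", DoubleType) :: Nil)
  override def dataType: DataType = DoubleType
  override def deterministic: Boolean = true
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 1.0
  }
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Long](0) + 1
    buffer(1) = buffer.getAs[Double](1) * input.getAs[Double](0)
  }
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Long](0) + buffer2.getAs[Long](0)
    buffer1(1) = buffer1.getAs[Double](1) * buffer2.getAs[Double](1)
  }
  override def evaluate(buffer: Row): Any =
    math.pow(buffer.getDouble(1), 1.toDouble / buffer.getLong(0))
}

// press Ctrl-D
// Exiting paste mode, now interpreting.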

On Tue, Nov 22, 2016 at 11:12 AM, Shixiong(Ryan) Zhu <
shixi...@databricks.com> wrote:

> This relates to a known issue: https://issues.apache.org/jira/browse/SPARK-14146
> and https://issues.scala-lang.org/browse/SI-9799
>
> On Tue, Nov 22, 2016 at 6:37 AM, dbolshak 
> wrote:
>
>> Hello,
>>
>> We have the same issue,
>>
>> We use latest release 2.0.2.
>>
>> Setup with 1.6.1 works fine.
>>
>> Could somebody provide a workaround how to fix that?
>>
>> Kind regards,
>> Denis
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Pasting-into-spark-shell-doesn-t-work-
>> for-Databricks-example-tp28113p28116.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: Pasting into spark-shell doesn't work for Databricks example

2016-11-22 Thread Shixiong(Ryan) Zhu
This relates to a known issue:
https://issues.apache.org/jira/browse/SPARK-14146 and
https://issues.scala-lang.org/browse/SI-9799

On Tue, Nov 22, 2016 at 6:37 AM, dbolshak  wrote:

> Hello,
>
> We have the same issue,
>
> We use latest release 2.0.2.
>
> Setup with 1.6.1 works fine.
>
> Could somebody provide a workaround how to fix that?
>
> Kind regards,
> Denis
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Pasting-into-spark-shell-doesn-t-work-
> for-Databricks-example-tp28113p28116.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


parallelizing model training ..

2016-11-22 Thread debasishg
Hello -

I have a question on parallelization of model training in Spark ..

Suppose I have this code fragment for training a model with KMeans ..

labeledData.foreachRDD { rdd =>
  val normalizedData: RDD[Vector] = normalize(rdd)
  val trainedModel: KMeansModel = trainModel(normalizedData, noOfClusters)
  //.. compute WCSSE
}

Here labeledData is a DStream that I fetched from Kafka.

Is there any way I can use the above fragment to train multiple models
in parallel with different values of noOfClusters? e.g.

(1 to 100).foreach { i =>
  labeledData.foreachRDD { rdd =>
val normalizedData: RDD[Vector] = normalize(rdd)
val trainedModel: KMeansModel = trainModel(normalizedData, i)
//.. compute WCSSE
  }
}

which will use all available CPUs in parallel for the training.

regards.
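
One possible sketch of that idea (not from the post): submit the per-k training jobs
from a parallel collection so they can run concurrently, and cache the normalized input
so it is not recomputed for every k. It reuses the post's normalize/trainModel helpers
and assumes the cluster has enough capacity for concurrent jobs.

labeledData.foreachRDD { rdd =>
  val normalizedData = normalize(rdd).cache()   // reuse the same input for every k
  val models = (2 to 100).par.map { k =>
    k -> trainModel(normalizedData, k)          // each call submits its own Spark jobs
  }.seq.toMap
  normalizedData.unpersist()
  // ... compute WCSSE per k from models ...
}

With concurrent jobs it may also be worth looking at the FAIR scheduler
(spark.scheduler.mode=FAIR) so that long-running fits don't starve the others.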



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/parallelizing-model-training-tp28118.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: two spark-shells spark on mesos not working

2016-11-22 Thread Michael Gummelt
What are the full driver logs?  If you enable DEBUG logging, it should give
you more information about the rejected offers.  This can also happen if
offers are being accepted, but tasks immediately die for some reason.  You
should check the Mesos UI for failed tasks.  If they exist, please include
those logs here as well.

On Tue, Nov 22, 2016 at 4:52 AM, John Yost  wrote:

> Hi Everyone,
>
> There is probably an obvious answer to this, but not sure what it is. :)
>
> I am attempting to launch 2..n spark shells using Mesos as the master
> (this is to support 1..n researchers running pyspark stuff on our data). I
> can launch two or more spark shells without any problem. But, when I
> attempt any kind of operation that requires a Spark executor outside the
> driver program such as:
>
> val numbers = Range(1, 1000)
> val pNumbers = sc.parallelize(numbers)
> pNumbers.take(5)
>
> I get the dreaded message:
> TaskSchedulerImpl: Initial job has not accepted any resources; check your
> cluster UI to ensure that workers are registered and sufficient resources
>
> I confirmed that both spark shells are listed as separate, uniquely-named
> Mesos frameworks and that there are plenty of CPU core and memory resources
> on our cluster.
>
> I am using Spark 2.0.1 on Mesos 0.28.1. Any ideas that y'all may have
> would be very much appreciated.
>
> Thanks! :)
>
> --John
>
>


-- 
Michael Gummelt
Software Engineer
Mesosphere


Re: Cluster deploy mode driver location

2016-11-22 Thread Masood Krohy
You may also try distributing your JARS along with your Spark app; see 
options below. You put on the client node whatever that is necessary and 
submit them all in each run. There is also a --files option which you can 
remove below, but may be helpful for some configs.

You do not need to specify all the arguments; the default values are 
picked up when not explicitly given.

spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 2 \
--driver-memory 4g \
--executor-memory 8g \
--files /usr/hdp/current/spark-client/conf/hive-site.xml \
--jars /usr/hdp/current/spark-client/lib/datanucleus-api-jdo-3.2.6.jar,/usr/hdp/current/spark-client/lib/datanucleus-rdbms-3.2.9.jar,/usr/hdp/current/spark-client/lib/datanucleus-core-3.2.10.jar \
--class "SparkApp" \
/pathToAppOnTheClientNode/SparkApp.jar (if any, arguments passed to the Spark App here)

Masood


--
Masood Krohy, Ph.D. 
Data Scientist, Intact Lab-R 
Intact Financial Corporation 
http://ca.linkedin.com/in/masoodkh 



From: Silvio Fiorito
To: "saif.a.ell...@wellsfargo.com", "user@spark.apache.org"
Date: 2016-11-22 08:02
Subject: Re: Cluster deploy mode driver location



Hi Saif!

Unfortunately, I don't think this is possible for YARN driver-cluster 
mode. Regarding the JARs you're referring to, can you place them on HDFS 
so you can then have them in a central location and refer to them that way 
for dependencies?

http://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management

Thanks,
Silvio

From: saif.a.ell...@wellsfargo.com 
Sent: Monday, November 21, 2016 2:04:06 PM
To: user@spark.apache.org
Subject: Cluster deploy mode driver location 
 
Hello there,
 
I have a Spark program in 1.6.1, however, when I submit it to cluster, it 
randomly picks the driver.
 
I know there is a driver specification option, but along with it it is 
mandatory to define many other options I am not familiar with. The trouble 
is, the .jars I am launching need to be available at the driver host, and 
I would like to have this jars in just a specific host, which I like it to 
be the driver.
 
Any help?
 
Thanks!
Saif
 



how does create dataframe from scala collection handle executor failure?

2016-11-22 Thread Mendelson, Assaf
Let's say I have a loop that reads some data from somewhere, stores it in a 
collection and creates a dataframe from it. Then an executor containing part of 
the dataframe dies. How does spark handle it?

For example:
val dfSeq = for {
  i <- 0 to 1000
  v = 0 to 100   // stand-in for the collection read in each iteration
} yield sc.parallelize(v).toDF

Then I would do something with the dataframes (e.g. union them and do some 
calculation).

What would happen if an executor, holding one of the partitions for one of the 
dataframes crashes?
Does this mean I would lose the data? Or would spark save the original data to 
recreate it? If it saves the original data, where would it save it (the whole 
data could be very large, larger than driver memory).

If it loses the data, is there a way to give it a function or something to 
recreate it (e.g. V is read from somewhere and I can reread it if I just know 
what to read).

Thanks,
Assaf.



Re: find outliers within data

2016-11-22 Thread Yong Zhang
Spark Dataframe window functions?


https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html







From: anup ahire 
Sent: Tuesday, November 22, 2016 11:00 AM
To: user@spark.apache.org
Subject: find outliers within data


I have a large data set with millions of records which is something like

Movie  Likes  Comments  Shares  Views
A      100    10        20      30
A      102    11        22      35
A      104    12        25      45
A      *103*  13        *24*    50
B      200    10        20      30
B      205    *9*       21      35
B      *203*  12        29      42
B      210    13        *23*    *39*


Likes, comments, etc. are rolling totals and they are supposed to increase. If 
there is a drop in any of these for a movie, then it is bad data and needs to be 
identified.

I have initial thoughts about groupby movie and then sort within the group. I 
am using dataframes in spark 1.6 for processing and it does not seem to be 
achievable as there is no sorting within the grouped data in dataframe.

Buidling something for outlier detection can be another approach but because of 
time constraint I have not explored it yet.

Is there any way I can achieve this?

Thanks !!
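
A hedged sketch of the window-function idea: compare each rolling total with the
previous row for the same movie and flag rows where any of them drops. It assumes an
ordering column ("ts") that is not in the sample, and in Spark 1.6 window functions
need a HiveContext-backed DataFrame.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag}

// previous snapshot for the same movie, ordered by an assumed "ts" column
val w = Window.partitionBy("Movie").orderBy("ts")

val flagged = df
  .withColumn("prevLikes", lag(col("Likes"), 1).over(w))
  .withColumn("prevComments", lag(col("Comments"), 1).over(w))
  .withColumn("prevShares", lag(col("Shares"), 1).over(w))
  .withColumn("prevViews", lag(col("Views"), 1).over(w))
  .withColumn("badRow",
    col("Likes") < col("prevLikes") ||
    col("Comments") < col("prevComments") ||
    col("Shares") < col("prevShares") ||
    col("Views") < col("prevViews"))

flagged.filter(col("badRow")).show()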


Re: Is there a processing speed difference between DataFrames and Datasets?

2016-11-22 Thread Sean Owen
DataFrames are a narrower, more specific type of abstraction, for tabular
data. Where your data is tabular, it makes more sense to use, especially
because this knowledge means a lot more can be optimized under the hood for
you, whereas the framework can do nothing with an RDD of arbitrary objects.
DataFrames are not somehow a "better RDD".

Datasets are more like the new RDDs, supporting more general objects and
programmatic access. Still a different thing for a different purpose from
DataFrames. But has an API more similar to DataFrames and some of the same
types of benefits for simple types via Encoders.
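
A tiny illustration of that distinction, assuming Spark 2.x, an existing SparkSession
called `spark`, and a made-up people.json file:

import org.apache.spark.sql.functions.col
import spark.implicits._

case class Person(name: String, age: Long)

val df = spark.read.json("people.json")   // DataFrame = Dataset[Row], untyped rows
val ds = df.as[Person]                    // Dataset[Person], typed via an implicit Encoder

df.filter(col("age") > 21)                // Column expression, optimized by Catalyst
ds.filter(_.age > 21)                     // typed lambda, checked at compile time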

On Tue, Nov 22, 2016 at 2:50 PM jggg777  wrote:

> I've seen a number of visuals showing the processing time benefits of using
> Datasets+DataFrames over RDDs, but I'd assume that there are performance
> benefits to using a defined case class instead a generic Dataset[Row].  The
> tale of three Spark APIs post mentions "If you want higher degree of
> type-safety at compile time, want typed JVM objects, *take advantage of
> Catalyst optimization, and benefit from Tungsten’s efficient code
> generation, use Dataset.*"
>
> Are there any comparisons showing the performance differences between
> Datasets and DataFrames?  Or more information about how Catalyst/Tungsten
> handle them differently?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-processing-speed-difference-between-DataFrames-and-Datasets-tp28117.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Is there a processing speed difference between DataFrames and Datasets?

2016-11-22 Thread jggg777
I've seen a number of visuals showing the processing time benefits of using
Datasets+DataFrames over RDDs, but I'd assume that there are performance
benefits to using a defined case class instead of a generic Dataset[Row].  The
tale of three Spark APIs post mentions "If you want higher degree of
type-safety at compile time, want typed JVM objects, *take advantage of
Catalyst optimization, and benefit from Tungsten’s efficient code
generation, use Dataset.*"

Are there any comparisons showing the performance differences between
Datasets and DataFrames?  Or more information about how Catalyst/Tungsten
handle them differently?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-processing-speed-difference-between-DataFrames-and-Datasets-tp28117.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Pasting into spark-shell doesn't work for Databricks example

2016-11-22 Thread dbolshak
Hello,

We have the same issue,

We use latest release 2.0.2.

Setup with 1.6.1 works fine.

Could somebody provide a workaround how to fix that?

Kind regards,
Denis



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Pasting-into-spark-shell-doesn-t-work-for-Databricks-example-tp28113p28116.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Pasting into spark-shell doesn't work for Databricks example

2016-11-22 Thread Denis Bolshakov
Hello,

We have the same issue,

We use latest release 2.0.2.

Setup with 1.6.1 works fine.

Could somebody provide a workaround how to fix that?

Kind regards,
Denis

On 21 November 2016 at 20:23, jggg777  wrote:

> I'm simply pasting in the UDAF example from this page and getting errors
> (basic EMR setup with Spark 2.0):
> https://docs.cloud.databricks.com/docs/latest/databricks_
> guide/index.html#04%20SQL,%20DataFrames%20%26%20Datasets/
> 03%20UDF%20and%20UDAF%20-%20scala.html
>
> The imports appear to work, but then I see errors like "not found: type
> UserDefinedAggregateFunction".
>
> If I run ":paste" and paste it in that way it does work, but I'm interested
> in knowing why Ctrl-V doesn't.  What is happening under the hood which
> makes
> it seem like the imports are working even though they aren't?  And is there
> a way to fix this in general?
>
> >>>
> scala> import org.apache.spark.sql.expressions.MutableAggregationBuffer
> import org.apache.spark.sql.expressions.MutableAggregationBuffer
>
> scala> import org.apache.spark.sql.expressions.
> UserDefinedAggregateFunction
> import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
>
> scala> import org.apache.spark.sql.Row
> import org.apache.spark.sql.Row
>
> scala> import org.apache.spark.sql.types._
> import org.apache.spark.sql.types._
>
> scala>
>
> scala> class GeometricMean extends UserDefinedAggregateFunction {
>  |   // This is the input fields for your aggregate function.
>  |   override def inputSchema: org.apache.spark.sql.types.StructType =
>  | StructType(StructField("value", DoubleType) :: Nil)
>  |
>  |   // This is the internal fields you keep for computing your
> aggregate.
>  |   override def bufferSchema: StructType = StructType(
>  | StructField("count", LongType) ::
>  | StructField("product", DoubleType) :: Nil
>  |   )
>  |
>  |   // This is the output type of your aggregatation function.
>  |   override def dataType: DataType = DoubleType
>  |
>  |   override def deterministic: Boolean = true
>  |
>  |   // This is the initial value for your buffer schema.
>  |   override def initialize(buffer: MutableAggregationBuffer): Unit =
> {
>  | buffer(0) = 0L
>  | buffer(1) = 1.0
>  |   }
>  |
>  |   // This is how to update your buffer schema given an input.
>  |   override def update(buffer: MutableAggregationBuffer, input: Row):
> Unit = {
>  | buffer(0) = buffer.getAs[Long](0) + 1
>  | buffer(1) = buffer.getAs[Double](1) * input.getAs[Double](0)
>  |   }
>  |
>  |   // This is how to merge two objects with the bufferSchema type.
>  |   override def merge(buffer1: MutableAggregationBuffer, buffer2:
> Row): Unit = {
>  | buffer1(0) = buffer1.getAs[Long](0) + buffer2.getAs[Long](0)
>  | buffer1(1) = buffer1.getAs[Double](1) * buffer2.getAs[Double](1)
>  |   }
>  |
>  |   // This is where you output the final value, given the final value
> of your bufferSchema.
>  |   override def evaluate(buffer: Row): Any = {
>  | math.pow(buffer.getDouble(1), 1.toDouble / buffer.getLong(0))
>  |   }
>  | }
> :11: error: not found: type UserDefinedAggregateFunction
>class GeometricMean extends UserDefinedAggregateFunction {
>^
> :14: error: not found: value StructType
>StructType(StructField("value", DoubleType) :: Nil)
>^
> :14: error: not found: value StructField
>StructType(StructField("value", DoubleType) :: Nil)
>   ^
> :14: error: not found: value DoubleType
>StructType(StructField("value", DoubleType) :: Nil)
>^
> :17: error: not found: type StructType
>  override def bufferSchema: StructType = StructType(
> ^
> :17: error: not found: value StructType
>  override def bufferSchema: StructType = StructType(
>  ^
> :18: error: not found: value StructField
>StructField("count", LongType) ::
>^
> :18: error: not found: value LongType
>StructField("count", LongType) ::
> ^
> :19: error: not found: value StructField
>StructField("product", DoubleType) :: Nil
>^
> :19: error: not found: value DoubleType
>StructField("product", DoubleType) :: Nil
>   ^
> :23: error: not found: type DataType
>  override def dataType: DataType = DoubleType
> ^
> :23: error: not found: value DoubleType
>  override def dataType: DataType = DoubleType
>^
> :28: error: not found: type MutableAggregationBuffer
>  override def initialize(buffer: 

[Spark Streaming] map and window operation on DStream only process one batch

2016-11-22 Thread Hao Ren
Spark Streaming v 1.6.2
Kafka v0.10.1

I am reading msgs from Kafka.
What surprised me is the following DStream only process the first batch.

KafkaUtils.createDirectStream[
  String,
  String,
  StringDecoder,
  StringDecoder](streamingContext, kafkaParams, Set(topic))
  .map(_._2)
  .window(Seconds(windowLengthInSec))

Some logs as below are endlessly repeated:

16/11/22 14:20:40 INFO MappedDStream: Slicing from 1479820835000 ms to
147982084 ms (aligned to 1479820835000 ms and 147982084 ms)
16/11/22 14:20:40 INFO JobScheduler: Added jobs for time 147982084 ms

And the action on the DStream is just a rdd count

windowedStream foreachRDD { rdd => rdd.count }

From the webUI, only the first batch is in status: Processing, the others
are all Queued.
However, if I permute map and window operation, everything is ok.

KafkaUtils.createDirectStream[
  String,
  String,
  StringDecoder,
  StringDecoder](streamingContext, kafkaParams, Set(topic))
  .window(Seconds(windowLengthInSec))
  .map(_._2)

I think the two are equivalent. But they are not.

Furthermore, if I replace my KafkaDStream with a QueueStream, it works for
no matter which order of map and window operation.

I am not sure whether this is related with KafkaDStream or just DStream.

Any help is appreciated.

-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


Re: Cluster deploy mode driver location

2016-11-22 Thread Silvio Fiorito
Hi Saif!


Unfortunately, I don't think this is possible for YARN driver-cluster mode. 
Regarding the JARs you're referring to, can you place them on HDFS so you can 
then have them in a central location and refer to them that way for 
dependencies?


http://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management


Thanks,

Silvio


From: saif.a.ell...@wellsfargo.com 
Sent: Monday, November 21, 2016 2:04:06 PM
To: user@spark.apache.org
Subject: Cluster deploy mode driver location

Hello there,

I have a Spark program in 1.6.1, however, when I submit it to cluster, it 
randomly picks the driver.

I know there is a driver specification option, but along with it it is 
mandatory to define many other options I am not familiar with. The trouble is, 
the .jars I am launching need to be available at the driver host, and I would 
like to have this jars in just a specific host, which I like it to be the 
driver.

Any help?

Thanks!
Saif



two spark-shells spark on mesos not working

2016-11-22 Thread John Yost
Hi Everyone,

There is probably an obvious answer to this, but not sure what it is. :)

I am attempting to launch 2..n spark shells using Mesos as the master (this
is to support 1..n researchers running pyspark stuff on our data). I can
launch two or more spark shells without any problem. But, when I attempt
any kind of operation that requires a Spark executor outside the driver
program such as:

val numbers = Range(1, 1000)
val pNumbers = sc.parallelize(numbers)
pNumbers.take(5)

I get the dreaded message:
TaskSchedulerImpl: Initial job has not accepted any resources; check your
cluster UI to ensure that workers are registered and sufficient resources

I confirmed that both spark shells are listed as separate, uniquely-named
Mesos frameworks and that there are plenty of CPU core and memory resources
on our cluster.

I am using Spark 2.0.1 on Mesos 0.28.1. Any ideas that y'all may have would
be very much appreciated.

Thanks! :)

--John


Re: RDD Partitions on HDFS file in Hive on Spark Query

2016-11-22 Thread ayan guha
Because snappy is not splittable, so single task makes sense.

Are sure about rack topology? Ie 225 is in a different rack than 227 or
228? What does your topology file says?
On 22 Nov 2016 10:14, "yeshwanth kumar"  wrote:

> Thanks for your reply,
>
> i can definitely change the underlying compression format.
> but i am trying to understand the Locality Level,
> why executor ran on a different node, where the blocks are not present,
> when Locality Level is RACK_LOCAL
>
> can you shed some light on this.
>
>
> Thanks,
> Yesh
>
>
> -Yeshwanth
> Can you Imagine what I would do if I could do all I can - Art of War
>
> On Mon, Nov 21, 2016 at 4:59 PM, Jörn Franke  wrote:
>
>> Use as a format orc, parquet or avro because they support any compression
>> type with parallel processing. Alternatively split your file in several
>> smaller ones. Another alternative would be bzip2 (but slower in general) or
>> Lzo (usually it is not included by default in many distributions).
>>
>> On 21 Nov 2016, at 23:17, yeshwanth kumar  wrote:
>>
>> Hi,
>>
>> we are running Hive on Spark, we have an external table over snappy
>> compressed csv file of size 917.4 M
>> HDFS block size is set to 256 MB
>>
>> as per my Understanding, if i run a query over that external table , it
>> should launch 4 tasks. one for each block.
>> but i am seeing one executor and one task processing all the file.
>>
>> trying to understand the reason behind,
>>
>> i went one step further to understand the block locality
>> when i get the block locations for that file, i found
>>
>> [DatanodeInfoWithStorage[10.11.0.226:50010,DS-bf39d33d-48e1-
>> 4a8f-be48-b0953fdaad37,DISK],
>>  DatanodeInfoWithStorage[10.11.0.227:50010,DS-a760c1c8-ce0c-
>> 4eb8-8183-8d8ff5f24115,DISK],
>>  DatanodeInfoWithStorage[10.11.0.228:50010,DS-0e5427e2-b030-
>> 43f8-91c9-d8517e68414a,DISK]]
>>
>> DatanodeInfoWithStorage[10.11.0.226:50010,DS-f50ddf2f-b827-4
>> 845-b043-8b91ae4017c0,DISK],
>> DatanodeInfoWithStorage[10.11.0.228:50010,DS-e8c9785f-c352-4
>> 89b-8209-4307f3296211,DISK],
>> DatanodeInfoWithStorage[10.11.0.225:50010,DS-6f6a3ffd-334b-4
>> 5fd-ae0f-cc6eb268b0d2,DISK]]
>>
>> DatanodeInfoWithStorage[10.11.0.226:50010,DS-f8bea6a8-a433-4
>> 601-8070-f6c5da840e09,DISK],
>> DatanodeInfoWithStorage[10.11.0.227:50010,DS-8aa3f249-790e-4
>> 94d-87ee-bcfff2182a96,DISK],
>> DatanodeInfoWithStorage[10.11.0.228:50010,DS-d06714f4-2fbb-4
>> 8d3-b858-a023b5c44e9c,DISK]
>>
>> DatanodeInfoWithStorage[10.11.0.226:50010,DS-b3a00781-c6bd-4
>> 98c-a487-5ce6aaa66f48,DISK],
>> DatanodeInfoWithStorage[10.11.0.228:50010,DS-fa5aa339-e266-4
>> e20-a360-e7cdad5dacc3,DISK],
>> DatanodeInfoWithStorage[10.11.0.225:50010,DS-9d597d3f-cd4f-4
>> c8f-8a13-7be37ce769c9,DISK]]
>>
>> and in the spark UI i see the Locality Level is  RACK_LOCAL. for that task
>>
>> if it is RACK_LOCAL then it should run either in node 10.11.0.226 or
>> 10.11.0.228, because these 2 nodes has all the four blocks needed for
>> computation
>> but the executor is running in 10.11.0.225
>>
>> my theory is not applying anywhere.
>>
>> please help me in understanding how spark/yarn calculates number of
>> executors/tasks.
>>
>> Thanks,
>> -Yeshwanth
>>
>>
>


Re: Kafka direct approach,App UI shows wrong input rate

2016-11-22 Thread Julian Keppel
Oh, sorry. I made a mistake... It's spark version 2.0.1, not 2.0.2.

When I wrote the initial message I built my app with 2.0.2 and deployed it
on a cluster with 2.0.1. So I thought this could be the problem. But now I
changed it and build my app with 2.0.1 but the problem still remains.

2016-11-19 18:06 GMT+01:00 Cody Koeninger :

> There have definitely been issues with UI reporting for the direct
> stream in the past, but I'm not able to reproduce this with 2.0.2 and
> 0.8.  See below:
>
> https://i.imgsafe.org/086019ae57.png
>
>
>
> On Fri, Nov 18, 2016 at 4:38 AM, Julian Keppel
>  wrote:
> > Hello,
> >
> > I use Spark 2.0.2 with Kafka integration 0-8. The Kafka version is
> 0.10.0.1
> > (Scala 2.11). I read data from Kafka with the direct approach. The
> complete
> > infrastructure runs on Google Container Engine.
> >
> > I wonder why the corresponding application UI says the input rate is zero
> > records per second. This is definitely wrong. I checked it while I
> printed
> > out the incoming records to the driver console. All other metrics seem
> to be
> > correct (at least they are realistic).
> >
> > What is going on here? Do you have any idea? Thanks for you help.
> >
> > Julian
>


Re: newbie question about RDD

2016-11-22 Thread Mohit Durgapal
Hi Raghav,

Please refer to the following code:

SparkConf sparkConf = new
SparkConf().setMaster("local[2]").setAppName("PersonApp");

// creating the Java Spark context
JavaSparkContext sc = new JavaSparkContext(sparkConf);

// reading the file from HDFS into a Spark RDD; the name node is localhost
JavaRDD<String> personStringRDD =
sc.textFile("hdfs://localhost:9000/custom/inputPersonFile.txt");

// converting the String RDD to a Person RDD; this is just an example,
// you can replace the parsing with better exception-handled code
JavaRDD<Person> personObjectRDD = personStringRDD.map(personRow -> {
    String[] personValues = personRow.split("\t");

    return new Person(Long.parseLong(personValues[0]),
            personValues[1], personValues[2],
            personValues[3]);
});

// finally, just printing the count of objects
System.out.println("Person count = " + personObjectRDD.count());


Regards
Mohit


On Tue, Nov 22, 2016 at 11:17 AM, Raghav  wrote:

> Sorry I forgot to ask how can I use spark context here ? I have hdfs
> directory path of the files, as well as the name node of hdfs cluster.
>
> Thanks for your help.
>
> On Mon, Nov 21, 2016 at 9:45 PM, Raghav  wrote:
>
>> Hi
>>
>> I am extremely new to Spark. I have to read a file form HDFS, and get it
>> in memory  in RDD format.
>>
>> I have a Java class as follows:
>>
>> class Person {
>> private long UUID;
>> private String FirstName;
>> private String LastName;
>> private String zip;
>>
>>// public methods
>> }
>>
>> The file in HDFS is as follows:
>>
> >> UUID   FirstName  LastName  Zip
> >> 7462   John       Doll      06903
> >> 5231   Brad       Finley    32820
>>
>>
> >> Can someone point me how to get a JavaRDD<Person> object by reading the
> >> file in HDFS?
>>
>> Thanks.
>>
>> --
>> Raghav
>>
>
>
>
> --
> Raghav
>