Re: How to export a project to a JAR in Scala IDE for eclipse Correctly?

2016-07-26 Thread Sachin Mittal
Why don't you install sbt and try sbt assembly to create a Scala jar?
You can then use this jar in your spark-submit jobs.

In case there are additional dependencies, these can be passed via the --jars
(comma-separated jar paths) option to spark-submit.
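For reference, a minimal sketch of that workflow, assuming sbt 0.13 with the
sbt-assembly plugin (the plugin version, project name and paths below are only
examples; the Main-Class is taken from the manifest quoted in this thread):

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

// build.sbt
name := "producer"
version := "0.1.0"
scalaVersion := "2.10.6"
// Spark is already on the cluster, so mark it "provided"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1" % "provided"
// this also puts a Main-Class entry into the assembled jar's MANIFEST.MF
mainClass in assembly := Some("org.abc.spark.streaming.Producer")

Then `sbt assembly` builds a single jar (e.g. target/scala-2.10/producer-assembly-0.1.0.jar)
that can be passed to spark-submit, or run with java -jar for the non-Spark Producer class.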



On Wed, Jul 27, 2016 at 11:53 AM,  wrote:

> Hi there:
>  I exported a project into a jar like this: "right click my project -> choose
> export -> Java -> JAR file -> next -> choose "src/main/resources" and
> "src/main/scala" -> click browse and choose a jar file export location ->
> choose overwrite it", and this jar is unable to run with "java -jar
> myjar.jar". It says "no main manifest attribute, in /opt/MyJar.jar". It
> seems the file MANIFEST.MF lost the main class info when I exported the jar.
> I opened the MANIFEST.MF in my exported jar, and found only the info below:
> Manifest-Version: 1.0
>
>
>
> However I checked the MANIFEST.MF in my project, it is like this:
> Manifest-Version: 1.0
> Main-Class: org.abc.spark.streaming.Producer
> Class-Path: lib/spark-core_2.10-1.6.1.jar ...
>
> In my Producer class I didn't use Spark, so I use java -jar to run it.
>
> I also tried to export my project as a runnable JAR, but my class cannot
> be run as a Java application; the Run button cannot be clicked.
>
> So could anyone lend a hand?
>
> 
>
> Thanks&Best regards!
> San.Luo
>


Setting spark.sql.shuffle.partitions Dynamically

2016-07-26 Thread Brandon White
Hello,

My platform runs hundreds of Spark jobs every day, each with its own
data size, from 20 MB to 20 TB. This means that we need to set resources
dynamically. One major pain point for doing this is
spark.sql.shuffle.partitions, the number of partitions to use when
shuffling data for joins or aggregations. It is arbitrarily hard
coded to a default of 200. The only way to set this config is in the spark-submit
command or in the SparkConf before the executors are created.
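For reference, a minimal sketch of the two mechanisms described above (the value
2000 is only an example, not a recommendation):

spark-submit --conf spark.sql.shuffle.partitions=2000 ...

// or in code, before the context/executors are created
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf()
  .setAppName("myJob")                          // hypothetical app name
  .set("spark.sql.shuffle.partitions", "2000")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)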

This creates a lot of problems when I want to set this config dynamically
based on the in memory size of a dataframe. I only know the in memory size
of the dataframe halfway through the spark job. So I would need to stop the
context and recreate it in order to set this config.

Is there any better way to set this? How does  spark.sql.shuffle.partitions
work differently than .repartition?

Brandon


How to export a project to a JAR in Scala IDE for eclipse Correctly?

2016-07-26 Thread luohui20001
Hi there: I exported a project into a jar like this: "right click my
project -> choose export -> Java -> JAR file -> next -> choose "src/main/resources" and
"src/main/scala" -> click browse and choose a jar file export location ->
choose overwrite it". This jar is unable to run with "java -jar
myjar.jar". It says "no main manifest attribute, in /opt/MyJar.jar". It seems
the file MANIFEST.MF lost the main class info when I exported the jar. I opened
the MANIFEST.MF in my exported jar, and found only the info below:

Manifest-Version: 1.0

However, when I checked the MANIFEST.MF in my project, it looks like this:

Manifest-Version: 1.0
Main-Class: org.abc.spark.streaming.Producer
Class-Path: lib/spark-core_2.10-1.6.1.jar ...

In my Producer class I didn't use Spark, so I use java -jar to run it.
I also tried to export my project as a runnable JAR, but my class cannot be
run as a Java application; the Run button cannot be clicked.
So could anyone lend a hand?



 

Thanks&Best regards!
San.Luo


[ANNOUNCE] Announcing Apache Spark 2.0.0

2016-07-26 Thread Reynold Xin
Hi all,

Apache Spark 2.0.0 is the first release of the Spark 2.x line. It includes
2500+ patches from 300+ contributors.

To download Spark 2.0, head over to the download page:
http://spark.apache.org/downloads.html

To view the release notes:
http://spark.apache.org/releases/spark-release-2-0-0.html


(note: it can take a few hours for everything to be propagated, so you
might get 404 on some download links.  If you see any issues with the
release notes or webpage *please contact me directly, off-list*)


Re: Configure Spark to run with MemSQL DB Cluster

2016-07-26 Thread yash datta
Read here:

https://github.com/memsql/memsql-spark-connector

Best Regards
Yash

On Wed, Jul 27, 2016 at 12:54 PM, Subhajit Purkayastha 
wrote:

> All,
>
>
>
> Is it possible to integrate spark 1.6.1 with MemSQL Cluster? Any pointers
> on how to start with the project will be appreciated.
>
>
>
> Thx,
>
>
>
> Subhajit
>



-- 
When events unfold with calm and ease
When the winds that blow are merely breeze
Learn from nature, from birds and bees
Live your life in love, and let joy not cease.


Configure Spark to run with MemSQL DB Cluster

2016-07-26 Thread Subhajit Purkayastha
All,

 

Is it possible to integrate spark 1.6.1 with MemSQL Cluster? Any pointers on
how to start with the project will be appreciated.

 

Thx,

 

Subhajit



Re: How to give name to Spark jobs shown in Spark UI

2016-07-26 Thread rahulkumar-aws
You can set the name in SparkConf(), or if you are using spark-submit, set the
--name flag.

val sparkconf = new SparkConf()
  .setMaster("local[4]")
  .setAppName("saveFileJob")
val sc = new SparkContext(sparkconf)


or with spark-submit:

./bin/spark-submit --name "FileSaveJob" --master local[4] fileSaver.jar




On Mon, Jul 25, 2016 at 9:46 PM, neil90 [via Apache Spark User List] <
ml-node+s1001560n27406...@n3.nabble.com> wrote:

> As far as I know you can give a name to the SparkContext. I recommend
> using a cluster monitoring tool like Ganglia to determine were its slow in
> your spark jobs.
>
>




-
Software Developer
Sigmoid (SigmoidAnalytics), India


Spark Jobs not getting shown in Spark UI browser

2016-07-26 Thread Prashant verma
Hi All,
 I have recently started using Spark 1.6.2 for running my Spark jobs. But
now my jobs are not getting shown in the Spark web UI, even though the
jobs are running fine, which I can see in the shell output.

Any suggestions?

Thanks,
Prashant Verma


Fail a batch in Spark Streaming forcefully based on business rules

2016-07-26 Thread Hemalatha A
Hello,

I have a use case wherein I have to fail certain batches in my streaming
application, based on my application-specific business rules.
Ex: if in a batch of 2 seconds I don't receive 100 messages, I should fail
the batch and move on.

How to achieve this behavior?

-- 


Regards
Hemalatha


Re: Spark Beginner Question

2016-07-26 Thread Holden Karau
So you will need to convert your input DataFrame into something with
vectors and labels to train on - the Spark ML documentation has examples
http://spark.apache.org/docs/latest/ml-guide.html (although the website
seems to be having some issues mid update to Spark 2.0 so if you want to
read it right now
http://spark.apache.org/docs/1.6.2/ml-guide.html#example-pipeline )

As for why some algorithms are available in the RDD API and not the
DataFrame API yet - simply development time. The DataFrame/Pipeline API
will be the actively developed one going forward.

Cheers,

Holden :)

On Tuesday, July 26, 2016, Shi Yu  wrote:

> Hello,
>
> *Question 1: *I am new to Spark. I am trying to train a classification
> model on a Spark DataFrame. I am using PySpark, and I created a
> Spark DataFrame object in df:
>
> from pyspark.sql.types import *
>
> query = """select * from table"""
>
> df = sqlContext.sql(query)
>
> My question is how to extend the code to train models (e.g., a
> classification model) on the object df?  I have checked many online
> resources and haven't seen any approach like the following:
>
> lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
> # Fit the modellrModel = lr.fit(df)
>
> Is this a feasible way to train the model? If yes, where could I find the
> reference code?
>
> *Question 2:  *Why is there no SVM model support in the MLlib DataFrame-based
> API, while the RDD-based API has an SVM model?
>
> Thanks a lot!
>
>
> Best,
>
>
> Shi
>
>
>

-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Spark 2.0 just released

2016-07-26 Thread Chanh Le
It's official now: http://spark.apache.org/releases/spark-release-2-0-0.html

Everyone should check it out.




The Future Of DStream

2016-07-26 Thread Chang Chen
Hi guys

Structured Streaming is coming with Spark 2.0, but I noticed that DStream is
still here.

What's the future of DStream? Will it be deprecated and removed
eventually, or will it co-exist with Structured Streaming forever?

Thanks
Chang


Spark Beginner Question

2016-07-26 Thread Shi Yu
Hello,

*Question 1: *I am new to Spark. I am trying to train a classification model
on a Spark DataFrame. I am using PySpark, and I created a
Spark DataFrame object in df:

from pyspark.sql.types import *

query = """select * from table"""

df = sqlContext.sql(query)

My question is how to extend the code to train models (e.g., a
classification model) on the object df?  I have checked many online
resources and haven't seen any approach like the following:

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# Fit the modellrModel = lr.fit(df)

Is this a feasible way to train the model? If yes, where could I find
the reference code?

*Question 2:  *Why is there no SVM model support in the MLlib DataFrame-based
API, while the RDD-based API has an SVM model?

Thanks a lot!


Best,


Shi


Re: how to use spark.mesos.constraints

2016-07-26 Thread Rodrick Brown
The shuffle service has nothing to do with constraints; it is, however, advised to
run the mesos-shuffle-service on each of your agent nodes running Spark.

Here is the command I use to run a typical Spark job on my cluster using
constraints (this is generated by another script we run, but it should
give you a clear idea).

Jobs not being accepted by any resources could mean that what you're asking for
is way larger than the resources you have available.

/usr/bin/timeout 3600 /opt/spark-1.6.1/bin/spark-submit
--master "mesos://zk://prod-zk-1:2181,prod-zk-2:2181,prod-zk-3:2181/mesos"
--conf spark.ui.port=40046
--conf spark.mesos.coarse=true
--conf spark.sql.broadcastTimeout=3600
--conf spark.cores.max=5
--conf spark.mesos.constraints="rack:spark"
--conf spark.sql.tungsten.enabled=true
--conf spark.shuffle.service.enabled=true
--conf spark.dynamicAllocation.enabled=true
--conf spark.mesos.executor.memoryOverhead=3211
--class
com.orchard.dataloader.library.originators..LoadAccountDetail_LC
--total-executor-cores 5
--driver-memory 5734M
--executor-memory 8028M
--jars /data/orchard/etc/config/load-accountdetail-accumulo-prod.jar
/data/orchard/jars/dataloader-library-assembled.jar 1

Nodes used for my spark jobs are all using the constraint 'rack:spark'

I hope this helps!


On Tue, Jul 26, 2016 at 7:10 PM, Jia Yu  wrote:

> Hi,
>
> I am also trying to use spark.mesos.constraints but it gives me the
> same error: the job has not been accepted by any resources.
>
> I suspect that I should start some additional service like
> ./sbin/start-mesos-shuffle-service.sh. Am I correct?
>
> Thanks,
> Jia
>
> On Tue, Dec 1, 2015 at 5:14 PM, rarediel 
> wrote:
>
>> I am trying to add Mesos constraints to my spark-submit command in my
>> Marathon file. I am setting spark.mesos.coarse=true.
>>
>> Here is an example of a constraint I am trying to set:
>>
>>  --conf spark.mesos.constraint=cpus:2
>>
>> I want to use the constraints to control the number of executors that are
>> created
>> so I can control the total memory of my spark job.
>>
>> I've tried many variations of resource constraints, but no matter which
>> resource or what number, range, etc. I try, I always get the error "Initial
>> job has not accepted any resources; check your cluster UI...".  My cluster
>> has the available resources.  Are there any examples I can look at where
>> people use resource constraints?
>>
>>
>>
>>
>


-- 


*Rodrick Brown */ *DevOPs*

9174456839 / rodr...@orchardplatform.com

Orchard Platform
101 5th Avenue, 4th Floor, New York, NY



Re: Maintaining order of pair rdd

2016-07-26 Thread Kuchekar
Hi Janardhan,

   You could do something like this:

To maintain the insertion order by key, first partition by the key (so
that each key is located in the same partition), and after that you can do
something like this:

rdd.mapValues(x => ArrayBuffer(x)).reduceByKey((x, y) => x ++ y)

The idea is to create an ArrayBuffer (this maintains insertion order).


A more elegant solution would be using zipWithIndex on the pair RDD and then
sorting by the index within each group after groupByKey:

rdd.zipWithIndex ==> this will give you values of the form ((x, y), index)

now map it like this: (x, (y, index))

now group by key and then sort the values within each group by the index.
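Putting the zipWithIndex approach together, a minimal sketch (assuming an
RDD[(String, Int)] of (ID, value) pairs like the one shown in this thread):

// rdd: RDD[(String, Int)] of (ID, value) pairs in their original order
val ordered = rdd
  .zipWithIndex                                 // ((id, value), index)
  .map { case ((id, value), idx) => (id, (value, idx)) }
  .groupByKey()                                 // (id, Iterable[(value, idx)])
  .mapValues(_.toArray.sortBy(_._2).map(_._1))  // sort each group by index, keep the values
// ordered: RDD[(String, Array[Int])] with values in per-key insertion order

The explicit index removes any dependence on the order in which the shuffle
delivers the values.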


Thanks.
Kuchekar, Nilesh

On Tue, Jul 26, 2016 at 7:35 PM, janardhan shetty 
wrote:

> Let me provide step wise details:
>
> 1.
> I have an RDD  = {
> (ID2,18159) - *element 1  *
> (ID1,18159) - *element 2*
> (ID3,18159) - *element 3*
> (ID2,36318) - *element 4 *
> (ID1,36318) - *element 5*
> (ID3,36318)
> (ID2,54477)
> (ID1,54477)
> (ID3,54477)
> }
>
> 2. RDD.groupByKey().mapValues(v => v.toArray())
>
> Array(
> (ID1,Array(*18159*, 308703, 72636, 64544, 39244, 107937, *54477*, 145272,
> 100079, *36318*, 160992, 817, 89366, 150022, 19622, 44683, 58866, 162076,
> 45431, 100136)),
> (ID3,Array(100079, 19622, *18159*, 212064, 107937, 44683, 150022, 39244,
> 100136, 58866, 72636, 145272, 817, 89366, * 54477*, *36318*, 308703,
> 160992, 45431, 162076)),
> (ID2,Array(308703, * 54477*, 89366, 39244, 150022, 72636, 817, 58866,
> 44683, 19622, 160992, 107937, 100079, 100136, 145272, 64544, *18159*,
> 45431, *36318*, 162076))
> )
>
>
> whereas in Step 2 I need as below:
>
> Array(
> (ID1,Array(*18159*,*36318*, *54477,...*)),
> (ID3,Array(*18159*,*36318*, *54477, ...*)),
> (ID2,Array(*18159*,*36318*, *54477, ...*))
> )
>
> Does this help ?
>
> On Tue, Jul 26, 2016 at 2:25 AM, Marco Mistroni 
> wrote:
>
>> Apologies janardhan, i always get confused on this
>> Ok. so you have a  (key, val) RDD (val is irrelevant here)
>>
>> then you can do this
>> val reduced = myRDD.reduceByKey((first, second) => first  ++ second)
>>
>> val sorted = reduced.sortBy(tpl => tpl._1)
>>
>> hth
>>
>>
>>
>> On Tue, Jul 26, 2016 at 3:31 AM, janardhan shetty > > wrote:
>>
>>> groupBy is a shuffle operation and index is already lost in this process
>>> if I am not wrong and don't see *sortWith* operation on RDD.
>>>
>>> Any suggestions or help ?
>>>
>>> On Mon, Jul 25, 2016 at 12:58 AM, Marco Mistroni 
>>> wrote:
>>>
 Hi
  after you do a groupBy you should use a sortWith.
 Basically , a groupBy reduces your structure to (anyone correct me if i
 m wrong) an RDD[(key,val)], which you can see as a tuple. So you could
 use sortWith (or sortBy, cannot remember which one) (tpl=> tpl._1)
 hth

 On Mon, Jul 25, 2016 at 1:21 AM, janardhan shetty <
 janardhan...@gmail.com> wrote:

> Thanks Marco. This solved the order problem. Had another question
> which is prefix to this.
>
> As you can see below ID2,ID1 and ID3 are in order and I need to
> maintain this index order as well. But when we do groupByKey 
> operation(*rdd.distinct.groupByKey().mapValues(v
> => v.toArray*))
> everything is *jumbled*.
> Is there any way we can maintain this order as well ?
>
> scala> RDD.foreach(println)
> (ID2,18159)
> (ID1,18159)
> (ID3,18159)
>
> (ID2,18159)
> (ID1,18159)
> (ID3,18159)
>
> (ID2,36318)
> (ID1,36318)
> (ID3,36318)
>
> (ID2,54477)
> (ID1,54477)
> (ID3,54477)
>
> *Jumbled version : *
> Array(
> (ID1,Array(*18159*, 308703, 72636, 64544, 39244, 107937, *54477*,
> 145272, 100079, *36318*, 160992, 817, 89366, 150022, 19622, 44683,
> 58866, 162076, 45431, 100136)),
> (ID3,Array(100079, 19622, *18159*, 212064, 107937, 44683, 150022,
> 39244, 100136, 58866, 72636, 145272, 817, 89366, * 54477*, *36318*,
> 308703, 160992, 45431, 162076)),
> (ID2,Array(308703, * 54477*, 89366, 39244, 150022, 72636, 817, 58866,
> 44683, 19622, 160992, 107937, 100079, 100136, 145272, 64544, *18159*,
> 45431, *36318*, 162076))
> )
>
> *Expected output:*
> Array(
> (ID1,Array(*18159*,*36318*, *54477,...*)),
> (ID3,Array(*18159*,*36318*, *54477, ...*)),
> (ID2,Array(*18159*,*36318*, *54477, ...*))
> )
>
> As you can see, after the *groupByKey* operation is complete, item 18159 is
> at index 0 for ID1, index 2 for ID3 and index 16 for ID2, whereas the expected
> index is 0
>
>
> On Sun, Jul 24, 2016 at 12:43 PM, Marco Mistroni 
> wrote:
>
>> Hello
>>  Uhm you have an array containing 3 tuples?
>> If all the arrays have same length, you can just zip all of them,
>> creatings a list of tuples
>> then you can scan the list 5 by 5...?
>>
>> so something like
>>
>> (Array(0)._2, Array(1)._2, Array(2)._2).zipped.toList
>>
>> this will give you a list

Re: Maintaining order of pair rdd

2016-07-26 Thread janardhan shetty
Let me provide step wise details:

1.
I have an RDD  = {
(ID2,18159) - *element 1  *
(ID1,18159) - *element 2*
(ID3,18159) - *element 3*
(ID2,36318) - *element 4 *
(ID1,36318) - *element 5*
(ID3,36318)
(ID2,54477)
(ID1,54477)
(ID3,54477)
}

2. RDD.groupByKey().mapValues(v => v.toArray())

Array(
(ID1,Array(*18159*, 308703, 72636, 64544, 39244, 107937, *54477*, 145272,
100079, *36318*, 160992, 817, 89366, 150022, 19622, 44683, 58866, 162076,
45431, 100136)),
(ID3,Array(100079, 19622, *18159*, 212064, 107937, 44683, 150022, 39244,
100136, 58866, 72636, 145272, 817, 89366, * 54477*, *36318*, 308703,
160992, 45431, 162076)),
(ID2,Array(308703, * 54477*, 89366, 39244, 150022, 72636, 817, 58866,
44683, 19622, 160992, 107937, 100079, 100136, 145272, 64544, *18159*,
45431, *36318*, 162076))
)


whereas in Step 2 I need as below:

Array(
(ID1,Array(*18159*,*36318*, *54477,...*)),
(ID3,Array(*18159*,*36318*, *54477, ...*)),
(ID2,Array(*18159*,*36318*, *54477, ...*))
)

Does this help ?

On Tue, Jul 26, 2016 at 2:25 AM, Marco Mistroni  wrote:

> Apologies janardhan, i always get confused on this
> Ok. so you have a  (key, val) RDD (val is irrelevant here)
>
> then you can do this
> val reduced = myRDD.reduceByKey((first, second) => first  ++ second)
>
> val sorted = reduced.sortBy(tpl => tpl._1)
>
> hth
>
>
>
> On Tue, Jul 26, 2016 at 3:31 AM, janardhan shetty 
> wrote:
>
>> groupBy is a shuffle operation and index is already lost in this process
>> if I am not wrong and don't see *sortWith* operation on RDD.
>>
>> Any suggestions or help ?
>>
>> On Mon, Jul 25, 2016 at 12:58 AM, Marco Mistroni 
>> wrote:
>>
>>> Hi
>>>  after you do a groupBy you should use a sortWith.
>>> Basically , a groupBy reduces your structure to (anyone correct me if i
>> m wrong) an RDD[(key,val)], which you can see as a tuple. So you could
>>> use sortWith (or sortBy, cannot remember which one) (tpl=> tpl._1)
>>> hth
>>>
>>> On Mon, Jul 25, 2016 at 1:21 AM, janardhan shetty <
>>> janardhan...@gmail.com> wrote:
>>>
 Thanks Marco. This solved the order problem. Had another question which
 is prefix to this.

 As you can see below ID2,ID1 and ID3 are in order and I need to
 maintain this index order as well. But when we do groupByKey 
 operation(*rdd.distinct.groupByKey().mapValues(v
 => v.toArray*))
 everything is *jumbled*.
 Is there any way we can maintain this order as well ?

 scala> RDD.foreach(println)
 (ID2,18159)
 (ID1,18159)
 (ID3,18159)

 (ID2,18159)
 (ID1,18159)
 (ID3,18159)

 (ID2,36318)
 (ID1,36318)
 (ID3,36318)

 (ID2,54477)
 (ID1,54477)
 (ID3,54477)

 *Jumbled version : *
 Array(
 (ID1,Array(*18159*, 308703, 72636, 64544, 39244, 107937, *54477*,
 145272, 100079, *36318*, 160992, 817, 89366, 150022, 19622, 44683,
 58866, 162076, 45431, 100136)),
 (ID3,Array(100079, 19622, *18159*, 212064, 107937, 44683, 150022,
 39244, 100136, 58866, 72636, 145272, 817, 89366, * 54477*, *36318*,
 308703, 160992, 45431, 162076)),
 (ID2,Array(308703, * 54477*, 89366, 39244, 150022, 72636, 817, 58866,
 44683, 19622, 160992, 107937, 100079, 100136, 145272, 64544, *18159*,
 45431, *36318*, 162076))
 )

 *Expected output:*
 Array(
 (ID1,Array(*18159*,*36318*, *54477,...*)),
 (ID3,Array(*18159*,*36318*, *54477, ...*)),
 (ID2,Array(*18159*,*36318*, *54477, ...*))
 )

 As you can see, after the *groupByKey* operation is complete, item 18159 is
 at index 0 for ID1, index 2 for ID3 and index 16 for ID2, whereas the expected
 index is 0


 On Sun, Jul 24, 2016 at 12:43 PM, Marco Mistroni 
 wrote:

> Hello
>  Uhm you have an array containing 3 tuples?
> If all the arrays have same length, you can just zip all of them,
> creatings a list of tuples
> then you can scan the list 5 by 5...?
>
> so something like
>
> (Array(0)._2, Array(1)._2, Array(2)._2).zipped.toList
>
> this will give you a list of tuples of 3 elements containing each
> items from ID1, ID2 and ID3  ... sample below
> res: List((18159,100079,308703), (308703, 19622, 54477), (72636,18159,
> 89366)..)
>
> then you can use a recursive function to compare each element such as
>
> def iterate(lst:List[(Int, Int, Int)]):T = {
> if (lst.isEmpty): /// return your comparison
> else {
>  val splits = lst.splitAt(5)
 // do something about it using splits._1
>  iterate(splits._2)
>}
>
> will this help? or am i still missing something?
>
> kr
>
>
>
>
>
>
>
>
>
>
>
>
> On 24 Jul 2016 5:52 pm, "janardhan shetty" 
> wrote:
>
>> Array(
>> (ID1,Array(18159, 308703, 72636, 64544, 39244, 107937, 54477, 145272,
>> 100079, 36318, 160992, 817, 89366, 150022, 19622, 44683, 58866, 16207

Re: how to use spark.mesos.constraints

2016-07-26 Thread Jia Yu
Hi,

I am also trying to use spark.mesos.constraints but it gives me the
same error: the job has not been accepted by any resources.

I suspect that I should start some additional service like
./sbin/start-mesos-shuffle-service.sh. Am I correct?

Thanks,
Jia

On Tue, Dec 1, 2015 at 5:14 PM, rarediel 
wrote:

> I am trying to add Mesos constraints to my spark-submit command in my
> Marathon file. I am setting spark.mesos.coarse=true.
>
> Here is an example of a constraint I am trying to set:
>
>  --conf spark.mesos.constraint=cpus:2
>
> I want to use the constraints to control the number of executors that are
> created
> so I can control the total memory of my spark job.
>
> I've tried many variations of resource constraints, but no matter which
> resource or what number, range, etc. I try, I always get the error "Initial
> job has not accepted any resources; check your cluster UI...".  My cluster
> has the available resources.  Are there any examples I can look at where
> people use resource constraints?
>
>
>
>


Re: ORC v/s Parquet for Spark 2.0

2016-07-26 Thread Koert Kuipers
i dont think so, but that sounds like a good idea

On Tue, Jul 26, 2016 at 6:19 PM, Sudhir Babu Pothineni <
sbpothin...@gmail.com> wrote:

> Just correction:
>
> ORC Java libraries from Hive are forked into Apache ORC. Vectorization
> default.
>
> Do not know If Spark leveraging this new repo?
>
> 
> <dependency>
>   <groupId>org.apache.orc</groupId>
>   <artifactId>orc</artifactId>
>   <version>1.1.2</version>
>   <type>pom</type>
> </dependency>
> 
>
>
>
>
>
>
>
>
> Sent from my iPhone
> On Jul 26, 2016, at 4:50 PM, Koert Kuipers  wrote:
>
> parquet was inspired by dremel but written from the ground up as a library
> with support for a variety of big data systems (hive, pig, impala,
> cascading, etc.). it is also easy to add new support, since its a proper
> library.
>
> orc has been enhanced while deployed at facebook in hive and at yahoo in
> hive. just hive. it didn't really exist by itself. it was part of the big
> java soup that is called hive, without an easy way to extract it. hive does
> not expose proper java apis. it never cared for that.
>
> On Tue, Jul 26, 2016 at 9:57 AM, Ovidiu-Cristian MARCU <
> ovidiu-cristian.ma...@inria.fr> wrote:
>
>> Interesting opinion, thank you
>>
>> Still, on the website parquet is basically inspired by Dremel (Google)
>> [1] and part of orc has been enhanced while deployed for Facebook, Yahoo
>> [2].
>>
>> Other than this presentation [3], do you guys know any other benchmark?
>>
>> [1]https://parquet.apache.org/documentation/latest/
>> [2]https://orc.apache.org/docs/
>> [3]
>> http://www.slideshare.net/oom65/file-format-benchmarks-avro-json-orc-parquet
>>
>> On 26 Jul 2016, at 15:19, Koert Kuipers  wrote:
>>
>> when parquet came out it was developed by a community of companies, and
>> was designed as a library to be supported by multiple big data projects.
>> nice
>>
>> orc on the other hand initially only supported hive. it wasn't even
>> designed as a library that can be re-used. even today it brings in the
>> kitchen sink of transitive dependencies. yikes
>>
>> On Jul 26, 2016 5:09 AM, "Jörn Franke"  wrote:
>>
>>> I think both are very similar, but with slightly different goals. While
>>> they work transparently for each Hadoop application you need to enable
>>> specific support in the application for predicate push down.
>>> In the end you have to check which application you are using and do some
>>> tests (with correct predicate push down configuration). Keep in mind that
>>> both formats work best if they are sorted on filter columns (which is your
>>> responsibility) and if their optimizations are correctly configured (min
>>> max index, bloom filter, compression etc) .
>>>
>>> If you need to ingest sensor data you may want to store it first in
>>> hbase and then batch process it in large files in Orc or parquet format.
>>>
>>> On 26 Jul 2016, at 04:09, janardhan shetty 
>>> wrote:
>>>
>>> Just wondering advantages and disadvantages to convert data into ORC or
>>> Parquet.
>>>
>>> In the documentation of Spark there are numerous examples of Parquet
>>> format.
>>>
>>> Any strong reasons to choose Parquet over the ORC file format?
>>>
>>> Also : current data compression is bzip2
>>>
>>>
>>> http://stackoverflow.com/questions/32373460/parquet-vs-orc-vs-orc-with-snappy
>>> This seems biased.
>>>
>>>
>>
>


Re: dynamic coalesce to pick file size

2016-07-26 Thread Pedro Rodriguez
I asked something similar; search for "Tools for Balancing Partitions
By Size" (I couldn't find the link in the archives). Unfortunately there doesn't
seem to be anything good right now other than knowing your job statistics.
I am planning on implementing the idea I explained in the last paragraph or
so of the last email I sent, in this library:
https://github.com/EntilZha/spark-s3, although it could be a while to make
my way up to data frames (adds for now).
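In the meantime, a minimal sketch of the "know your job statistics" approach
(the sizes and path are hypothetical; the output-size estimate has to come from
you, e.g. from a previous run of the same query):

val targetFileBytes = 256L * 1024 * 1024               // ~256 MB per file, as in the question
val estimatedOutputBytes = 4L * 1024 * 1024 * 1024     // hypothetical measured/estimated output size
val numFiles = math.max(1L, estimatedOutputBytes / targetFileBytes).toInt
df.coalesce(numFiles).write.parquet("/hypothetical/output/path")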

On Tue, Jul 26, 2016 at 1:02 PM, Maurin Lenglart 
wrote:

> Hi,
>
> I am doing a SQL query that returns a DataFrame. Then I am writing the
> result of the query using “df.write”, but the result gets written in a lot
> of different small files (~100 files of about 200 KB each). So now I am doing a
> “.coalesce(2)” before the write.
>
> But the number “2” that I picked is static; is there a way of
> dynamically picking the number depending on the desired file size? (around
> 256 MB would be perfect)
>
>
>
> I am running spark 1.6 on CDH using yarn, the files are written in parquet
> format.
>
>
>
> Thanks
>
>
>



-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: ORC v/s Parquet for Spark 2.0

2016-07-26 Thread Sudhir Babu Pothineni
Just correction:

ORC Java libraries from Hive are forked into Apache ORC. Vectorization default. 

Do not know If Spark leveraging this new repo?


<dependency>
  <groupId>org.apache.orc</groupId>
  <artifactId>orc</artifactId>
  <version>1.1.2</version>
  <type>pom</type>
</dependency>









Sent from my iPhone
> On Jul 26, 2016, at 4:50 PM, Koert Kuipers  wrote:
> 
> parquet was inspired by dremel but written from the ground up as a library 
> with support for a variety of big data systems (hive, pig, impala, cascading, 
> etc.). it is also easy to add new support, since its a proper library.
> 
> orc has been enhanced while deployed at facebook in hive and at yahoo in 
> hive. just hive. it didn't really exist by itself. it was part of the big 
> java soup that is called hive, without an easy way to extract it. hive does 
> not expose proper java apis. it never cared for that.
> 
>> On Tue, Jul 26, 2016 at 9:57 AM, Ovidiu-Cristian MARCU 
>>  wrote:
>> Interesting opinion, thank you
>> 
>> Still, on the website parquet is basically inspired by Dremel (Google) [1] 
>> and part of orc has been enhanced while deployed for Facebook, Yahoo [2].
>> 
>> Other than this presentation [3], do you guys know any other benchmark?
>> 
>> [1]https://parquet.apache.org/documentation/latest/
>> [2]https://orc.apache.org/docs/
>> [3] 
>> http://www.slideshare.net/oom65/file-format-benchmarks-avro-json-orc-parquet
>> 
>>> On 26 Jul 2016, at 15:19, Koert Kuipers  wrote:
>>> 
>>> when parquet came out it was developed by a community of companies, and was 
>>> designed as a library to be supported by multiple big data projects. nice
>>> 
>>> orc on the other hand initially only supported hive. it wasn't even 
>>> designed as a library that can be re-used. even today it brings in the 
>>> kitchen sink of transitive dependencies. yikes
>>> 
>>> 
 On Jul 26, 2016 5:09 AM, "Jörn Franke"  wrote:
 I think both are very similar, but with slightly different goals. While 
 they work transparently for each Hadoop application you need to enable 
 specific support in the application for predicate push down. 
 In the end you have to check which application you are using and do some 
 tests (with correct predicate push down configuration). Keep in mind that 
 both formats work best if they are sorted on filter columns (which is your 
 responsibility) and if their optimizations are correctly configured (min 
 max index, bloom filter, compression etc) . 
 
 If you need to ingest sensor data you may want to store it first in hbase 
 and then batch process it in large files in Orc or parquet format.
 
> On 26 Jul 2016, at 04:09, janardhan shetty  wrote:
> 
> Just wondering advantages and disadvantages to convert data into ORC or 
> Parquet. 
> 
> In the documentation of Spark there are numerous examples of Parquet 
> format. 
> 
> Any strong reasons to choose Parquet over the ORC file format?
> 
> Also : current data compression is bzip2
> 
> http://stackoverflow.com/questions/32373460/parquet-vs-orc-vs-orc-with-snappy
>  
> This seems biased.
> 


Re: ORC v/s Parquet for Spark 2.0

2016-07-26 Thread Koert Kuipers
parquet was inspired by dremel but written from the ground up as a library
with support for a variety of big data systems (hive, pig, impala,
cascading, etc.). it is also easy to add new support, since its a proper
library.

orc has been enhanced while deployed at facebook in hive and at yahoo in
hive. just hive. it didn't really exist by itself. it was part of the big
java soup that is called hive, without an easy way to extract it. hive does
not expose proper java apis. it never cared for that.

On Tue, Jul 26, 2016 at 9:57 AM, Ovidiu-Cristian MARCU <
ovidiu-cristian.ma...@inria.fr> wrote:

> Interesting opinion, thank you
>
> Still, on the website parquet is basically inspired by Dremel (Google) [1]
> and part of orc has been enhanced while deployed for Facebook, Yahoo [2].
>
> Other than this presentation [3], do you guys know any other benchmark?
>
> [1]https://parquet.apache.org/documentation/latest/
> [2]https://orc.apache.org/docs/
> [3]
> http://www.slideshare.net/oom65/file-format-benchmarks-avro-json-orc-parquet
>
> On 26 Jul 2016, at 15:19, Koert Kuipers  wrote:
>
> when parquet came out it was developed by a community of companies, and
> was designed as a library to be supported by multiple big data projects.
> nice
>
> orc on the other hand initially only supported hive. it wasn't even
> designed as a library that can be re-used. even today it brings in the
> kitchen sink of transitive dependencies. yikes
>
> On Jul 26, 2016 5:09 AM, "Jörn Franke"  wrote:
>
>> I think both are very similar, but with slightly different goals. While
>> they work transparently for each Hadoop application you need to enable
>> specific support in the application for predicate push down.
>> In the end you have to check which application you are using and do some
>> tests (with correct predicate push down configuration). Keep in mind that
>> both formats work best if they are sorted on filter columns (which is your
>> responsibility) and if their optimizations are correctly configured (min
>> max index, bloom filter, compression etc) .
>>
>> If you need to ingest sensor data you may want to store it first in hbase
>> and then batch process it in large files in Orc or parquet format.
>>
>> On 26 Jul 2016, at 04:09, janardhan shetty 
>> wrote:
>>
>> Just wondering advantages and disadvantages to convert data into ORC or
>> Parquet.
>>
>> In the documentation of Spark there are numerous examples of Parquet
>> format.
>>
>> Any strong reasons to choose Parquet over the ORC file format?
>>
>> Also : current data compression is bzip2
>>
>>
>> http://stackoverflow.com/questions/32373460/parquet-vs-orc-vs-orc-with-snappy
>> This seems biased.
>>
>>
>


Re: libraryDependencies

2016-07-26 Thread Michael Armbrust
libraryDependencies ++= Seq(
  // other dependencies here
  "org.apache.spark" %% "spark-core" % "1.6.2" % "provided",
  "org.apache.spark" %% "spark-mllib" % "1.6.2" % "provided",
  "org.scalanlp" %% "breeze" % "0.12",
  // native libraries are not included by default. add this if you want them (as of 0.7)
  // native libraries greatly improve performance, but increase jar sizes.
  "org.scalanlp" %% "breeze-natives" % "0.12"
)

On Tue, Jul 26, 2016 at 12:49 PM, Martin Somers  wrote:

> cheers - I updated
>
> libraryDependencies  ++= Seq(
>   // other dependencies here
>   "org.apache.spark" %% "spark-core" % "1.6.2" % "provided",
>   "org.apache.spark" %% "spark-mllib_2.10" % "1.6.2",
>   "org.scalanlp" %% "breeze" % "0.12",
>   // native libraries are not included by default. add this if
> you want them (as of 0.7)
>   // native libraries greatly improve performance, but
> increase jar sizes.
>   "org.scalanlp" %% "breeze-natives" % "0.12",
> )
>
> and getting similar error
>
> Compiling 1 Scala source to
> /Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/target/scala-2.11/classes...
> [error]
> /Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/src/main/scala/MyApp.scala:2:
> object mllib is not a member of package org.apache.spark
> [error] import org.apache.spark.mllib.linalg.distributed.RowMatrix
> [error] ^
> [error]
> /Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/src/main/scala/MyApp.scala:3:
> object mllib is not a member of package org.apache.spark
> [error] import org.apache.spark.mllib.linalg.SingularValueDecomposition
> [error] ^
> [error]
> /Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/src/main/scala/MyApp.scala:5:
> object mllib is not a member of package org.apache.spark
> [error] import org.apache.spark.mllib.linalg.{Vector, Vectors}
> [error] ^
> [error]
> /Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/src/main/scala/MyApp.scala:8:
> not found: object breeze
>
> On Tue, Jul 26, 2016 at 8:36 PM, Michael Armbrust 
> wrote:
>
>> Also, you'll want all of the various spark versions to be the same.
>>
>> On Tue, Jul 26, 2016 at 12:34 PM, Michael Armbrust <
>> mich...@databricks.com> wrote:
>>
>>> If you are using %% (double) then you do not need _2.11.
>>>
>>> On Tue, Jul 26, 2016 at 12:18 PM, Martin Somers 
>>> wrote:
>>>

 my build file looks like

 libraryDependencies  ++= Seq(
   // other dependencies here
   "org.apache.spark" %% "spark-core" % "1.6.2" % "provided",
   "org.apache.spark" %% "spark-mllib_2.11" % "1.6.0",
   "org.scalanlp" % "breeze_2.11" % "0.7",
   // native libraries are not included by default. add this
 if you want them (as of 0.7)
   // native libraries greatly improve performance, but
 increase jar sizes.
   "org.scalanlp" % "breeze-natives_2.11" % "0.7",
 )

 not 100% sure on the version numbers if they are indeed correct
 getting an error of

 [info] Resolving jline#jline;2.12.1 ...
 [info] Done updating.
 [info] Compiling 1 Scala source to
 /Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/target/scala-2.11/classes...
 [error]
 /Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/src/main/scala/MyApp.scala:2:
 object mllib is not a member of package org.apache.spark
 [error] import org.apache.spark.mllib.linalg.distributed.RowMatrix
 
 ...


 Im trying to import in

 import org.apache.spark.mllib.linalg.distributed.RowMatrix
 import org.apache.spark.mllib.linalg.SingularValueDecomposition

 import org.apache.spark.mllib.linalg.{Vector, Vectors}


 import breeze.linalg._
 import breeze.linalg.{ Matrix => B_Matrix }
 import breeze.linalg.{ Vector => B_Matrix }
 import breeze.linalg.DenseMatrix

 object MyApp {
   def main(args: Array[String]): Unit = {
 //code here
 }


 It might not be the correct way of doing this

 Anyone got any suggestion
 tks
 M




>>>
>>
>
>
> --
> M
>


Re: libraryDependencies

2016-07-26 Thread Martin Somers
cheers - I updated

libraryDependencies  ++= Seq(
  // other dependencies here
  "org.apache.spark" %% "spark-core" % "1.6.2" % "provided",
  "org.apache.spark" %% "spark-mllib_2.10" % "1.6.2",
  "org.scalanlp" %% "breeze" % "0.12",
  // native libraries are not included by default. add this if
you want them (as of 0.7)
  // native libraries greatly improve performance, but increase
jar sizes.
  "org.scalanlp" %% "breeze-natives" % "0.12",
)

and getting similar error

Compiling 1 Scala source to
/Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/target/scala-2.11/classes...
[error]
/Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/src/main/scala/MyApp.scala:2:
object mllib is not a member of package org.apache.spark
[error] import org.apache.spark.mllib.linalg.distributed.RowMatrix
[error] ^
[error]
/Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/src/main/scala/MyApp.scala:3:
object mllib is not a member of package org.apache.spark
[error] import org.apache.spark.mllib.linalg.SingularValueDecomposition
[error] ^
[error]
/Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/src/main/scala/MyApp.scala:5:
object mllib is not a member of package org.apache.spark
[error] import org.apache.spark.mllib.linalg.{Vector, Vectors}
[error] ^
[error]
/Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/src/main/scala/MyApp.scala:8:
not found: object breeze

On Tue, Jul 26, 2016 at 8:36 PM, Michael Armbrust 
wrote:

> Also, you'll want all of the various spark versions to be the same.
>
> On Tue, Jul 26, 2016 at 12:34 PM, Michael Armbrust  > wrote:
>
>> If you are using %% (double) then you do not need _2.11.
>>
>> On Tue, Jul 26, 2016 at 12:18 PM, Martin Somers 
>> wrote:
>>
>>>
>>> my build file looks like
>>>
>>> libraryDependencies  ++= Seq(
>>>   // other dependencies here
>>>   "org.apache.spark" %% "spark-core" % "1.6.2" % "provided",
>>>   "org.apache.spark" %% "spark-mllib_2.11" % "1.6.0",
>>>   "org.scalanlp" % "breeze_2.11" % "0.7",
>>>   // native libraries are not included by default. add this
>>> if you want them (as of 0.7)
>>>   // native libraries greatly improve performance, but
>>> increase jar sizes.
>>>   "org.scalanlp" % "breeze-natives_2.11" % "0.7",
>>> )
>>>
>>> not 100% sure on the version numbers if they are indeed correct
>>> getting an error of
>>>
>>> [info] Resolving jline#jline;2.12.1 ...
>>> [info] Done updating.
>>> [info] Compiling 1 Scala source to
>>> /Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/target/scala-2.11/classes...
>>> [error]
>>> /Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/src/main/scala/MyApp.scala:2:
>>> object mllib is not a member of package org.apache.spark
>>> [error] import org.apache.spark.mllib.linalg.distributed.RowMatrix
>>> 
>>> ...
>>>
>>>
>>> Im trying to import in
>>>
>>> import org.apache.spark.mllib.linalg.distributed.RowMatrix
>>> import org.apache.spark.mllib.linalg.SingularValueDecomposition
>>>
>>> import org.apache.spark.mllib.linalg.{Vector, Vectors}
>>>
>>>
>>> import breeze.linalg._
>>> import breeze.linalg.{ Matrix => B_Matrix }
>>> import breeze.linalg.{ Vector => B_Matrix }
>>> import breeze.linalg.DenseMatrix
>>>
>>> object MyApp {
>>>   def main(args: Array[String]): Unit = {
>>> //code here
>>> }
>>>
>>>
>>> It might not be the correct way of doing this
>>>
>>> Anyone got any suggestion
>>> tks
>>> M
>>>
>>>
>>>
>>>
>>
>


-- 
M


Re: libraryDependencies

2016-07-26 Thread Michael Armbrust
Also, you'll want all of the various spark versions to be the same.

On Tue, Jul 26, 2016 at 12:34 PM, Michael Armbrust 
wrote:

> If you are using %% (double) then you do not need _2.11.
>
> On Tue, Jul 26, 2016 at 12:18 PM, Martin Somers  wrote:
>
>>
>> my build file looks like
>>
>> libraryDependencies  ++= Seq(
>>   // other dependencies here
>>   "org.apache.spark" %% "spark-core" % "1.6.2" % "provided",
>>   "org.apache.spark" %% "spark-mllib_2.11" % "1.6.0",
>>   "org.scalanlp" % "breeze_2.11" % "0.7",
>>   // native libraries are not included by default. add this
>> if you want them (as of 0.7)
>>   // native libraries greatly improve performance, but
>> increase jar sizes.
>>   "org.scalanlp" % "breeze-natives_2.11" % "0.7",
>> )
>>
>> not 100% sure on the version numbers if they are indeed correct
>> getting an error of
>>
>> [info] Resolving jline#jline;2.12.1 ...
>> [info] Done updating.
>> [info] Compiling 1 Scala source to
>> /Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/target/scala-2.11/classes...
>> [error]
>> /Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/src/main/scala/MyApp.scala:2:
>> object mllib is not a member of package org.apache.spark
>> [error] import org.apache.spark.mllib.linalg.distributed.RowMatrix
>> 
>> ...
>>
>>
>> Im trying to import in
>>
>> import org.apache.spark.mllib.linalg.distributed.RowMatrix
>> import org.apache.spark.mllib.linalg.SingularValueDecomposition
>>
>> import org.apache.spark.mllib.linalg.{Vector, Vectors}
>>
>>
>> import breeze.linalg._
>> import breeze.linalg.{ Matrix => B_Matrix }
>> import breeze.linalg.{ Vector => B_Matrix }
>> import breeze.linalg.DenseMatrix
>>
>> object MyApp {
>>   def main(args: Array[String]): Unit = {
>> //code here
>> }
>>
>>
>> It might not be the correct way of doing this
>>
>> Anyone got any suggestion
>> tks
>> M
>>
>>
>>
>>
>


Re: libraryDependencies

2016-07-26 Thread Michael Armbrust
If you are using %% (double) then you do not need _2.11.
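In other words (a small illustration, not from the original message), with a
2.11 scalaVersion these two dependency declarations resolve to the same artifact:

// %% appends the Scala binary version to the artifact name for you
"org.apache.spark" %% "spark-mllib" % "1.6.2"
// is equivalent to
"org.apache.spark" % "spark-mllib_2.11" % "1.6.2"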

On Tue, Jul 26, 2016 at 12:18 PM, Martin Somers  wrote:

>
> my build file looks like
>
> libraryDependencies  ++= Seq(
>   // other dependencies here
>   "org.apache.spark" %% "spark-core" % "1.6.2" % "provided",
>   "org.apache.spark" %% "spark-mllib_2.11" % "1.6.0",
>   "org.scalanlp" % "breeze_2.11" % "0.7",
>   // native libraries are not included by default. add this if
> you want them (as of 0.7)
>   // native libraries greatly improve performance, but
> increase jar sizes.
>   "org.scalanlp" % "breeze-natives_2.11" % "0.7",
> )
>
> not 100% sure on the version numbers if they are indeed correct
> getting an error of
>
> [info] Resolving jline#jline;2.12.1 ...
> [info] Done updating.
> [info] Compiling 1 Scala source to
> /Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/target/scala-2.11/classes...
> [error]
> /Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/src/main/scala/MyApp.scala:2:
> object mllib is not a member of package org.apache.spark
> [error] import org.apache.spark.mllib.linalg.distributed.RowMatrix
> 
> ...
>
>
> Im trying to import in
>
> import org.apache.spark.mllib.linalg.distributed.RowMatrix
> import org.apache.spark.mllib.linalg.SingularValueDecomposition
>
> import org.apache.spark.mllib.linalg.{Vector, Vectors}
>
>
> import breeze.linalg._
> import breeze.linalg.{ Matrix => B_Matrix }
> import breeze.linalg.{ Vector => B_Matrix }
> import breeze.linalg.DenseMatrix
>
> object MyApp {
>   def main(args: Array[String]): Unit = {
> //code here
> }
>
>
> It might not be the correct way of doing this
>
> Anyone got any suggestion
> tks
> M
>
>
>
>


libraryDependencies

2016-07-26 Thread Martin Somers
my build file looks like

libraryDependencies  ++= Seq(
  // other dependencies here
  "org.apache.spark" %% "spark-core" % "1.6.2" % "provided",
  "org.apache.spark" %% "spark-mllib_2.11" % "1.6.0",
  "org.scalanlp" % "breeze_2.11" % "0.7",
  // native libraries are not included by default. add this if
you want them (as of 0.7)
  // native libraries greatly improve performance, but increase
jar sizes.
  "org.scalanlp" % "breeze-natives_2.11" % "0.7",
)

not 100% sure on the version numbers if they are indeed correct
getting an error of

[info] Resolving jline#jline;2.12.1 ...
[info] Done updating.
[info] Compiling 1 Scala source to
/Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/target/scala-2.11/classes...
[error]
/Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/src/main/scala/MyApp.scala:2:
object mllib is not a member of package org.apache.spark
[error] import org.apache.spark.mllib.linalg.distributed.RowMatrix

...


Im trying to import in

import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.SingularValueDecomposition

import org.apache.spark.mllib.linalg.{Vector, Vectors}


import breeze.linalg._
import breeze.linalg.{ Matrix => B_Matrix }
import breeze.linalg.{ Vector => B_Matrix }
import breeze.linalg.DenseMatrix

object MyApp {
  def main(args: Array[String]): Unit = {
//code here
}


It might not be the correct way of doing this

Anyone got any suggestion
tks
M


Re: read only specific jsons

2016-07-26 Thread Cody Koeninger
Have you tried filtering out corrupt records with something along the lines of

 df.filter(df("_corrupt_record").isNull)
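A sketch of that in context (Scala syntax; jsonRdd is a hypothetical RDD[String],
and the guard is only there because the _corrupt_record column exists only when
some records failed to parse):

val df = sqlContext.read.json(jsonRdd)
val cleaned =
  if (df.columns.contains("_corrupt_record")) df.filter(df("_corrupt_record").isNull)
  else df
cleaned.registerTempTable("clickstream")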

On Tue, Jul 26, 2016 at 1:53 PM, vr spark  wrote:
> i am reading data from kafka using spark streaming.
>
> I am reading json and creating dataframe.
> I am using pyspark
>
> kvs = KafkaUtils.createDirectStream(ssc, kafkaTopic1, kafkaParams)
>
> lines = kvs.map(lambda x: x[1])
>
> lines.foreachRDD(mReport)
>
> def mReport(clickRDD):
>
>clickDF = sqlContext.jsonRDD(clickRDD)
>
>clickDF.registerTempTable("clickstream")
>
>PagesDF = sqlContext.sql(
>
> "SELECT   request.clientIP as ip "
>
> "FROM clickstream "
>
> "WHERE request.clientIP is not null "
>
> " limit 2000 "
>
>
> The problem is that not all the jsons from the stream have the same format.
>
> It works when it reads a json which has ip.
>
> Some of the json strings do not have client ip in their schema.
>
> So I am getting an error and my job is failing when it encounters such a json.
>
> How do I read only those jsons which have the ip in their schema?
>
> Please suggest.




dynamic coalesce to pick file size

2016-07-26 Thread Maurin Lenglart
Hi,
I am doing a SQL query that returns a DataFrame. Then I am writing the result of
the query using “df.write”, but the result gets written in a lot of different
small files (~100 files of about 200 KB each). So now I am doing a “.coalesce(2)” before the
write.
But the number “2” that I picked is static; is there a way of dynamically
picking the number depending on the desired file size? (around 256 MB would be
perfect)

I am running spark 1.6 on CDH using yarn, the files are written in parquet 
format.

Thanks



read only specific jsons

2016-07-26 Thread vr spark
i am reading data from kafka using spark streaming.

I am reading json and creating dataframe.
I am using pyspark

kvs = KafkaUtils.createDirectStream(ssc, kafkaTopic1, kafkaParams)

lines = kvs.map(lambda x: x[1])

lines.foreachRDD(mReport)

def mReport(clickRDD):

   clickDF = sqlContext.jsonRDD(clickRDD)

   clickDF.registerTempTable("clickstream")

   PagesDF = sqlContext.sql(

"SELECT   request.clientIP as ip "

"FROM clickstream "

"WHERE request.clientIP is not null "

" limit 2000 "


The problem is that not all the jsons from the stream have the same format.

It works when it reads a json which has ip.

Some of the json strings do not have client ip in their schema.

So I am getting an error and my job is failing when it encounters such a json.

How do I read only those jsons which have the ip in their schema?

Please suggest.


read only specific jsons

2016-07-26 Thread vr spark
i am reading data from kafka using spark streaming.

I am reading json and creating dataframe.

kvs = KafkaUtils.createDirectStream(ssc, kafkaTopic1, kafkaParams)

lines = kvs.map(lambda x: x[1])

lines.foreachRDD(mReport)

def mReport(clickRDD):

clickDF = sqlContext.jsonRDD(clickRDD)

clickDF.registerTempTable("clickstream")

PagesDF = sqlContext.sql(

"SELECT   request.clientIP as ip "

"FROM clickstream "

"WHERE request.clientIP is not null "

" limit 2000 "


The problem is that not all the jsons from the stream have the same format.

It works when it reads a json which has ip.

Some of the json strings do not have client ip in their schema.

So I am getting an error and my job is failing when it encounters such a json.

How do I read only those jsons which have the ip in their schema?

Please suggest.


File System closed while submitting job in spark

2016-07-26 Thread KhajaAsmath Mohammed
Hi,

My Spark job is failing while pulling the properties file from HDFS. The same
code runs fine when I run it on Windows, but it fails when
testing it on YARN.

*spark submit script: spark-submit --class
com.mcd.sparksql.datahub.DataMarts --master  local[*] gdwspark.jar
 hdfs://10.1.144.79:8020/daas/spark/Daas.properties
*

*Exception:*


*Code: Place where the code is failing in yarn.*

16/07/26 18:48:11 INFO InputStream$: File
SystemDFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1681311895_1,
ugi=hadoop (auth:SIMPLE)]]
16/07/26 18:48:11 INFO InputStream$: File Path --> hdfs://
10.1.144.79:8020/daas/spark/Daas.properties
Exception in thread "main" java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:808)
at
org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:868)
at
org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
at java.io.DataInputStream.read(DataInputStream.java:100)
at java.util.Properties$LineReader.readLine(Properties.java:434)
at java.util.Properties.load0(Properties.java:353)
at java.util.Properties.load(Properties.java:341)
at com.mcd.sparksql.util.DaasUtil$.get(DaasUtil.scala:44)
at com.mcd.sparksql.datahub.DataMarts$.main(DataMarts.scala:25)
at com.mcd.sparksql.datahub.DataMarts.main(DataMarts.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

*Code:*

 logger.info("File System"+fs)
try {
  val p = new Path(fname)
  logger.info("File Path --> "+p)
  //var br= new BufferedReader(new FileReader(fname))
*  fs.open(p)*


Any ideas on how to resolve this?

Thanks,
Asmath.


getting more concurrency best practices

2016-07-26 Thread Andy Davidson
Below is a very simple application. It runs very slowly, and it does not look
like I am getting a lot of parallel execution. I imagine this is a very common
workflow: periodically I want to run some standard summary statistics
across several different data sets.

Any suggestions would be greatly appreciated.

Andy

Overview 
All the sets use the same data format. The data is twitter tweets stored as
JSON. The JSON is very complicated; each record can be as large as 4 KB. The
data is collected using Spark Streaming. Every mini batch is stored in S3 as a
separate object, e.g. s3n://buckName/date/timestampMS/parts*. I only select
one column from the data frame; the column is a top-level key in the JSON
structure.

The program is simple.

For each data set:
1. Find all the part files
2. Load them into a data frame
3. Calculate the summary stat and print it
4. Free memory

In my example below the data sets are not very big.

# fullPath is list of part files.
sqlContext.read.format('json').load(fullPath).select("body") #.cache()


%%timeit -n 1 -r 1
# %timeit # line magic
# %%timeit # cell magic
# -n 1 -r 1 # run cell once

for prefix in districtDataSets:
    dataSet = [name for name in constituentDataSets if name.startswith(prefix)]
    # print(dataSets)
    # would be nice if we could have this loop run in parallel
    constituentDFS = getDataFrames(dataSet) # returns a dictionary
    # we could union but would probably be slower
    total = 0
    for name in constituentDFS:
        c = constituentDFS[name].count();
        total = total + c;
    print("{} {:15,}".format(prefix, total))
    # free memory
    del constituentDFS

ne-2 110169
fl-8 12
mi-1 2552
ny-19 27657
ny-24 59739
pa-8 42867
wi-8 7352
ny-3 51136
ny-1 105296
ny-22 5671287
mn-2 34834
tx-8 5246
il-6 12772
co-6 24700
1 loop, best of 1: 2h 41min 8s per loop
Environment

I am using spark-1.6.1

My app is using
10 cores, 
6GB per node
5 executors
1 driver

Each executor has at most 2 active tasks


Overall, the resources do not seem to be utilized well. I do not think
adding machines would improve performance.

I launch the notebook server as follows

#
# allow notebooks to use all available resources
#
export PYSPARK_PYTHON=python3.4
export PYSPARK_DRIVER_PYTHON=python3.4
export IPYTHON_OPTS="notebook --no-browser --port=7000 --log-level=WARN"

$SPARK_ROOT/bin/pyspark \
    --master $MASTER_URL \
    --driver-memory 2G \
    --executor-memory 6G \
    $extraPkgs \
    $*



All my data is being read from S3.
- Is there an easy way to figure out how much time I am spending reading?
- I am guessing S3 is really slow. I have a lot of objects to read.
- I imagine copying the data to HDFS would run faster; however, I have not
found an easy way to copy the data. I am using EC2. It looks like I would have
to copy from S3 to a file partition in my cluster and then copy to HDFS.



Looking at the stages, it does not look like shuffle is a major problem.







Is RowMatrix missing in org.apache.spark.ml package?

2016-07-26 Thread Rohit Chaddha
It is present in mllib but I don't seem to find it in the ml package.
Any suggestions please ?

-Rohit
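
RowMatrix and the other distributed matrices still live only in spark.mllib as of
1.6/2.0; there is no spark.ml equivalent yet. A minimal sketch of bridging from a
DataFrame, assuming a hypothetical df with an ml vector column named "features" and
Spark 2.0's Vectors.fromML converter:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val rows = df.select("features").rdd.map { row =>
  Vectors.fromML(row.getAs[org.apache.spark.ml.linalg.Vector](0))  // ml -> mllib vector
}
val mat = new RowMatrix(rows)
println(s"dimensions: ${mat.numRows()} x ${mat.numCols()}")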


Re: Spark Web UI port 4040 not working

2016-07-26 Thread Andy Davidson
Yup in cluster mode you need to figure out what machine the driver is
running on. That is the machine the UI will run on

https://issues.apache.org/jira/browse/SPARK-15829

You may also have fire wall issues

Here are some notes I made about how to figure out what machine the driver
is running on when using cluster mode

Application UI Bug work around
If our application is running in cluster mode the Master Console UI will
incorrectly assume the Application UIs are running on the master machine.
The UI is actually running on the same machine as the driver
To find the URL to the apps running in cluster mode
1. find the worker id for the app you are interested in.
* on the master console UI go to the Running Apps section
* you should see a column 'Worker'
* a worker is something like worker-20160322041632-172.31.23.201-34909
2. go to the AWS ec2 console
3. from the private ip find the public DNS name
* the private ip in our example is 172.31.23.201

From:  Jacek Laskowski 
Date:  Tuesday, July 26, 2016 at 6:38 AM
To:  Jestin Ma 
Cc:  Chanh Le , "user @spark" 
Subject:  Re: Spark Web UI port 4040 not working

> Hi,
> 
> Do you perhaps deploy using cluster mode? Is this EC2? You'd need to
> figure out where the driver runs and use the machine's IP.
> 
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
> 
> 
> On Tue, Jul 26, 2016 at 3:36 PM, Jestin Ma  wrote:
>>  I tried doing that on my master node.
>>  I got nothing.
>>  However, I grep'd port 8080 and I got the standalone UI.
>> 
>>  On Tue, Jul 26, 2016 at 12:39 AM, Chanh Le  wrote:
>>> 
>>>  You're running in StandAlone Mode?
>>>  Usually inside active task it will show the address of current job.
>>>  or you can check in master node by using netstat -apn | grep 4040
>>> 
>>> 
>>> 
  > On Jul 26, 2016, at 8:21 AM, Jestin Ma 
  > wrote:
  >
  > Hello, when running spark jobs, I can access the master UI (port 8080
  > one) no problem. However, I'm confused as to how to access the web UI to
 see
  > jobs/tasks/stages/etc.
  >
  > I can access the master UI at http://:8080. But port 4040
  > gives me a -connection cannot be reached-.
  >
  > Is the web UI http:// with a port of 4040?
  >
  > I'm running my Spark job on a cluster machine and submitting it to a
  > master node part of the cluster. I heard of ssh tunneling; is that
 relevant
  > here?
  >
  > Thank you!
>>> 
>> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 




Re: Upgrade from 1.2 to 1.6 - parsing flat files in working directory

2016-07-26 Thread Sumona Routh
Can anyone provide some guidance on how to get files on the classpath for
our Spark job? This used to work in 1.2, however after upgrading we are
getting nulls when attempting to load resources.

Thanks,
Sumona

On Thu, Jul 21, 2016 at 4:43 PM Sumona Routh  wrote:

> Hi all,
> We are running into a classpath issue when we upgrade our application from
> 1.2 to 1.6.
>
> In 1.2, we load properties from a flat file (from working directory of the
> spark-submit script) using classloader resource approach. This was executed
> up front (by the driver) before any processing happened.
>
>  val confStream =
> Thread.currentThread().getContextClassLoader().getResourceAsStream(appConfigPath)
>
> confProperties.load(confStream)
>
> In 1.6, the line getResourceAsStream returns a null and thus causes a
> subsequent NullPointerException when loading the properties.
>
> How do we pass flat files (there are many, so we really want to add a
> directory to the classpath) in Spark 1.6? We haven't had much luck with
> --files and --driver-class-path and spark.driver.extraClasspath. We also
> couldn't find much documentation on this.
>
> Thanks!
> Sumona
>
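
Not the 1.2 classloader behaviour restored, but one pattern that works on 1.6: ship the
file explicitly and resolve it through SparkFiles. A sketch, assuming the job is submitted
with --files app.conf ("app.conf" is a placeholder name) and the file is read after the
SparkContext is up:

import java.io.FileInputStream
import java.util.Properties
import org.apache.spark.SparkFiles

val confProperties = new Properties()
val in = new FileInputStream(SparkFiles.get("app.conf"))  // where Spark staged the file
try confProperties.load(in) finally in.close()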


Re: spark sql aggregate function "Nth"

2016-07-26 Thread Alex Nastetsky
Ah, that gives me an idea.

val window = Window.partitionBy()
val getRand = udf((cnt:Int) =>  )

df
.withColumn("cnt", count().over(window))
.withColumn("rnd", getRand($"cnt"))
.where($"rnd" === $"cnt")

Not sure how performant this would be, but writing a UDF is much simpler
than a UDAF.

On Tue, Jul 26, 2016 at 11:48 AM, ayan guha  wrote:

> You can use rank with window function. Rank=1 is same as calling first().
>
> Not sure how you would randomly pick records though, if there is no Nth
> record. In your example, what happens if data is of only 2 rows?
> On 27 Jul 2016 00:57, "Alex Nastetsky" 
> wrote:
>
>> Spark SQL has a "first" function that returns the first item in a group.
>> Is there a similar function, perhaps in a third party lib, that allows you
>> to return an arbitrary (e.g. 3rd) item from the group? Was thinking of
>> writing a UDAF for it, but didn't want to reinvent the wheel. My endgoal is
>> to be able to select a random item from the group, using random number
>> generator.
>>
>> Thanks.
>>
>
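
Putting the two ideas together, a sketch of "pick one random row per group" with a
window, assuming a hypothetical df grouped by a placeholder column named "key":

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, rand, row_number}

val w = Window.partitionBy(col("key")).orderBy(col("rnd"))
val onePerGroup = df
  .withColumn("rnd", rand())                 // random sort key per row
  .withColumn("rn", row_number().over(w))    // rank within the group
  .where(col("rn") === 1)                    // keep one random row per group
  .drop("rnd").drop("rn")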


Re: spark sql aggregate function "Nth"

2016-07-26 Thread ayan guha
You can use rank with window function. Rank=1 is same as calling first().

Not sure how you would randomly pick records though, if there is no Nth
record. In your example, what happens if data is of only 2 rows?
On 27 Jul 2016 00:57, "Alex Nastetsky" 
wrote:

> Spark SQL has a "first" function that returns the first item in a group.
> Is there a similar function, perhaps in a third party lib, that allows you
> to return an arbitrary (e.g. 3rd) item from the group? Was thinking of
> writing a UDAF for it, but didn't want to reinvent the wheel. My endgoal is
> to be able to select a random item from the group, using random number
> generator.
>
> Thanks.
>


Re: FileUtil.fullyDelete does ?

2016-07-26 Thread Divya Gehlot
What happens in my use case?
Even I know what it does :)
I need to know why they are deleting the src and destination file paths.

On Jul 26, 2016 10:20 PM, "praveenesh kumar"  wrote:

>
> https://hadoop.apache.org/docs/r2.7.1/api/org/apache/hadoop/fs/FileUtil.html#fullyDelete(java.io.File)
>
> On Tue, Jul 26, 2016 at 12:09 PM, Divya Gehlot 
> wrote:
>
>> Resending to right list
>> -- Forwarded message --
>> From: "Divya Gehlot" 
>> Date: Jul 26, 2016 6:51 PM
>> Subject: FileUtil.fullyDelete does ?
>> To: "user @spark" 
>> Cc:
>>
>> Hi,
>> When I am doing the using theFileUtil.copymerge function
>>
>> val file = "/tmp/primaryTypes.csv"
>>
>> FileUtil.fullyDelete(new File(file))
>>
>>  val destinationFile= "/tmp/singlePrimaryTypes.csv"
>>
>> FileUtil.fullyDelete(new File(destinationFile))
>>
>>  val counts = partitions.
>>
>> reduceByKey {case (x,y) => x + y}.
>>
>> sortBy {case (key, value) => -value}.
>>
>> map { case (key, value) => Array(key, value).mkString(",") }
>>
>>  counts.saveAsTextFile(file)
>>
>>  merge(file, destinationFile)
>>
>>
>> I am wondering here what does  FileUtil.fullyDelete(new 
>> File(destinationFile)) do ?
>>
>>   does it delete the merged file If yes,then how will we access the 
>> merged file ..?
>>
>>
>> Confused here ...
>>
>>
>>
>> Thanks,
>>
>> Divya
>>
>>
>>
>
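
For what it's worth, a short sketch of what those two calls do in the snippet above:
they only clear previous local output, and the merged file itself is written afterwards
by merge(), so it stays in place.

import java.io.File
import org.apache.hadoop.fs.FileUtil

// fullyDelete(File) recursively removes a local file or directory (if present)
// and returns whether the delete succeeded.
FileUtil.fullyDelete(new File("/tmp/primaryTypes.csv"))        // old part-file directory
FileUtil.fullyDelete(new File("/tmp/singlePrimaryTypes.csv"))  // old merged file
// saveAsTextFile(...) and merge(...) then recreate both paths, so the merged
// file is still there to read once the job finishes.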


sparse vector to dense vecotor in pyspark

2016-07-26 Thread pseudo oduesp
Hi ,
With StandardScaler we get a sparse vector. How can I transform it to a list or
dense vector without losing the sparse values?

thanks
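
No values are lost in the conversion: a sparse vector just leaves out the zeros, and
toArray expands it back to full length. A minimal sketch in Scala (in PySpark the same
idea is DenseVector(sv.toArray())):

import org.apache.spark.mllib.linalg.{DenseVector, Vectors}

val sv = Vectors.sparse(5, Array(1, 3), Array(2.0, 4.5))
val dense = new DenseVector(sv.toArray)   // [0.0, 2.0, 0.0, 4.5, 0.0]
val asList = sv.toArray.toList            // List(0.0, 2.0, 0.0, 4.5, 0.0)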


Re: UDF returning generic Seq

2016-07-26 Thread Chris Beavers
Yong,

Thanks for the response. While those are good examples, they are able to
leverage the keytype/valuetype structure of Maps to specify an explicit
return type.

I guess maybe the more fundamental issue is that I want to support
heterogeneous maps/arrays allowed by JSON: [1, "str", 2.345] or
{"name":"Chris","value":123}. Given the Spark SQL constraints that
ArrayType and MapType need explicit and consistent element types, I don't
see any way to support this in the current type system short of falling
back to binary data.

Open to other suggestions,
Chris

On Tue, Jul 26, 2016 at 9:42 AM Yong Zhang  wrote:

> I don't know the if "ANY" will work or not, but do you take a look about
> how "map_values" UDF implemented in Spark, which return map values of an
> array/seq of arbitrary type.
>
>
> https://issues.apache.org/jira/browse/SPARK-16279
>
>
> Yong
>
>
> --
> *From:* Chris Beavers 
> *Sent:* Monday, July 25, 2016 10:32 PM
> *To:* user@spark.apache.org
> *Subject:* UDF returning generic Seq
>
> Hey there,
>
> Interested in writing a UDF that returns an ArrayType column of unknown
> subtype. My understanding is that this translated JVM-type-wise be a Seq of
> generic templated type: Seq[Any]. I seem to be hitting the constraint at
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala:657
>  that
> basically necessitates a fully qualified schema on the return type (i.e.
> the templated Any is hitting the default exception throwing case at the end
> of schemaFor).
>
> Is there any more canonical way have a UDF produce an ArrayType column of
> unknown type? Or is my only alternative here to reduce this to BinaryType
> and use whatever encoding/data structures I want under the covers there and
> in subsequent UDFs?
>
> Thanks,
> Chris
>
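
One fallback short of BinaryType, sketched here: keep the Catalyst element type concrete
(ArrayType(StringType)) and carry each heterogeneous JSON value in its serialized form,
re-parsing downstream. The three-argument UDF is only an illustration of the shape:

import org.apache.spark.sql.functions.udf

val tagged = udf { (i: Int, s: String, d: Double) =>
  Seq(i.toString, s, d.toString)   // Seq[String] maps cleanly to ArrayType(StringType)
}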


ioStreams for DataFrameReader/Writer

2016-07-26 Thread Roger Holenweger

Hello all,

why are all DataFrameReader and Writer methods tied to Path objects, or 
in other words why are there no Reader/Writers that can load data from 
an Input or Output Stream?


After looking at the spark source code, I realize that there is no easy
way to add methods that can handle ioStreams.


Does anybody know of a solution, without having to write a full-blown 
connector?



I basically want to export a DataFrame as a Parquet ByteStream and then 
take care of persisting the blob.


--
Regards
*Roger Holenweger*
LotaData 
/spatiotemporal intelligence/
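
One workaround short of a full connector, sketched under the assumption that the scratch
path is on the local filesystem (local mode or an explicit file:// URI) and df is the
frame to export: write Parquet to a temp directory and stream the resulting part files
yourself.

import java.io.File
import java.nio.file.Files

val out = Files.createTempDirectory("df-parquet").resolve("out").toString
df.write.parquet(out)
val blobs: Array[Array[Byte]] = new File(out)
  .listFiles()
  .filter(_.getName.endsWith(".parquet"))
  .map(f => Files.readAllBytes(f.toPath))   // each Parquet part file as a byte array
// hand the blobs to your own persistence layer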


spark sql aggregate function "Nth"

2016-07-26 Thread Alex Nastetsky
Spark SQL has a "first" function that returns the first item in a group. Is
there a similar function, perhaps in a third party lib, that allows you to
return an arbitrary (e.g. 3rd) item from the group? Was thinking of writing
a UDAF for it, but didn't want to reinvent the wheel. My endgoal is to be
able to select a random item from the group, using random number generator.

Thanks.


Re: UDF returning generic Seq

2016-07-26 Thread Yong Zhang
I don't know if "ANY" will work or not, but take a look at how the
"map_values" UDF is implemented in Spark, which returns map values as an array/seq
of arbitrary type.


https://issues.apache.org/jira/browse/SPARK-16279


Yong



From: Chris Beavers 
Sent: Monday, July 25, 2016 10:32 PM
To: user@spark.apache.org
Subject: UDF returning generic Seq

Hey there,

Interested in writing a UDF that returns an ArrayType column of unknown 
subtype. My understanding is that this translated JVM-type-wise be a Seq of 
generic templated type: Seq[Any]. I seem to be hitting the constraint at 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala:657
 that basically necessitates a fully qualified schema on the return type (i.e. 
the templated Any is hitting the default exception throwing case at the end of 
schemaFor).

Is there any more canonical way have a UDF produce an ArrayType column of 
unknown type? Or is my only alternative here to reduce this to BinaryType and 
use whatever encoding/data structures I want under the covers there and in 
subsequent UDFs?

Thanks,
Chris


Re: sbt build under scala

2016-07-26 Thread Jacek Laskowski
Hi,

I don't think there are any sbt-related changes in Spark 2.0. Just
different versions in libraryDependencies.

As to the article, I'm surprised it didn't mention using sbt-assembly
[1] for docker-like deployment or sbt-native-packager [2] that could
create a Docker image.

[1] https://github.com/sbt/sbt-assembly
[2] http://www.scala-sbt.org/sbt-native-packager/

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Tue, Jul 26, 2016 at 2:54 PM, Martin Somers  wrote:
> Just wondering
>
> Whats is the correct way of building a spark job using scala - are there any
> changes coming with spark v2
>
> Ive been following this post
>
> http://www.infoobjects.com/spark-submit-with-sbt/
>
>
>
> Then again Ive been mainly using docker locally what is decent container for
> submitting these jobs locally
>
> Im getting to a stage where I need to submit jobs remotely and thinking of
> the best way of doing so
>
>
> tks
>
> M

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
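
For completeness, a minimal build.sbt sketch along the lines of [1], assuming Spark 2.0
and Scala 2.11 (project/plugins.sbt would need
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")):

name := "my-spark-job"
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.0.0" % "provided",
  "org.apache.spark" %% "spark-sql"  % "2.0.0" % "provided"
)
// "provided" keeps Spark itself out of the fat jar; run `sbt assembly`
// and pass the resulting jar to spark-submit.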



Re: Event Log Compression

2016-07-26 Thread Jacek Laskowski
Hi,

Guess it's spark.io.compression.codec with lz4 being default.
Supported are lzf or snappy.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Tue, Jul 26, 2016 at 3:33 PM, Bryan Jeffrey  wrote:
> All,
>
> I am running Spark 1.6.1. I enabled 'spark.eventLog.compress', and the data
> is now being compressed using lz4.  I would like to move that back to Snappy
> as I have some third party tools that require using Snappy.
>
> Is there a variable used to control Spark eventLog compression algorithm?
>
> Thank you,
>
> Bryan Jeffrey

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
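
The event log writer reuses the general I/O codec, so switching that codec should switch
the event log compression as well (note it also affects shuffle and broadcast
compression). A sketch of the relevant settings:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.compress", "true")
  .set("spark.io.compression.codec", "snappy")   // instead of the lz4 default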



Re: Spark streaming lost data when ReceiverTracker writes Blockinfo to hdfs timeout

2016-07-26 Thread Cody Koeninger
Can you go ahead and open a Jira ticket with that explanation?

Is there a reason you need to use receivers instead of the direct stream?

On Tue, Jul 26, 2016 at 4:45 AM, Andy Zhao  wrote:
> Hi guys,
>
> I wrote a spark streaming program which consumes 1000 messages from one
> topic of Kafka, did some transformation, and wrote the result back to
> another topic. But I only found 988 messages in the second topic. I checked
> the log info and confirmed all messages were received by the receivers. But I found an
> hdfs write timeout message printed from the class BatchedWriteAheadLog.
>
> I checkout source code and found code like this:
>
> /** Add received block. This event will get written to the write ahead
> log (if enabled). */
>   def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
> try {
>   val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
>   if (writeResult) {
> synchronized {
>   getReceivedBlockQueue(receivedBlockInfo.streamId) +=
> receivedBlockInfo
> }
> logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
>   s"block ${receivedBlockInfo.blockStoreResult.blockId}")
>   } else {
> logDebug(s"Failed to acknowledge stream
> ${receivedBlockInfo.streamId} receiving " +
>   s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write
> Ahead Log.")
>   }
>   writeResult
> } catch {
>   case NonFatal(e) =>
> logError(s"Error adding block $receivedBlockInfo", e)
> false
> }
>   }
>
>
> It seems that ReceiverTracker tries to write the block info to hdfs, but the
> write operation times out, which causes the writeToLog function to return false, and
> this code "getReceivedBlockQueue(receivedBlockInfo.streamId) +=
> receivedBlockInfo" is skipped, so the block info is lost.
>
>The spark version I use is 1.6.1 and I did not turn on
> spark.streaming.receiver.writeAheadLog.enable.
>
>I want to know whether or not this is a designed behaviour.
>
> Thanks
>
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-lost-data-when-ReceiverTracker-writes-Blockinfo-to-hdfs-timeout-tp27410.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
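
For reference, a sketch of the receiver-less direct stream Cody mentions (Spark 1.6 with
the spark-streaming-kafka artifact); ssc, the broker list and the topic name are
placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("input-topic")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)   // offsets are tracked by the stream itself, no receivers/WAL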



Re: ORC v/s Parquet for Spark 2.0

2016-07-26 Thread Ovidiu-Cristian MARCU
Interesting opinion, thank you

Still, according to the websites, Parquet is basically inspired by Dremel (Google) [1]
and parts of ORC have been enhanced while deployed at Facebook and Yahoo [2].

Other than this presentation [3], do you guys know any other benchmark?

[1]https://parquet.apache.org/documentation/latest/ 

[2]https://orc.apache.org/docs/ 
[3] 
http://www.slideshare.net/oom65/file-format-benchmarks-avro-json-orc-parquet 


> On 26 Jul 2016, at 15:19, Koert Kuipers  wrote:
> 
> when parquet came out it was developed by a community of companies, and was 
> designed as a library to be supported by multiple big data projects. nice
> 
> orc on the other hand initially only supported hive. it wasn't even designed 
> as a library that can be re-used. even today it brings in the kitchen sink of 
> transitive dependencies. yikes
> 
> 
> On Jul 26, 2016 5:09 AM, "Jörn Franke" wrote:
> I think both are very similar, but with slightly different goals. While they 
> work transparently for each Hadoop application you need to enable specific 
> support in the application for predicate push down. 
> In the end you have to check which application you are using and do some 
> tests (with correct predicate push down configuration). Keep in mind that 
> both formats work best if they are sorted on filter columns (which is your 
> responsibility) and if their optimizations are correctly configured (min 
> max index, bloom filter, compression etc) . 
> 
> If you need to ingest sensor data you may want to store it first in hbase and 
> then batch process it in large files in Orc or parquet format.
> 
> On 26 Jul 2016, at 04:09, janardhan shetty wrote:
> 
>> Just wondering advantages and disadvantages to convert data into ORC or 
>> Parquet. 
>> 
>> In the documentation of Spark there are numerous examples of Parquet format. 
>> 
>> Any strong reasons to chose Parquet over ORC file format ?
>> 
>> Also : current data compression is bzip2
>> 
>> http://stackoverflow.com/questions/32373460/parquet-vs-orc-vs-orc-with-snappy
>>  
>> 
>>  
>> This seems like biased.



Re: Num of executors and cores

2016-07-26 Thread Mail.com
Hi,

In spark submit, I specify --master yarn-client.
When I go to the executors page in the UI I do see all 12 executors assigned.
But when I drill down to the Tasks for the stage I see only 8 tasks, with index 0-7.

I ran again, increasing the number of executors to 15, and I now see 12 tasks for
the stage.

I'd still like to understand why, even though 12 executors were available, there were
only 8 tasks for the stage.

Thanks,
Pradeep



> On Jul 26, 2016, at 8:46 AM, Jacek Laskowski  wrote:
> 
> Hi,
> 
> Where's this yarn-client mode specified? When you said "However, when
> I run the job I see that the stage which reads the directory has only
> 8 tasks." -- how do you see 8 tasks for a stage? It appears you're in
> local[*] mode on a 8-core machine (like me) and that's why I'm asking
> such basic questions.
> 
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
> 
> 
>> On Tue, Jul 26, 2016 at 2:39 PM, Mail.com  wrote:
>> More of jars and files and app name. It runs on yarn-client mode.
>> 
>> Thanks,
>> Pradeep
>> 
>>> On Jul 26, 2016, at 7:10 AM, Jacek Laskowski  wrote:
>>> 
>>> Hi,
>>> 
>>> What's ""? What master URL do you use?
>>> 
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> 
>>> https://medium.com/@jaceklaskowski/
>>> Mastering Apache Spark http://bit.ly/mastering-apache-spark
>>> Follow me at https://twitter.com/jaceklaskowski
>>> 
>>> 
 On Tue, Jul 26, 2016 at 2:18 AM, Mail.com  wrote:
 Hi All,
 
 I have a directory which has 12 files. I want to read the entire file so I 
 am reading it as wholeTextFiles(dirpath, numPartitions).
 
 I run spark-submit as  --num-executors 12 
 --executor-cores 1 and numPartitions 12.
 
 However, when I run the job I see that the stage which reads the directory 
 has only 8 tasks. So some task reads more than one file and takes twice 
 the time.
 
 What can I do that the files are read by 12 tasks  I.e one file per task.
 
 Thanks,
 Pradeep
 
 -
 To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>> 
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
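
A likely reason for the 8 tasks: the numPartitions argument of wholeTextFiles is only a
minimum hint to the combine input format behind it, so small files can still be packed
into fewer splits. A sketch of forcing 12 downstream tasks by repartitioning after the
read (the directory path is a placeholder):

val files = sc.wholeTextFiles("/path/to/dir", 12)   // may still come back with fewer splits
  .repartition(12)                                  // one (path, content) pair per task downstream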



Re: Spark Web UI port 4040 not working

2016-07-26 Thread Jestin Ma
Also, sorry for the repeated updates, I checked my master UI and it says no
drivers are running but 1 application is running.

On Tue, Jul 26, 2016 at 6:49 AM, Jestin Ma 
wrote:

> I did netstat -apn | grep 4040 on machine 6, and I see
>
> tcp0  0 :::4040 :::*
>  LISTEN  30597/java
>
> What does this mean?
>
> On Tue, Jul 26, 2016 at 6:47 AM, Jestin Ma 
> wrote:
>
>> I do not deploy using cluster mode and I don't use EC2.
>>
>> I just read that launching as client mode: "the driver is launched
>> directly within the spark-submit process which acts as a *client* to the
>> cluster."
>>
>> My current setup is that I have cluster machines 1, 2, 3, 4, 5, with 1
>> being the master.
>> I submit from another cluster machine 6 in client mode. So I'm taking
>> that the driver is launched in my machine 6.
>>
>> On Tue, Jul 26, 2016 at 6:38 AM, Jacek Laskowski  wrote:
>>
>>> Hi,
>>>
>>> Do you perhaps deploy using cluster mode? Is this EC2? You'd need to
>>> figure out where the driver runs and use the machine's IP.
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> 
>>> https://medium.com/@jaceklaskowski/
>>> Mastering Apache Spark http://bit.ly/mastering-apache-spark
>>> Follow me at https://twitter.com/jaceklaskowski
>>>
>>>
>>> On Tue, Jul 26, 2016 at 3:36 PM, Jestin Ma 
>>> wrote:
>>> > I tried doing that on my master node.
>>> > I got nothing.
>>> > However, I grep'd port 8080 and I got the standalone UI.
>>> >
>>> > On Tue, Jul 26, 2016 at 12:39 AM, Chanh Le 
>>> wrote:
>>> >>
>>> >> You’re running in StandAlone Mode?
>>> >> Usually inside active task it will show the address of current job.
>>> >> or you can check in master node by using netstat -apn | grep 4040
>>> >>
>>> >>
>>> >>
>>> >> > On Jul 26, 2016, at 8:21 AM, Jestin Ma 
>>> >> > wrote:
>>> >> >
>>> >> > Hello, when running spark jobs, I can access the master UI (port
>>> 8080
>>> >> > one) no problem. However, I'm confused as to how to access the web
>>> UI to see
>>> >> > jobs/tasks/stages/etc.
>>> >> >
>>> >> > I can access the master UI at http://:8080. But port
>>> 4040
>>> >> > gives me a -connection cannot be reached-.
>>> >> >
>>> >> > Is the web UI http:// with a port of 4040?
>>> >> >
>>> >> > I'm running my Spark job on a cluster machine and submitting it to a
>>> >> > master node part of the cluster. I heard of ssh tunneling; is that
>>> relevant
>>> >> > here?
>>> >> >
>>> >> > Thank you!
>>> >>
>>> >
>>>
>>
>>
>


Re: Spark Web UI port 4040 not working

2016-07-26 Thread Jestin Ma
I did netstat -apn | grep 4040 on machine 6, and I see

tcp0  0 :::4040 :::*
 LISTEN  30597/java

What does this mean?

On Tue, Jul 26, 2016 at 6:47 AM, Jestin Ma 
wrote:

> I do not deploy using cluster mode and I don't use EC2.
>
> I just read that launching as client mode: "the driver is launched
> directly within the spark-submit process which acts as a *client* to the
> cluster."
>
> My current setup is that I have cluster machines 1, 2, 3, 4, 5, with 1
> being the master.
> I submit from another cluster machine 6 in client mode. So I'm taking that
> the driver is launched in my machine 6.
>
> On Tue, Jul 26, 2016 at 6:38 AM, Jacek Laskowski  wrote:
>
>> Hi,
>>
>> Do you perhaps deploy using cluster mode? Is this EC2? You'd need to
>> figure out where the driver runs and use the machine's IP.
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Tue, Jul 26, 2016 at 3:36 PM, Jestin Ma 
>> wrote:
>> > I tried doing that on my master node.
>> > I got nothing.
>> > However, I grep'd port 8080 and I got the standalone UI.
>> >
>> > On Tue, Jul 26, 2016 at 12:39 AM, Chanh Le  wrote:
>> >>
>> >> You’re running in StandAlone Mode?
>> >> Usually inside active task it will show the address of current job.
>> >> or you can check in master node by using netstat -apn | grep 4040
>> >>
>> >>
>> >>
>> >> > On Jul 26, 2016, at 8:21 AM, Jestin Ma 
>> >> > wrote:
>> >> >
>> >> > Hello, when running spark jobs, I can access the master UI (port 8080
>> >> > one) no problem. However, I'm confused as to how to access the web
>> UI to see
>> >> > jobs/tasks/stages/etc.
>> >> >
>> >> > I can access the master UI at http://:8080. But port
>> 4040
>> >> > gives me a -connection cannot be reached-.
>> >> >
>> >> > Is the web UI http:// with a port of 4040?
>> >> >
>> >> > I'm running my Spark job on a cluster machine and submitting it to a
>> >> > master node part of the cluster. I heard of ssh tunneling; is that
>> relevant
>> >> > here?
>> >> >
>> >> > Thank you!
>> >>
>> >
>>
>
>


Re: Spark Web UI port 4040 not working

2016-07-26 Thread Jestin Ma
I do not deploy using cluster mode and I don't use EC2.

I just read that launching as client mode: "the driver is launched directly
within the spark-submit process which acts as a *client* to the cluster."

My current setup is that I have cluster machines 1, 2, 3, 4, 5, with 1
being the master.
I submit from another cluster machine, 6, in client mode. So I take it that
the driver is launched on machine 6.

On Tue, Jul 26, 2016 at 6:38 AM, Jacek Laskowski  wrote:

> Hi,
>
> Do you perhaps deploy using cluster mode? Is this EC2? You'd need to
> figure out where the driver runs and use the machine's IP.
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Tue, Jul 26, 2016 at 3:36 PM, Jestin Ma 
> wrote:
> > I tried doing that on my master node.
> > I got nothing.
> > However, I grep'd port 8080 and I got the standalone UI.
> >
> > On Tue, Jul 26, 2016 at 12:39 AM, Chanh Le  wrote:
> >>
> >> You’re running in StandAlone Mode?
> >> Usually inside active task it will show the address of current job.
> >> or you can check in master node by using netstat -apn | grep 4040
> >>
> >>
> >>
> >> > On Jul 26, 2016, at 8:21 AM, Jestin Ma 
> >> > wrote:
> >> >
> >> > Hello, when running spark jobs, I can access the master UI (port 8080
> >> > one) no problem. However, I'm confused as to how to access the web UI
> to see
> >> > jobs/tasks/stages/etc.
> >> >
> >> > I can access the master UI at http://:8080. But port
> 4040
> >> > gives me a -connection cannot be reached-.
> >> >
> >> > Is the web UI http:// with a port of 4040?
> >> >
> >> > I'm running my Spark job on a cluster machine and submitting it to a
> >> > master node part of the cluster. I heard of ssh tunneling; is that
> relevant
> >> > here?
> >> >
> >> > Thank you!
> >>
> >
>


Question on set membership / diff sync technique in Spark

2016-07-26 Thread Natu Lauchande
Hi,

I am working on a data pipeline in a Spark Streaming app that receives data
as a CSV regularly.

After some enrichment we send the data to another storage layer (ES in this
case). Some of the records in the incoming CSV might be repeated.

I am trying to devise a strategy based on MD5s of the lines to avoid
processing already-seen lines, and I wonder what would be the best approach
to store this data. I would prefer the data to be located in HDFS
on the same cluster.

I am considering a couple of formats :
- Parquet
- Sequence Files
- Avro
- Apache Arrow (doesn't seem to have a production version ready yet)

Questions:

1. Is there any alternative approach to avoid re-processing the same rows?

2. Which data storage/technique is best suited for this kind of set
membership operation?

Any help and thoughts are very much welcome .

Thanks in advance,
Natu
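
One sketch covering questions 1 and 2 together, assuming Spark 2.0 (SparkSession and the
left_anti join type); the path and column names are placeholders: keep a compact Parquet
table of line hashes on HDFS and anti-join each new batch against it.

import org.apache.spark.sql.functions.{col, md5}

val seenPath = "hdfs:///dedup/seen_hashes"
val seen     = spark.read.parquet(seenPath)                       // single column: "md5"
val hashed   = newBatchDF.withColumn("md5", md5(col("raw_line")))
val fresh    = hashed.join(seen, Seq("md5"), "left_anti")         // only unseen lines
// enrich and push `fresh` to ES, then remember its hashes for the next batch
fresh.select("md5").distinct().write.mode("append").parquet(seenPath)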


Re: Spark Web UI port 4040 not working

2016-07-26 Thread Jestin Ma
I tried doing that on my master node.
I got nothing.
However, I grep'd port 8080 and I got the standalone UI.

On Tue, Jul 26, 2016 at 12:39 AM, Chanh Le  wrote:

> You’re running in StandAlone Mode?
> Usually inside active task it will show the address of current job.
> or you can check in master node by using netstat -apn | grep 4040
>
>
>
> > On Jul 26, 2016, at 8:21 AM, Jestin Ma 
> wrote:
> >
> > Hello, when running spark jobs, I can access the master UI (port 8080
> one) no problem. However, I'm confused as to how to access the web UI to
> see jobs/tasks/stages/etc.
> >
> > I can access the master UI at http://:8080. But port 4040
> gives me a -connection cannot be reached-.
> >
> > Is the web UI http:// with a port of 4040?
> >
> > I'm running my Spark job on a cluster machine and submitting it to a
> master node part of the cluster. I heard of ssh tunneling; is that relevant
> here?
> >
> > Thank you!
>
>


Re: Spark Web UI port 4040 not working

2016-07-26 Thread Jacek Laskowski
Hi,

Do you perhaps deploy using cluster mode? Is this EC2? You'd need to
figure out where the driver runs and use the machine's IP.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Tue, Jul 26, 2016 at 3:36 PM, Jestin Ma  wrote:
> I tried doing that on my master node.
> I got nothing.
> However, I grep'd port 8080 and I got the standalone UI.
>
> On Tue, Jul 26, 2016 at 12:39 AM, Chanh Le  wrote:
>>
>> You’re running in StandAlone Mode?
>> Usually inside active task it will show the address of current job.
>> or you can check in master node by using netstat -apn | grep 4040
>>
>>
>>
>> > On Jul 26, 2016, at 8:21 AM, Jestin Ma 
>> > wrote:
>> >
>> > Hello, when running spark jobs, I can access the master UI (port 8080
>> > one) no problem. However, I'm confused as to how to access the web UI to 
>> > see
>> > jobs/tasks/stages/etc.
>> >
>> > I can access the master UI at http://:8080. But port 4040
>> > gives me a -connection cannot be reached-.
>> >
>> > Is the web UI http:// with a port of 4040?
>> >
>> > I'm running my Spark job on a cluster machine and submitting it to a
>> > master node part of the cluster. I heard of ssh tunneling; is that relevant
>> > here?
>> >
>> > Thank you!
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Event Log Compression

2016-07-26 Thread Bryan Jeffrey
All,

I am running Spark 1.6.1. I enabled 'spark.eventLog.compress', and the data
is now being compressed using lz4.  I would like to move that back to
Snappy as I have some third party tools that require using Snappy.

Is there a variable used to control Spark eventLog compression algorithm?

Thank you,

Bryan Jeffrey


Re: dataframe.foreach VS dataframe.collect().foreach

2016-07-26 Thread Pedro Rodriguez
:)

Just realized you didn't get your original question answered though:

scala> import sqlContext.implicits._
import sqlContext.implicits._

scala> case class Person(age: Long, name: String)
defined class Person

scala> val df = Seq(Person(24, "pedro"), Person(22, "fritz")).toDF()
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.select("age")
res2: org.apache.spark.sql.DataFrame = [age: bigint]

scala> df.select("age").collect.map(_.getLong(0))
res3: Array[Long] = Array(24, 22)

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> df.collect.flatMap {
 | case Row(age: Long, name: String) => Seq(Tuple1(age))
 | case _ => Seq()
 | }
res7: Array[(Long,)] = Array((24,), (22,))

These docs are helpful
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Row
(1.6 docs, but should be similar in 2.0)

On Tue, Jul 26, 2016 at 7:08 AM, Gourav Sengupta 
wrote:

> And Pedro has made sense of a world running amok, scared, and drunken
> stupor.
>
> Regards,
> Gourav
>
> On Tue, Jul 26, 2016 at 2:01 PM, Pedro Rodriguez 
> wrote:
>
>> I am not 100% as I haven't tried this out, but there is a huge difference
>> between the two. Both foreach and collect are actions irregardless of
>> whether or not the data frame is empty.
>>
>> Doing a collect will bring all the results back to the driver, possibly
>> forcing it to run out of memory. Foreach will apply your function to each
>> element of the DataFrame, but will do so across the cluster. This behavior
>> is useful for when you need to do something custom for each element
>> (perhaps save to a db for which there is no driver or something custom like
>> make an http request per element, careful here though due to overhead cost).
>>
>> In your example, I am going to assume that hrecords is something like a
>> list buffer. The reason that will be empty is that each worker will get
>> sent an empty list (its captured in the closure for foreach) and append to
>> it. The instance of the list at the driver doesn't know about what happened
>> at the workers so its empty.
>>
>> I don't know why Chanh's comment applies here since I am guessing the df
>> is not empty.
>>
>> On Tue, Jul 26, 2016 at 1:53 AM, kevin  wrote:
>>
>>> thank you Chanh
>>>
>>> 2016-07-26 15:34 GMT+08:00 Chanh Le :
>>>
 Hi Ken,

 *blacklistDF -> just DataFrame *
 Spark is lazy until you call something like* collect, take, write* it
 will execute the hold process *like you do map or filter before you
 collect*.
 That mean until you call collect spark* do nothing* so you df would
 not have any data -> can’t call foreach.
 Call collect execute the process -> get data -> foreach is ok.


 On Jul 26, 2016, at 2:30 PM, kevin  wrote:

  blacklistDF.collect()



>>>
>>
>>
>> --
>> Pedro Rodriguez
>> PhD Student in Distributed Machine Learning | CU Boulder
>> UC Berkeley AMPLab Alumni
>>
>> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
>> Github: github.com/EntilZha | LinkedIn:
>> https://www.linkedin.com/in/pedrorodriguezscience
>>
>>
>


-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: Outer Explode needed

2016-07-26 Thread Yong Zhang
The reason for the lack of response is that this feature is not available yet.


You can vote and following this JIRA 
https://issues.apache.org/jira/browse/SPARK-13721, if you really need this 
feature.


Yong



From: Don Drake 
Sent: Monday, July 25, 2016 9:12 PM
To: d...@spark.apache.org
Subject: Fwd: Outer Explode needed

No response on the Users list, I thought I would repost here.

See below.

-Don
-- Forwarded message --
From: Don Drake 
Date: Sun, Jul 24, 2016 at 2:18 PM
Subject: Outer Explode needed
To: user 


I have a nested data structure (an array of structs) that I'm flattening with the DSL 
df.explode() API. However, when the array is empty, I don't get the rest of the row in 
my output, as it is skipped.

This is the intended behavior, and Hive supports a SQL "OUTER explode()" to 
generate the row when the explode would not yield any output.

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LateralView

Can we get this same outer explode in the DSL?  I have to jump through some 
outer join hoops to get the rows where the array is empty.

Thanks.

-Don

--
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake
800-733-2143



--
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake
800-733-2143
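
For the archive, a sketch of the "outer join hoops" workaround until SPARK-13721 lands;
df, "items" and "item" are placeholders for the frame and nested column in question:

import org.apache.spark.sql.functions.{col, explode, monotonically_increasing_id}

val withId   = df.withColumn("_row_id", monotonically_increasing_id()).cache()  // cache so both branches see the same ids
val children = withId.select(col("_row_id"), explode(col("items")).as("item"))
val outer    = withId.drop("items")
  .join(children, Seq("_row_id"), "left_outer")   // rows with an empty array survive with item = null
  .drop("_row_id")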


Re: ORC v/s Parquet for Spark 2.0

2016-07-26 Thread Koert Kuipers
when parquet came out it was developed by a community of companies, and was
designed as a library to be supported by multiple big data projects. nice

orc on the other hand initially only supported hive. it wasn't even
designed as a library that can be re-used. even today it brings in the
kitchen sink of transitive dependencies. yikes

On Jul 26, 2016 5:09 AM, "Jörn Franke"  wrote:

> I think both are very similar, but with slightly different goals. While
> they work transparently for each Hadoop application you need to enable
> specific support in the application for predicate push down.
> In the end you have to check which application you are using and do some
> tests (with correct predicate push down configuration). Keep in mind that
> both formats work best if they are sorted on filter columns (which is your
> responsibility) and if their optimizations are correctly configured (min
> max index, bloom filter, compression etc) .
>
> If you need to ingest sensor data you may want to store it first in hbase
> and then batch process it in large files in Orc or parquet format.
>
> On 26 Jul 2016, at 04:09, janardhan shetty  wrote:
>
> Just wondering advantages and disadvantages to convert data into ORC or
> Parquet.
>
> In the documentation of Spark there are numerous examples of Parquet
> format.
>
> Any strong reasons to chose Parquet over ORC file format ?
>
> Also : current data compression is bzip2
>
>
> http://stackoverflow.com/questions/32373460/parquet-vs-orc-vs-orc-with-snappy
> This seems like biased.
>
>


Re: dataframe.foreach VS dataframe.collect().foreach

2016-07-26 Thread Gourav Sengupta
And Pedro has made sense of a world running amok, scared, and drunken
stupor.

Regards,
Gourav

On Tue, Jul 26, 2016 at 2:01 PM, Pedro Rodriguez 
wrote:

> I am not 100% as I haven't tried this out, but there is a huge difference
> between the two. Both foreach and collect are actions irregardless of
> whether or not the data frame is empty.
>
> Doing a collect will bring all the results back to the driver, possibly
> forcing it to run out of memory. Foreach will apply your function to each
> element of the DataFrame, but will do so across the cluster. This behavior
> is useful for when you need to do something custom for each element
> (perhaps save to a db for which there is no driver or something custom like
> make an http request per element, careful here though due to overhead cost).
>
> In your example, I am going to assume that hrecords is something like a
> list buffer. The reason that will be empty is that each worker will get
> sent an empty list (its captured in the closure for foreach) and append to
> it. The instance of the list at the driver doesn't know about what happened
> at the workers so its empty.
>
> I don't know why Chanh's comment applies here since I am guessing the df
> is not empty.
>
> On Tue, Jul 26, 2016 at 1:53 AM, kevin  wrote:
>
>> thank you Chanh
>>
>> 2016-07-26 15:34 GMT+08:00 Chanh Le :
>>
>>> Hi Ken,
>>>
>>> *blacklistDF -> just DataFrame *
>>> Spark is lazy until you call something like* collect, take, write* it
>>> will execute the hold process *like you do map or filter before you
>>> collect*.
>>> That mean until you call collect spark* do nothing* so you df would not
>>> have any data -> can’t call foreach.
>>> Call collect execute the process -> get data -> foreach is ok.
>>>
>>>
>>> On Jul 26, 2016, at 2:30 PM, kevin  wrote:
>>>
>>>  blacklistDF.collect()
>>>
>>>
>>>
>>
>
>
> --
> Pedro Rodriguez
> PhD Student in Distributed Machine Learning | CU Boulder
> UC Berkeley AMPLab Alumni
>
> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
> Github: github.com/EntilZha | LinkedIn:
> https://www.linkedin.com/in/pedrorodriguezscience
>
>


Re: dataframe.foreach VS dataframe.collect().foreach

2016-07-26 Thread Pedro Rodriguez
I am not 100% sure as I haven't tried this out, but there is a huge difference
between the two. Both foreach and collect are actions regardless of
whether or not the data frame is empty.

Doing a collect will bring all the results back to the driver, possibly
forcing it to run out of memory. Foreach will apply your function to each
element of the DataFrame, but will do so across the cluster. This behavior
is useful for when you need to do something custom for each element
(perhaps save to a db for which there is no driver or something custom like
make an http request per element, careful here though due to overhead cost).

In your example, I am going to assume that hrecords is something like a
list buffer. The reason it will be empty is that each worker will be
sent an empty list (it's captured in the closure for foreach) and append to
it. The instance of the list at the driver doesn't know about what happened
at the workers, so it's empty.

I don't know why Chanh's comment applies here since I am guessing the df is
not empty.

On Tue, Jul 26, 2016 at 1:53 AM, kevin  wrote:

> thank you Chanh
>
> 2016-07-26 15:34 GMT+08:00 Chanh Le :
>
>> Hi Ken,
>>
>> *blacklistDF -> just DataFrame *
>> Spark is lazy until you call something like* collect, take, write* it
>> will execute the hold process *like you do map or filter before you
>> collect*.
>> That mean until you call collect spark* do nothing* so you df would not
>> have any data -> can’t call foreach.
>> Call collect execute the process -> get data -> foreach is ok.
>>
>>
>> On Jul 26, 2016, at 2:30 PM, kevin  wrote:
>>
>>  blacklistDF.collect()
>>
>>
>>
>


-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience
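
A small sketch of the same point in practice: the per-row side effect runs on the
executors, and anything you want back on the driver goes through an accumulator (or
collect), not a buffer captured in the closure. df and the side effect are placeholders;
this uses the 1.6-style accumulator API.

val processed = sc.accumulator(0L, "processed rows")
df.foreach { row =>
  // e.g. write `row` to an external system here; this body runs on the executors
  processed += 1L
}
println(s"rows touched on the cluster: ${processed.value}")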


sbt build under scala

2016-07-26 Thread Martin Somers
Just wondering

What is the correct way of building a spark job using scala - are there
any changes coming with spark v2?

I've been following this post

http://www.infoobjects.com/spark-submit-with-sbt/



Then again, I've been mainly using docker locally - what is a decent container
for submitting these jobs locally?

I'm getting to a stage where I need to submit jobs remotely and am thinking of
the best way of doing so.


tks

M


Re: Num of executors and cores

2016-07-26 Thread Jacek Laskowski
Hi,

Where's this yarn-client mode specified? When you said "However, when
I run the job I see that the stage which reads the directory has only
8 tasks." -- how do you see 8 tasks for a stage? It appears you're in
local[*] mode on an 8-core machine (like me) and that's why I'm asking
such basic questions.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Tue, Jul 26, 2016 at 2:39 PM, Mail.com  wrote:
> More of jars and files and app name. It runs on yarn-client mode.
>
> Thanks,
> Pradeep
>
>> On Jul 26, 2016, at 7:10 AM, Jacek Laskowski  wrote:
>>
>> Hi,
>>
>> What's ""? What master URL do you use?
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>>> On Tue, Jul 26, 2016 at 2:18 AM, Mail.com  wrote:
>>> Hi All,
>>>
>>> I have a directory which has 12 files. I want to read the entire file so I 
>>> am reading it as wholeTextFiles(dirpath, numPartitions).
>>>
>>> I run spark-submit as  --num-executors 12 --executor-cores 
>>> 1 and numPartitions 12.
>>>
>>> However, when I run the job I see that the stage which reads the directory 
>>> has only 8 tasks. So some task reads more than one file and takes twice the 
>>> time.
>>>
>>> What can I do that the files are read by 12 tasks  I.e one file per task.
>>>
>>> Thanks,
>>> Pradeep
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Num of executors and cores

2016-07-26 Thread Mail.com
More of jars and files and app name. It runs on yarn-client mode.

Thanks,
Pradeep

> On Jul 26, 2016, at 7:10 AM, Jacek Laskowski  wrote:
> 
> Hi,
> 
> What's ""? What master URL do you use?
> 
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
> 
> 
>> On Tue, Jul 26, 2016 at 2:18 AM, Mail.com  wrote:
>> Hi All,
>> 
>> I have a directory which has 12 files. I want to read the entire file so I 
>> am reading it as wholeTextFiles(dirpath, numPartitions).
>> 
>> I run spark-submit as  --num-executors 12 --executor-cores 
>> 1 and numPartitions 12.
>> 
>> However, when I run the job I see that the stage which reads the directory 
>> has only 8 tasks. So some task reads more than one file and takes twice the 
>> time.
>> 
>> What can I do that the files are read by 12 tasks  I.e one file per task.
>> 
>> Thanks,
>> Pradeep
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: DAGScheduler: Job 20 finished: collectAsMap at DecisionTree.scala:651, took 19.556700 s Killed

2016-07-26 Thread Jacek Laskowski
Hi,

Anything relevant in ApplicationMaster's log? What about the
executors? You should have 2 (default) so review the logs of each
executors.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Tue, Jul 26, 2016 at 1:17 PM, Ascot Moss  wrote:
> It is YARN cluster,
>
> /bin/spark-submit \
>
> --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGCTimeStamps
> -XX:+PrintGCDetails" \
>
> --driver-memory 64G \
>
> --executor-memory 16g \
>
>
> On Tue, Jul 26, 2016 at 7:00 PM, Jacek Laskowski  wrote:
>>
>> Hi,
>>
>> What's the cluster manager? Is this YARN perhaps? Do you have any
>> other apps on the cluster? How do you submit your app? What are the
>> properties?
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Tue, Jul 26, 2016 at 1:27 AM, Ascot Moss  wrote:
>> > Hi,
>> >
>> > spark: 1.6.1
>> > java: java 1.8_u40
>> > I tried random forest training phase, the same code works well if with
>> > 20
>> > trees (lower accuracy, about 68%).  When trying the training phase with
>> > more
>> > tree, I set to 200 trees, it returned:
>> >
>> > "DAGScheduler: Job 20 finished: collectAsMap at DecisionTree.scala:651,
>> > took
>> > 19.556700 s Killed" .  There is no WARN or ERROR from console, the task
>> > is
>> > just stopped in the end.
>> >
>> > Any idea how to resolve it? Should the timeout parameter be set to
>> > longer
>> >
>> > regards
>> >
>> >
>> > (below is the log from console)
>> >
>> > 16/07/26 00:02:47 INFO DAGScheduler: looking for newly runnable stages
>> >
>> > 16/07/26 00:02:47 INFO DAGScheduler: running: Set()
>> >
>> > 16/07/26 00:02:47 INFO DAGScheduler: waiting: Set(ResultStage 32)
>> >
>> > 16/07/26 00:02:47 INFO DAGScheduler: failed: Set()
>> >
>> > 16/07/26 00:02:47 INFO DAGScheduler: Submitting ResultStage 32
>> > (MapPartitionsRDD[75] at map at DecisionTree.scala:642), which has no
>> > missing parents
>> >
>> > 16/07/26 00:02:47 INFO MemoryStore: Block broadcast_48 stored as values
>> > in
>> > memory (estimated size 2.2 MB, free 18.2 MB)
>> >
>> > 16/07/26 00:02:47 INFO MemoryStore: Block broadcast_48_piece0 stored as
>> > bytes in memory (estimated size 436.9 KB, free 18.7 MB)
>> >
>> > 16/07/26 00:02:47 INFO BlockManagerInfo: Added broadcast_48_piece0 in
>> > memory
>> > on x.x.x.x:35450 (size: 436.9 KB, free: 45.8 GB)
>> >
>> > 16/07/26 00:02:47 INFO SparkContext: Created broadcast 48 from broadcast
>> > at
>> > DAGScheduler.scala:1006
>> >
>> > 16/07/26 00:02:47 INFO DAGScheduler: Submitting 4 missing tasks from
>> > ResultStage 32 (MapPartitionsRDD[75] at map at DecisionTree.scala:642)
>> >
>> > 16/07/26 00:02:47 INFO TaskSchedulerImpl: Adding task set 32.0 with 4
>> > tasks
>> >
>> > 16/07/26 00:02:47 INFO TaskSetManager: Starting task 0.0 in stage 32.0
>> > (TID
>> > 185, x.x.x.x, partition 0,NODE_LOCAL, 1956 bytes)
>> >
>> > 16/07/26 00:02:47 INFO TaskSetManager: Starting task 1.0 in stage 32.0
>> > (TID
>> > 186, x.x.x.x, partition 1,NODE_LOCAL, 1956 bytes)
>> >
>> > 16/07/26 00:02:47 INFO TaskSetManager: Starting task 2.0 in stage 32.0
>> > (TID
>> > 187, x.x.x.x, partition 2,NODE_LOCAL, 1956 bytes)
>> >
>> > 16/07/26 00:02:47 INFO TaskSetManager: Starting task 3.0 in stage 32.0
>> > (TID
>> > 188, x.x.x.x, partition 3,NODE_LOCAL, 1956 bytes)
>> >
>> > 16/07/26 00:02:47 INFO BlockManagerInfo: Added broadcast_48_piece0 in
>> > memory
>> > on x.x.x.x:58784 (size: 436.9 KB, free: 5.1 GB)
>> >
>> > 16/07/26 00:02:47 INFO MapOutputTrackerMasterEndpoint: Asked to send map
>> > output locations for shuffle 12 to x.x.x.x:44434
>> >
>> > 16/07/26 00:02:47 INFO MapOutputTrackerMaster: Size of output statuses
>> > for
>> > shuffle 12 is 180 bytes
>> >
>> > 16/07/26 00:02:47 INFO BlockManagerInfo: Added broadcast_48_piece0 in
>> > memory
>> > on x.x.x.x:46186 (size: 436.9 KB, free: 2.2 GB)
>> >
>> > 16/07/26 00:02:47 INFO BlockManagerInfo: Added broadcast_48_piece0 in
>> > memory
>> > on x.x.x.x:50132 (size: 436.9 KB, free: 5.0 GB)
>> >
>> > 16/07/26 00:02:47 INFO MapOutputTrackerMasterEndpoint: Asked to send map
>> > output locations for shuffle 12 to x.x.x.x:47272
>> >
>> > 16/07/26 00:02:47 INFO MapOutputTrackerMasterEndpoint: Asked to send map
>> > output locations for shuffle 12 to x.x.x.x:46802
>> >
>> > 16/07/26 00:02:49 INFO TaskSetManager: Finished task 2.0 in stage 32.0
>> > (TID
>> > 187) in 2265 ms on x.x.x.x (1/4)
>> >
>> > 16/07/26 00:02:49 INFO TaskSetManager: Finished task 1.0 in stage 32.0
>> > (TID
>> > 186) in 2266 ms on x.x.x.x (2/4)
>> >
>> > 16/07/26 00:02:50 INFO TaskSetManager: Finished task 0.0 in stage 32.0
>> > (TID
>> > 185) in 2794 ms on x.x.x.x (3/4)
>> >
>> > 16/07/26 00:02:50 INFO TaskSetManager: Finished task 3.0 in stage 32.0
>> > (TID
>> > 188) in 3738 ms on x.

Re: ORC v/s Parquet for Spark 2.0

2016-07-26 Thread Ovidiu-Cristian MARCU
So did you actually try to run your use case with Spark 2.0 and ORC files?
It's hard to understand your 'apparently...'.

Best,
Ovidiu
> On 26 Jul 2016, at 13:10, Gourav Sengupta  wrote:
> 
> If you have ever tried to use ORC via SPARK you will know that SPARK's 
> promise of accessing ORC files is a sham. SPARK cannot access partitioned 
> tables via HIVEcontext which are ORC, SPARK cannot stripe through ORC faster 
> and what more, if you are using SQL and have thought of using HIVE with ORC 
> on TEZ, then it runs way better, faster and leaner than SPARK. 
> 
> I can process almost a few billion records close to a terabyte in a cluster 
> with around 100GB RAM and 40 cores in a few hours, and find it a challenge 
> doing the same with SPARK. 
> 
> But apparently, everything is resolved in SPARK 2.0.
> 
> 
> Regards,
> Gourav Sengupta
> 
> On Tue, Jul 26, 2016 at 11:50 AM, Ofir Manor wrote:
> One additional point specific to Spark 2.0 - for the alpha Structured 
> Streaming API (only),  the file sink only supports Parquet format (I'm sure 
> that limitation will be lifted in a future release before Structured 
> Streaming is GA):
>  "File sink - Stores the output to a directory. As of Spark 2.0, this 
> only supports Parquet file format, and Append output mode."
>  
> http://people.apache.org/~pwendell/spark-releases/spark-2.0.0-rc5-docs/structured-streaming-programming-guide.html#where-to-go-from-here
>  
> 
> 
> ​
> 



Re: DAGScheduler: Job 20 finished: collectAsMap at DecisionTree.scala:651, took 19.556700 s Killed

2016-07-26 Thread Ascot Moss
It is YARN cluster,

/bin/spark-submit \

--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC
-XX:+PrintGCTimeStamps -XX:+PrintGCDetails"
\

--driver-memory 64G \

--executor-memory 16g \


On Tue, Jul 26, 2016 at 7:00 PM, Jacek Laskowski  wrote:

> Hi,
>
> What's the cluster manager? Is this YARN perhaps? Do you have any
> other apps on the cluster? How do you submit your app? What are the
> properties?
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Tue, Jul 26, 2016 at 1:27 AM, Ascot Moss  wrote:
> > Hi,
> >
> > spark: 1.6.1
> > java: java 1.8_u40
> > I tried random forest training phase, the same code works well if with 20
> > trees (lower accuracy, about 68%).  When trying the training phase with
> more
> > tree, I set to 200 trees, it returned:
> >
> > "DAGScheduler: Job 20 finished: collectAsMap at DecisionTree.scala:651,
> took
> > 19.556700 s Killed" .  There is no WARN or ERROR from console, the task
> is
> > just stopped in the end.
> >
> > Any idea how to resolve it? Should the timeout parameter be set to longer
> >
> > regards
> >
> >
> > (below is the log from console)
> >
> > 16/07/26 00:02:47 INFO DAGScheduler: looking for newly runnable stages
> >
> > 16/07/26 00:02:47 INFO DAGScheduler: running: Set()
> >
> > 16/07/26 00:02:47 INFO DAGScheduler: waiting: Set(ResultStage 32)
> >
> > 16/07/26 00:02:47 INFO DAGScheduler: failed: Set()
> >
> > 16/07/26 00:02:47 INFO DAGScheduler: Submitting ResultStage 32
> > (MapPartitionsRDD[75] at map at DecisionTree.scala:642), which has no
> > missing parents
> >
> > 16/07/26 00:02:47 INFO MemoryStore: Block broadcast_48 stored as values
> in
> > memory (estimated size 2.2 MB, free 18.2 MB)
> >
> > 16/07/26 00:02:47 INFO MemoryStore: Block broadcast_48_piece0 stored as
> > bytes in memory (estimated size 436.9 KB, free 18.7 MB)
> >
> > 16/07/26 00:02:47 INFO BlockManagerInfo: Added broadcast_48_piece0 in
> memory
> > on x.x.x.x:35450 (size: 436.9 KB, free: 45.8 GB)
> >
> > 16/07/26 00:02:47 INFO SparkContext: Created broadcast 48 from broadcast
> at
> > DAGScheduler.scala:1006
> >
> > 16/07/26 00:02:47 INFO DAGScheduler: Submitting 4 missing tasks from
> > ResultStage 32 (MapPartitionsRDD[75] at map at DecisionTree.scala:642)
> >
> > 16/07/26 00:02:47 INFO TaskSchedulerImpl: Adding task set 32.0 with 4
> tasks
> >
> > 16/07/26 00:02:47 INFO TaskSetManager: Starting task 0.0 in stage 32.0
> (TID
> > 185, x.x.x.x, partition 0,NODE_LOCAL, 1956 bytes)
> >
> > 16/07/26 00:02:47 INFO TaskSetManager: Starting task 1.0 in stage 32.0
> (TID
> > 186, x.x.x.x, partition 1,NODE_LOCAL, 1956 bytes)
> >
> > 16/07/26 00:02:47 INFO TaskSetManager: Starting task 2.0 in stage 32.0
> (TID
> > 187, x.x.x.x, partition 2,NODE_LOCAL, 1956 bytes)
> >
> > 16/07/26 00:02:47 INFO TaskSetManager: Starting task 3.0 in stage 32.0
> (TID
> > 188, x.x.x.x, partition 3,NODE_LOCAL, 1956 bytes)
> >
> > 16/07/26 00:02:47 INFO BlockManagerInfo: Added broadcast_48_piece0 in
> memory
> > on x.x.x.x:58784 (size: 436.9 KB, free: 5.1 GB)
> >
> > 16/07/26 00:02:47 INFO MapOutputTrackerMasterEndpoint: Asked to send map
> > output locations for shuffle 12 to x.x.x.x:44434
> >
> > 16/07/26 00:02:47 INFO MapOutputTrackerMaster: Size of output statuses
> for
> > shuffle 12 is 180 bytes
> >
> > 16/07/26 00:02:47 INFO BlockManagerInfo: Added broadcast_48_piece0 in
> memory
> > on x.x.x.x:46186 (size: 436.9 KB, free: 2.2 GB)
> >
> > 16/07/26 00:02:47 INFO BlockManagerInfo: Added broadcast_48_piece0 in
> memory
> > on x.x.x.x:50132 (size: 436.9 KB, free: 5.0 GB)
> >
> > 16/07/26 00:02:47 INFO MapOutputTrackerMasterEndpoint: Asked to send map
> > output locations for shuffle 12 to x.x.x.x:47272
> >
> > 16/07/26 00:02:47 INFO MapOutputTrackerMasterEndpoint: Asked to send map
> > output locations for shuffle 12 to x.x.x.x:46802
> >
> > 16/07/26 00:02:49 INFO TaskSetManager: Finished task 2.0 in stage 32.0
> (TID
> > 187) in 2265 ms on x.x.x.x (1/4)
> >
> > 16/07/26 00:02:49 INFO TaskSetManager: Finished task 1.0 in stage 32.0
> (TID
> > 186) in 2266 ms on x.x.x.x (2/4)
> >
> > 16/07/26 00:02:50 INFO TaskSetManager: Finished task 0.0 in stage 32.0
> (TID
> > 185) in 2794 ms on x.x.x.x (3/4)
> >
> > 16/07/26 00:02:50 INFO TaskSetManager: Finished task 3.0 in stage 32.0
> (TID
> > 188) in 3738 ms on x.x.x.x (4/4)
> >
> > 16/07/26 00:02:50 INFO TaskSchedulerImpl: Removed TaskSet 32.0, whose
> tasks
> > have all completed, from pool
> >
> > 16/07/26 00:02:50 INFO DAGScheduler: ResultStage 32 (collectAsMap at
> > DecisionTree.scala:651) finished in 3.738 s
> >
> > 16/07/26 00:02:50 INFO DAGScheduler: Job 19 finished: collectAsMap at
> > DecisionTree.scala:651, took 19.493917 s
> >
> > 16/07/26 00:02:51 INFO MemoryStore: Block broadcast_49 stored as values
> in
> > memory (estimated size 1053.9 KB, free 19.7 MB)
> >
> > 16/07/26 00:02:52 INFO MemoryStore: Block broa

Re: Num of executors and cores

2016-07-26 Thread Jacek Laskowski
Hi,

What's ""? What master URL do you use?

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Tue, Jul 26, 2016 at 2:18 AM, Mail.com  wrote:
> Hi All,
>
> I have a directory which has 12 files. I want to read the entire file so I am 
> reading it as wholeTextFiles(dirpath, numPartitions).
>
> I run spark-submit as  --num-executors 12 --executor-cores 1 
> and numPartitions 12.
>
> However, when I run the job I see that the stage which reads the directory 
> has only 8 tasks. So some task reads more than one file and takes twice the 
> time.
>
> What can I do that the files are read by 12 tasks  I.e one file per task.
>
> Thanks,
> Pradeep
>
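A minimal sketch of one way to force the parallelism after the read (minPartitions is only a hint for wholeTextFiles, which combines small files into splits; the path and the local master below are made up for illustration):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("whole-text-files").setMaster("local[4]"))

// wholeTextFiles may still return fewer than 12 partitions, since it combines small files
val files = sc.wholeTextFiles("/path/to/dir", 12)

// an explicit repartition guarantees 12 tasks for the downstream stages
val spread = if (files.getNumPartitions < 12) files.repartition(12) else files
println(spread.getNumPartitions)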
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: ORC v/s Parquet for Spark 2.0

2016-07-26 Thread Gourav Sengupta
If you have ever tried to use ORC via SPARK you will know that SPARK's
promise of accessing ORC files is a sham. SPARK cannot access partitioned
ORC tables via HiveContext, it cannot stripe through ORC any faster, and
what is more, if you are using SQL and have thought of using HIVE with ORC
on TEZ, that combination runs way better, faster and leaner than SPARK.

I can process a few billion records, close to a terabyte, in a few hours on
a cluster with around 100GB RAM and 40 cores, and find it a challenge to do
the same with SPARK.

But apparently, everything is resolved in SPARK 2.0.


Regards,
Gourav Sengupta

On Tue, Jul 26, 2016 at 11:50 AM, Ofir Manor  wrote:

> One additional point specific to Spark 2.0 - for the alpha Structured
> Streaming API (only),  the file sink only supports Parquet format (I'm sure
> that limitation will be lifted in a future release before Structured
> Streaming is GA):
>  "File sink - Stores the output to a directory. As of Spark 2.0, this
> only supports Parquet file format, and Append output mode."
>
> http://people.apache.org/~pwendell/spark-releases/spark-2.0.0-rc5-docs/structured-streaming-programming-guide.html#where-to-go-from-here
>
> ​
>


Re: Spark Web UI port 4040 not working

2016-07-26 Thread Jacek Laskowski
Hi,

Go to port 8080 and, under Running Applications, click the Application ID.
That takes you to the application's page; the Application Detail UI link
sits just above the Executor Summary table. Use it to access the web UI.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Tue, Jul 26, 2016 at 3:21 AM, Jestin Ma  wrote:
> Hello, when running spark jobs, I can access the master UI (port 8080 one)
> no problem. However, I'm confused as to how to access the web UI to see
> jobs/tasks/stages/etc.
>
> I can access the master UI at http://:8080. But port 4040 gives
> me a -connection cannot be reached-.
>
> Is the web UI http:// with a port of 4040?
>
> I'm running my Spark job on a cluster machine and submitting it to a master
> node part of the cluster. I heard of ssh tunneling; is that relevant here?
>
> Thank you!

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: DAGScheduler: Job 20 finished: collectAsMap at DecisionTree.scala:651, took 19.556700 s Killed

2016-07-26 Thread Jacek Laskowski
Hi,

What's the cluster manager? Is this YARN perhaps? Do you have any
other apps on the cluster? How do you submit your app? What are the
properties?

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Tue, Jul 26, 2016 at 1:27 AM, Ascot Moss  wrote:
> Hi,
>
> spark: 1.6.1
> java: java 1.8_u40
> I tried random forest training phase, the same code works well if with 20
> trees (lower accuracy, about 68%).  When trying the training phase with more
> tree, I set to 200 trees, it returned:
>
> "DAGScheduler: Job 20 finished: collectAsMap at DecisionTree.scala:651, took
> 19.556700 s Killed" .  There is no WARN or ERROR from console, the task is
> just stopped in the end.
>
> Any idea how to resolve it? Should the timeout parameter be set to longer
>
> regards
>
>
> (below is the log from console)
>
> [console log omitted here; the identical log is quoted in full in the next message]

FileUtil.fullyDelete does ?

2016-07-26 Thread Divya Gehlot
Hi,
When I am using the FileUtil.copyMerge function like this:

val file = "/tmp/primaryTypes.csv"
FileUtil.fullyDelete(new File(file))

val destinationFile = "/tmp/singlePrimaryTypes.csv"
FileUtil.fullyDelete(new File(destinationFile))

val counts = partitions.
  reduceByKey { case (x, y) => x + y }.
  sortBy { case (key, value) => -value }.
  map { case (key, value) => Array(key, value).mkString(",") }

counts.saveAsTextFile(file)

merge(file, destinationFile)
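(For reference, a plausible shape for the merge(...) helper above, assuming it wraps Hadoop's FileUtil.copyMerge, which concatenates the part files written by saveAsTextFile into a single destination file; the two FileUtil.fullyDelete calls simply clear any output left over from a previous run.)

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

// assumption: roughly what the merge(...) helper used above looks like
def merge(srcPath: String, dstPath: String): Unit = {
  val conf = new Configuration()
  val fs = FileSystem.get(conf)  // the filesystem named by fs.defaultFS
  // concatenate all part files under srcPath into the single file dstPath
  FileUtil.copyMerge(fs, new Path(srcPath), fs, new Path(dstPath), false, conf, null)
}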


I am wondering here what FileUtil.fullyDelete(new File(destinationFile)) does.

Does it delete the merged file? If yes, then how will we access the merged file?


Confused here ...



Thanks,

Divya


Re: ORC v/s Parquet for Spark 2.0

2016-07-26 Thread Ofir Manor
One additional point specific to Spark 2.0 - for the alpha Structured
Streaming API (only),  the file sink only supports Parquet format (I'm sure
that limitation will be lifted in a future release before Structured
Streaming is GA):
 "File sink - Stores the output to a directory. As of Spark 2.0, this
only supports Parquet file format, and Append output mode."

http://people.apache.org/~pwendell/spark-releases/spark-2.0.0-rc5-docs/structured-streaming-programming-guide.html#where-to-go-from-here
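For illustration, a minimal sketch of that Parquet-only file sink in Spark 2.0 (it assumes a socket source fed by nc -lk 9999; the paths are made up):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("file-sink").getOrCreate()

val lines = spark.readStream.format("socket")
  .option("host", "localhost").option("port", "9999").load()

val query = lines.writeStream
  .format("parquet")                                   // the only file format supported in 2.0
  .option("checkpointLocation", "/tmp/file-sink-chk")
  .outputMode("append")                                // the only mode the file sink accepts
  .start("/tmp/file-sink-out")

query.awaitTermination()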



Re: DAGScheduler: Job 20 finished: collectAsMap at DecisionTree.scala:651, took 19.556700 s Killed

2016-07-26 Thread Ascot Moss
any ideas?

On Tuesday, July 26, 2016, Ascot Moss  wrote:

> Hi,
>
> spark: 1.6.1
> java: java 1.8_u40
> I tried random forest training phase, the same code works well if with 20
> trees (lower accuracy, about 68%).  When trying the training phase with
> more tree, I set to 200 trees, it returned:
>
> "DAGScheduler: Job 20 finished: collectAsMap at DecisionTree.scala:651,
> took 19.556700 s Killed" .  There is no WARN or ERROR from console, the
> task is just stopped in the end.
>
> Any idea how to resolve it? Should the timeout parameter be set to longer
>
> regards
>
>
> (below is the log from console)
>
> 16/07/26 00:02:47 INFO DAGScheduler: looking for newly runnable stages
>
> 16/07/26 00:02:47 INFO DAGScheduler: running: Set()
>
> 16/07/26 00:02:47 INFO DAGScheduler: waiting: Set(ResultStage 32)
>
> 16/07/26 00:02:47 INFO DAGScheduler: failed: Set()
>
> 16/07/26 00:02:47 INFO DAGScheduler: Submitting ResultStage 32
> (MapPartitionsRDD[75] at map at DecisionTree.scala:642), which has no
> missing parents
>
> 16/07/26 00:02:47 INFO MemoryStore: Block broadcast_48 stored as values in
> memory (estimated size 2.2 MB, free 18.2 MB)
>
> 16/07/26 00:02:47 INFO MemoryStore: Block broadcast_48_piece0 stored as
> bytes in memory (estimated size 436.9 KB, free 18.7 MB)
>
> 16/07/26 00:02:47 INFO BlockManagerInfo: Added broadcast_48_piece0 in
> memory on x.x.x.x:35450 (size: 436.9 KB, free: 45.8 GB)
>
> 16/07/26 00:02:47 INFO SparkContext: Created broadcast 48 from broadcast
> at DAGScheduler.scala:1006
>
> 16/07/26 00:02:47 INFO DAGScheduler: Submitting 4 missing tasks from
> ResultStage 32 (MapPartitionsRDD[75] at map at DecisionTree.scala:642)
>
> 16/07/26 00:02:47 INFO TaskSchedulerImpl: Adding task set 32.0 with 4 tasks
>
> 16/07/26 00:02:47 INFO TaskSetManager: Starting task 0.0 in stage 32.0
> (TID 185, x.x.x.x, partition 0,NODE_LOCAL, 1956 bytes)
>
> 16/07/26 00:02:47 INFO TaskSetManager: Starting task 1.0 in stage 32.0
> (TID 186, x.x.x.x, partition 1,NODE_LOCAL, 1956 bytes)
>
> 16/07/26 00:02:47 INFO TaskSetManager: Starting task 2.0 in stage 32.0
> (TID 187, x.x.x.x, partition 2,NODE_LOCAL, 1956 bytes)
>
> 16/07/26 00:02:47 INFO TaskSetManager: Starting task 3.0 in stage 32.0
> (TID 188, x.x.x.x, partition 3,NODE_LOCAL, 1956 bytes)
>
> 16/07/26 00:02:47 INFO BlockManagerInfo: Added broadcast_48_piece0 in
> memory on x.x.x.x:58784 (size: 436.9 KB, free: 5.1 GB)
>
> 16/07/26 00:02:47 INFO MapOutputTrackerMasterEndpoint: Asked to send map
> output locations for shuffle 12 to x.x.x.x:44434
>
> 16/07/26 00:02:47 INFO MapOutputTrackerMaster: Size of output statuses for
> shuffle 12 is 180 bytes
>
> 16/07/26 00:02:47 INFO BlockManagerInfo: Added broadcast_48_piece0 in
> memory on x.x.x.x:46186 (size: 436.9 KB, free: 2.2 GB)
>
> 16/07/26 00:02:47 INFO BlockManagerInfo: Added broadcast_48_piece0 in
> memory on x.x.x.x:50132 (size: 436.9 KB, free: 5.0 GB)
>
> 16/07/26 00:02:47 INFO MapOutputTrackerMasterEndpoint: Asked to send map
> output locations for shuffle 12 to x.x.x.x:47272
>
> 16/07/26 00:02:47 INFO MapOutputTrackerMasterEndpoint: Asked to send map
> output locations for shuffle 12 to x.x.x.x:46802
>
> 16/07/26 00:02:49 INFO TaskSetManager: Finished task 2.0 in stage 32.0
> (TID 187) in 2265 ms on x.x.x.x (1/4)
>
> 16/07/26 00:02:49 INFO TaskSetManager: Finished task 1.0 in stage 32.0
> (TID 186) in 2266 ms on x.x.x.x (2/4)
>
> 16/07/26 00:02:50 INFO TaskSetManager: Finished task 0.0 in stage 32.0
> (TID 185) in 2794 ms on x.x.x.x (3/4)
>
> 16/07/26 00:02:50 INFO TaskSetManager: Finished task 3.0 in stage 32.0
> (TID 188) in 3738 ms on x.x.x.x (4/4)
>
> 16/07/26 00:02:50 INFO TaskSchedulerImpl: Removed TaskSet 32.0, whose
> tasks have all completed, from pool
>
> 16/07/26 00:02:50 INFO DAGScheduler: ResultStage 32 (collectAsMap at
> DecisionTree.scala:651) finished in 3.738 s
>
> 16/07/26 00:02:50 INFO DAGScheduler: Job 19 finished: collectAsMap at
> DecisionTree.scala:651, took 19.493917 s
>
> 16/07/26 00:02:51 INFO MemoryStore: Block broadcast_49 stored as values in
> memory (estimated size 1053.9 KB, free 19.7 MB)
>
> 16/07/26 00:02:52 INFO MemoryStore: Block broadcast_49_piece0 stored as
> bytes in memory (estimated size 626.7 KB, free 20.3 MB)
>
> 16/07/26 00:02:52 INFO BlockManagerInfo: Added broadcast_49_piece0 in
> memory on x.x.x.x:35450 (size: 626.7 KB, free: 45.8 GB)
>
> 16/07/26 00:02:52 INFO SparkContext: Created broadcast 49 from broadcast
> at DecisionTree.scala:601
>
> 16/07/26 00:02:52 INFO SparkContext: Starting job: collectAsMap at
> DecisionTree.scala:651
>
> 16/07/26 00:02:52 INFO DAGScheduler: Registering RDD 76 (mapPartitions at
> DecisionTree.scala:622)
>
> 16/07/26 00:02:52 INFO DAGScheduler: Got job 20 (collectAsMap at
> DecisionTree.scala:651) with 4 output partitions
>
> 16/07/26 00:02:52 INFO DAGScheduler: Final stage: ResultStage 34
> (collectAsMap at DecisionTree.scala:651)
>
> 16/07/26 00:02:52 INFO DAGScheduler: Parents of final stage:
> List(ShuffleMapStage 33)
>
> 16/0

Spark streaming lost data when ReceiverTracker writes Blockinfo to hdfs timeout

2016-07-26 Thread Andy Zhao
Hi guys, 

I wrote a Spark Streaming program which consumes 1000 messages from one
topic of Kafka, does some transformation, and writes the result back to
another topic. But I only found 988 messages in the second topic. I checked
the logs and confirmed that all messages were received by the receivers. But
I found an HDFS write timeout message printed from class BatchedWriteAheadLog.

I checked out the source code and found code like this:
  
/** Add received block. This event will get written to the write ahead log (if enabled). */
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  try {
    val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
    if (writeResult) {
      synchronized {
        getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
      }
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    } else {
      logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
    }
    writeResult
  } catch {
    case NonFatal(e) =>
      logError(s"Error adding block $receivedBlockInfo", e)
      false
  }
}


It seems that ReceiverTracker tries to write the block info to HDFS, but the
write operation times out. This causes the writeToLog function to return false,
so the line "getReceivedBlockQueue(receivedBlockInfo.streamId) +=
receivedBlockInfo" is skipped and the block info is lost.

The Spark version I use is 1.6.1 and I did not turn on
spark.streaming.receiver.writeAheadLog.enable.

I want to know whether or not this is the designed behaviour.
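For reference, a minimal sketch of turning the receiver write-ahead log on (the app name, batch interval and checkpoint path are made up, and this is not a verified fix for the HDFS timeout itself):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("kafka-to-kafka")
  .setMaster("local[2]")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(10))
// the WAL files live under the checkpoint directory, so checkpointing is required
ssc.checkpoint("hdfs:///checkpoints/kafka-to-kafka")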

Thanks
  




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-lost-data-when-ReceiverTracker-writes-Blockinfo-to-hdfs-timeout-tp27410.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Maintaining order of pair rdd

2016-07-26 Thread Marco Mistroni
Apologies janardhan, I always get confused on this.
OK, so you have a (key, val) RDD (val is irrelevant here).

Then you can do this:

val reduced = myRDD.reduceByKey((first, second) => first ++ second)

val sorted = reduced.sortBy(tpl => tpl._1)

hth
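A self-contained version of the same idea, with made-up keys and values (note that the order in which ++ concatenates values coming from different partitions is not guaranteed):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("sort-pair-rdd").setMaster("local[2]"))

// values wrapped in Array(...) so that ++ concatenates them per key
val myRDD = sc.parallelize(Seq(
  ("ID2", Array(18159)), ("ID1", Array(18159)), ("ID3", Array(18159)),
  ("ID2", Array(36318)), ("ID1", Array(36318)), ("ID3", Array(36318))
))

val reduced = myRDD.reduceByKey((first, second) => first ++ second)
val sorted  = reduced.sortBy(tpl => tpl._1)   // sort by key

sorted.collect().foreach { case (k, v) => println(s"$k -> ${v.mkString(",")}") }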



On Tue, Jul 26, 2016 at 3:31 AM, janardhan shetty 
wrote:

> groupBy is a shuffle operation and index is already lost in this process
> if I am not wrong and don't see *sortWith* operation on RDD.
>
> Any suggestions or help ?
>
> On Mon, Jul 25, 2016 at 12:58 AM, Marco Mistroni 
> wrote:
>
>> Hi
>>  after you do a groupBy you should use a sortWith.
>> Basically , a groupBy reduces your structure to (anyone correct me if i m
>> wrong) a RDD[(key,val)], which you can see as a tuple.so you could use
>> sortWith (or sortBy, cannot remember which one) (tpl=> tpl._1)
>> hth
>>
>> On Mon, Jul 25, 2016 at 1:21 AM, janardhan shetty > > wrote:
>>
>>> Thanks Marco. This solved the order problem. Had another question which
>>> is prefix to this.
>>>
>>> As you can see below ID2,ID1 and ID3 are in order and I need to maintain
>>> this index order as well. But when we do groupByKey 
>>> operation(*rdd.distinct.groupByKey().mapValues(v
>>> => v.toArray*))
>>> everything is *jumbled*.
>>> Is there any way we can maintain this order as well ?
>>>
>>> scala> RDD.foreach(println)
>>> (ID2,18159)
>>> (ID1,18159)
>>> (ID3,18159)
>>>
>>> (ID2,18159)
>>> (ID1,18159)
>>> (ID3,18159)
>>>
>>> (ID2,36318)
>>> (ID1,36318)
>>> (ID3,36318)
>>>
>>> (ID2,54477)
>>> (ID1,54477)
>>> (ID3,54477)
>>>
>>> *Jumbled version : *
>>> Array(
>>> (ID1,Array(*18159*, 308703, 72636, 64544, 39244, 107937, *54477*,
>>> 145272, 100079, *36318*, 160992, 817, 89366, 150022, 19622, 44683,
>>> 58866, 162076, 45431, 100136)),
>>> (ID3,Array(100079, 19622, *18159*, 212064, 107937, 44683, 150022,
>>> 39244, 100136, 58866, 72636, 145272, 817, 89366, * 54477*, *36318*,
>>> 308703, 160992, 45431, 162076)),
>>> (ID2,Array(308703, * 54477*, 89366, 39244, 150022, 72636, 817, 58866,
>>> 44683, 19622, 160992, 107937, 100079, 100136, 145272, 64544, *18159*,
>>> 45431, *36318*, 162076))
>>> )
>>>
>>> *Expected output:*
>>> Array(
>>> (ID1,Array(*18159*,*36318*, *54477,...*)),
>>> (ID3,Array(*18159*,*36318*, *54477, ...*)),
>>> (ID2,Array(*18159*,*36318*, *54477, ...*))
>>> )
>>>
>>> As you can see after *groupbyKey* operation is complete item 18519 is
>>> in index 0 for ID1, index 2 for ID3 and index 16 for ID2 where as expected
>>> is index 0
>>>
>>>
>>> On Sun, Jul 24, 2016 at 12:43 PM, Marco Mistroni 
>>> wrote:
>>>
 Hello
  Uhm you have an array containing 3 tuples?
 If all the arrays have same length, you can just zip all of them,
 creatings a list of tuples
 then you can scan the list 5 by 5...?

 so something like

 (Array(0)_2,Array(1)._2,Array(2)._2).zipped.toList

 this will give you a list of tuples of 3 elements containing each items
 from ID1, ID2 and ID3  ... sample below
 res: List((18159,100079,308703), (308703, 19622, 54477), (72636,18159,
 89366)..)

 then you can use a recursive function to compare each element such as

 def iterate(lst:List[(Int, Int, Int)]):T = {
 if (lst.isEmpty): /// return your comparison
 else {
  val splits = lst.splitAt(5)
  // do sometjhing about it using splits._1
  iterate(splits._2)
}

 will this help? or am i still missing something?

 kr












 On 24 Jul 2016 5:52 pm, "janardhan shetty" 
 wrote:

> Array(
> (ID1,Array(18159, 308703, 72636, 64544, 39244, 107937, 54477, 145272,
> 100079, 36318, 160992, 817, 89366, 150022, 19622, 44683, 58866, 162076,
> 45431, 100136)),
> (ID3,Array(100079, 19622, 18159, 212064, 107937, 44683, 150022, 39244,
> 100136, 58866, 72636, 145272, 817, 89366, 54477, 36318, 308703, 160992,
> 45431, 162076)),
> (ID2,Array(308703, 54477, 89366, 39244, 150022, 72636, 817, 58866,
> 44683, 19622, 160992, 107937, 100079, 100136, 145272, 64544, 18159, 45431,
> 36318, 162076))
> )
>
> I need to compare first 5 elements of ID1 with first five element of
> ID3  next first 5 elements of ID1 to ID2. Similarly next 5 elements in 
> that
> order until the end of number of elements.
> Let me know if this helps
>
>
> On Sun, Jul 24, 2016 at 7:45 AM, Marco Mistroni 
> wrote:
>
>> Apologies I misinterpreted could you post two use cases?
>> Kr
>>
>> On 24 Jul 2016 3:41 pm, "janardhan shetty" 
>> wrote:
>>
>>> Marco,
>>>
>>> Thanks for the response. It is indexed order and not ascending or
>>> descending order.
>>> On Jul 24, 2016 7:37 AM, "Marco Mistroni" 
>>> wrote:
>>>
 Use map values to transform to an rdd where values are sorted?
 Hth

 On 24 Jul 2016 6:23 am, "janardhan

unsubscribe

2016-07-26 Thread Uzi Hadad



Re: ORC v/s Parquet for Spark 2.0

2016-07-26 Thread Jörn Franke
I think both are very similar, but with slightly different goals. While they
work transparently for any Hadoop application, you need to enable specific
support in the application for predicate pushdown.
In the end you have to check which application you are using and do some tests
(with the correct predicate pushdown configuration). Keep in mind that both
formats work best if they are sorted on the filter columns (which is your
responsibility) and if their optimizations are correctly configured (min/max
indexes, bloom filters, compression, etc.).

If you need to ingest sensor data you may want to store it first in HBase and
then batch process it into large files in ORC or Parquet format.
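For what it's worth, a minimal sketch of writing the same data in both formats with the filter column sorted first (Spark 2.0 API; the paths, column names and sample rows are made up, and ORC needs a Hive-enabled session in 2.0):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("orc-vs-parquet")
  .enableHiveSupport()   // the ORC data source in Spark 2.0 lives in the Hive module
  .getOrCreate()
import spark.implicits._

// made-up sensor-style data
val df = Seq(("s1", "2016-07-01", 21.5), ("s2", "2016-07-02", 22.1), ("s1", "2016-07-03", 20.9))
  .toDF("sensor", "day", "value")

// sort on the column you expect to filter by, so row-group / stripe min-max stats stay selective
df.sort("day").write.mode("overwrite").parquet("/tmp/sensors_parquet")
df.sort("day").write.mode("overwrite").orc("/tmp/sensors_orc")

// predicate pushdown happens at read time (for ORC also check spark.sql.orc.filterPushdown)
spark.read.parquet("/tmp/sensors_parquet").filter($"day" >= "2016-07-02").show()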

> On 26 Jul 2016, at 04:09, janardhan shetty  wrote:
> 
> Just wondering advantages and disadvantages to convert data into ORC or 
> Parquet. 
> 
> In the documentation of Spark there are numerous examples of Parquet format. 
> 
> Any strong reasons to chose Parquet over ORC file format ?
> 
> Also : current data compression is bzip2
> 
> http://stackoverflow.com/questions/32373460/parquet-vs-orc-vs-orc-with-snappy 
> This seems like biased.


Re: PCA machine learning

2016-07-26 Thread pseudo oduesp
Hi,
I want to add some detail. I am getting the following two vectors. The first
one is the features vector:
Row(features=SparseVector(765, {0: 3.0, 1: 1.0, 2: 50.0, 3: 16.0, 5:
88021.0, 6: 88021.0, 8: 1.0, 11: 1.0, 12: 200.0, 14: 200.0, 15: 200.0, 16:
200.0, 17: 2.0, 18: 1.0, 25: 1.0, 26: 2.0, 31: 89200.0, 32: 65.0, 33: 1.0,
34: 89020044.0, 35: 1.0, 36: 1.0, 42: 4.0, 43: 24.0, 44: 2274.0, 45: 54.0,
46: 34.0, 47: 44.0, 48: 2654.0, 49: 2934.0, 50: 84.0, 56: 3404.0, 57: 16.0,
59: 1.0, 70: 1.0, 75: 1.0, 76: 1.0, 77: 1.0, 78: 1.0, 79: 1.0, 80: 1.0, 81:
1.0, 82: 1.0, 83: 1.0, 84: 1.0, 85: 1.0, 86: 1.0, 87: 1.0, 88: 1.0, 89:
1.0, 91: 1.0, 92: 1.0, 93: 1.0, 94: 1.0, 95: 1.0, 96: 1.0, 97: 1.0, 98:
1.0, 99: 1.0, 100: 1.0, 102: 1.0, 137: 1.0, 139: 1.0, 141: 1.0, 150: 1.0,
155: 1.0, 158: 1.0, 160: 1.0, 259: 0.61, 260: 0.61, 261: 0.61, 262: 0.61,
263: 1.0, 264: 0.61, 265: 0.61, 266: 0.61, 267: 0.61, 268: 1.0, 269: 0.61,
270: 0.61, 271: 0.61, 272: 0.61, 273: 1.0, 274: 0.61, 275: 0.61, 276: 0.61,
277: 0.61, 278: 1.0, 281: 916.57, 282: 916.57, 283: 916.57, 284: 865.43,
285: 865.43, 286: 865.43, 287: 816.19, 288: 816.19, 289: 816.19, 290:
760.53, 291: 760.53, 292: 760.53, 293: 874.9, 294: 874.9, 295: 874.9, 296:
963.89, 297: 172.9, 298: 73.64, 299: 1.87, 300: 349.53, 301: 109.95, 302:
116.67, 303: 38.59, 304: 68.28, 305: 2.23, 313: 1.0, 314: 1.0, 315: 1.0,
316: 1.0, 317: 1.0, 318: 1.0, 319: 1.0, 320: 1.0, 321: 1.0, 322: 1.0, 323:
109.95, 324: 172.9, 325: 116.67, 326: 38.59, 327: 2.23, 328: 73.64, 329:
1.87, 330: 349.53, 331: 68.28, 332: 180.46, 333: 933.66, 334: 916.57, 335:
1.0, 336: 1.0, 337: 1.0, 338: 1.0, 339: 1.0, 340: 1.0, 341: 1.0, 342: 1.0,
343: 1.0, 344: 166.231, 345: 323.713, 346: 104.988, 347: 104.988, 348:
34.996, 350: 69.992, 352: 61.243, 353: 166.231, 354: 323.713, 355: 104.988,
356: 104.988, 357: 34.996, 359: 69.992, 361: 61.243, 364: 1.0, 365: 1.0,
366: 1.0, 367: 1.0, 368: 1.0, 369: 1.0, 370: 1.0, 371: 1.0, 372: 1.0, 373:
144.5007, 374: 281.3961, 375: 91.2636, 376: 91.2636, 377: 30.4212, 379:
60.8424, 381: 53.2371, 382: 144.5007, 383: 281.3961, 384: 91.2636, 385:
91.2636, 386: 30.4212, 388: 60.8424, 390: 53.2371, 393: 1.0, 394: 1.0, 395:
1.0, 396: 1.0, 397: 1.0, 398: 1.0, 399: 1.0, 400: 1.0, 401: 1.0, 402:
155.0761, 403: 301.9903, 404: 97.9428, 405: 97.9428, 406: 32.6476, 408:
65.2952, 410: 57.1333, 411: 155.0761, 412: 301.9903, 413: 97.9428, 414:
97.9428, 415: 32.6476, 417: 65.2952, 419: 57.1333, 422: 1.0, 423: 1.0, 424:
1.0, 425: 1.0, 426: 1.0, 427: 1.0, 428: 1.0, 429: 1.0, 430: 1.0, 431:
164.4317, 432: 320.2091, 433: 103.8516, 434: 103.8516, 435: 34.6172, 437:
69.2344, 439: 60.5801, 440: 164.4317, 441: 320.2091, 442: 103.8516, 443:
103.8516, 444: 34.6172, 446: 69.2344, 448: 60.5801, 451: 1.0, 452: 1.0,
453: 1.0, 454: 1.0, 455: 1.0, 456: 1.0, 457: 1.0, 458: 1.0, 459: 1.0, 460:
174.1483, 461: 339.1309, 462: 109.9884, 463: 109.9884, 464: 36.6628, 466:
73.3256, 468: 64.1599, 469: 174.1483, 470: 339.1309, 471: 109.9884, 472:
109.9884, 473: 36.6628, 475: 73.3256, 477: 64.1599, 480: 0.0001, 481:
0.0001, 482: 0.0001, 483: 0.0001, 484: 0.0001, 485: 0.0001, 486: 0.0001,
487: 0.0001, 488: 172.9, 489: 172.9, 490: 172.9, 491: 172.9, 492: 283.4426,
493: 283.4426, 494: 283.4426, 495: 283.4426, 504: 73.64, 505: 73.64, 506:
73.64, 507: 73.64, 508: 1207213.1148, 509: 1207213.1148, 510: 1207213.1148,
511: 1207213.1148, 520: 1.87, 521: 1.87, 522: 1.87, 523: 1.87, 524:
30655.7377, 525: 30655.7377, 526: 30655.7377, 527: 30655.7377, 536: 349.53,
537: 349.53, 538: 349.53, 539: 349.53, 540: 573.0, 541: 573.0, 542: 573.0,
543: 573.0, 552: 116.67, 553: 116.67, 554: 116.67, 555: 116.67, 556:
191.2623, 557: 191.2623, 558: 191.2623, 559: 191.2623, 568: 38.59, 569:
38.59, 570: 38.59, 571: 38.59, 572: 38.59, 573: 38.59, 574: 38.59, 575:
38.59, 584: 180.46, 585: 180.46, 586: 180.46, 587: 180.46, 588: 295.8361,
589: 295.8361, 590: 295.8361, 591: 295.8361, 600: 933.66, 601: 933.66, 602:
933.66, 603: 933.66, 604: 1239250.9834, 605: 1239250.9834, 606:
1239250.9834, 607: 1239250.9834, 643: 170.0, 644: 170.0, 646: 170.0, 648:
170.0, 658: 170.0, 662: 170.0, 665: 170.0, 667: 170.0, 758: 0.224, 763:
0.224}),


and the second one is its projection after PCA onto 20 principal components:


pca_features=DenseVector([89036409.0534, 2986242.0691, 227234.8184,
108796.4282, -129553.463, 89983.1029, 223420.7277, 53740.2034,
-113602.7292, -20057.1001, 33872.3162, -759.2689, 410., -872.6325,
-4896.6554, 4060.5014, -786.3297, -951.3851, 68464.2515, 3850.9394,
876.7108, 98.5793, 21342.2015, 863.9765, 1456.3933, -265.2494, 85325.4192,
-3657.0752, 111.7979, -59.6176, -945.8667, -84.1924, 246.233, -636.8786,
-749.1798, 900.8763, -177.4543, -105.4379, 272.7857, -535.0951]))]


When I create the vector from the original data frame I know the order of my
columns, so I can associate each value in the features vector with the name of
a variable.

How can I identify the names behind the principal components in the second vector?
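As a side note, a minimal sketch of how the loadings can be mapped back to feature names with the Spark 2.0 ml.feature.PCA API (the feature names and sample rows are made up; the point is that PCAModel.pc is a numFeatures x k matrix whose column j holds the weights of component j, in the same order in which the columns were assembled into the features vector):

import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("pca-loadings").getOrCreate()

// made-up feature names, in the exact order used to assemble the features vector
val featureNames = Array("age", "income", "visits")

val df = spark.createDataFrame(Seq(
  Vectors.dense(25.0, 30000.0, 3.0),
  Vectors.dense(40.0, 52000.0, 7.0),
  Vectors.dense(33.0, 41000.0, 5.0)
).map(Tuple1.apply)).toDF("features")

val model = new PCA().setInputCol("features").setOutputCol("pca_features").setK(2).fit(df)

// model.pc: numFeatures x k matrix; column j = loadings of principal component j
for (j <- 0 until model.pc.numCols) {
  val weights = (0 until model.pc.numRows).map(i => featureNames(i) -> model.pc(i, j))
  println(s"PC$j: " + weights.map { case (n, w) => f"$n=$w%.3f" }.mkString(", "))
}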






2016-07-26 10:39 GMT+02:00 pseudo oduesp :

> Hi,
> when i perform PCA reduction 

PCA machine learning

2016-07-26 Thread pseudo oduesp
Hi,
When I perform PCA dimensionality reduction I get a dense vector whose length
is the number of principal components. My questions:

- How do I get the names of the features behind this vector?
- Are the values inside the vector the projections of all the features onto
these components?
- How do I use it?

Thanks


Re: dataframe.foreach VS dataframe.collect().foreach

2016-07-26 Thread kevin
thank you Chanh

2016-07-26 15:34 GMT+08:00 Chanh Le :

> Hi Ken,
>
> *blacklistDF -> just DataFrame *
> Spark is lazy until you call something like* collect, take, write* it
> will execute the hold process *like you do map or filter before you
> collect*.
> That mean until you call collect spark* do nothing* so you df would not
> have any data -> can’t call foreach.
> Call collect execute the process -> get data -> foreach is ok.
>
>
> On Jul 26, 2016, at 2:30 PM, kevin  wrote:
>
>  blacklistDF.collect()
>
>
>


Re: Spark Web UI port 4040 not working

2016-07-26 Thread Chanh Le
Are you running in standalone mode?
Usually the active application entry in the master UI will show the address of
the current job's web UI, or you can check on the master node with netstat -apn | grep 4040.



> On Jul 26, 2016, at 8:21 AM, Jestin Ma  wrote:
> 
> Hello, when running spark jobs, I can access the master UI (port 8080 one) no 
> problem. However, I'm confused as to how to access the web UI to see 
> jobs/tasks/stages/etc.
> 
> I can access the master UI at http://:8080. But port 4040 gives 
> me a -connection cannot be reached-.
> 
> Is the web UI http:// with a port of 4040?
> 
> I'm running my Spark job on a cluster machine and submitting it to a master 
> node part of the cluster. I heard of ssh tunneling; is that relevant here?
> 
> Thank you!


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: dataframe.foreach VS dataframe.collect().foreach

2016-07-26 Thread Chanh Le
Hi Ken,

blacklistDF -> just a DataFrame.
Spark is lazy: only when you call something like collect, take or write does it
execute the whole process, including the map or filter you did before the collect.
That means that until you call collect, Spark does nothing, so your df would not
have any data yet -> calling foreach on it gives you nothing.
Calling collect executes the process -> gets the data -> then foreach works.
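A minimal sketch of the difference, with made-up data (the other thing to keep in mind is where the code runs: collect() brings the rows back to the driver, while the function passed to foreach is shipped to the executors, so on a real cluster its println output and anything it adds to a driver-side list such as hrecords will not show up on the driver):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("foreach-vs-collect").getOrCreate()
import spark.implicits._

val blacklistDF = Seq("u1", "u2", "u3").toDF("uid")   // made-up data

// collect(): the rows come back to the driver, so this prints in the driver's console
blacklistDF.collect().foreach(row => println(">>> " + row.getAs[String]("uid")))

// foreach: the function runs on the executors; on a cluster the output goes to the
// executors' stdout, and any mutation of a driver-side collection is lost
blacklistDF.foreach(row => println(">>> " + row.getAs[String]("uid")))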


> On Jul 26, 2016, at 2:30 PM, kevin  wrote:
> 
>  blacklistDF.collect()



dataframe.foreach VS dataframe.collect().foreach

2016-07-26 Thread kevin
Hi all,
I don't quite understand the difference between dataframe.foreach and
dataframe.collect().foreach. When should I use dataframe.foreach?

I use Spark 2.0 and I want to iterate over a dataframe to get one column's value.

This works:

blacklistDF.collect().foreach { x =>
  println(s">>>getString(0)" + x.getAs[String]("uid"))
  val put = new Put(Bytes.toBytes(x.getAs[String]("uid")))
  put.add(Bytes.toBytes("cf"), Bytes.toBytes("uid"), Bytes.toBytes(x.getAs[String]("uid")))
  hrecords.add(put)
}

If I use blacklistDF.foreach { ... } instead, I get nothing.