Implementing a custom Spark shell

2014-02-25 Thread Sampo Niskanen
Hi,

I'd like to create a custom version of the Spark shell that automatically
defines some additional variables / RDDs (in addition to 'sc') specific to our
application.  Is this possible?

I took a look at the code that spark-shell invokes, and it seems quite
complex.  Can it be reused from my code?


I'm implementing a standalone application that uses the Spark libraries
(managed by SBT).  Ideally, I'd like to be able to launch the shell from
that application instead of using the default Spark distribution.
Alternatively, can some utility code be injected into the standard
spark-shell?


Thanks.

Sampo Niskanen

Lead developer / Wellmo
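One lightweight option for the second question, assuming your spark-shell forwards
the Scala REPL's -i <file> flag (worth verifying for your Spark build), is to keep the
application-specific definitions in a small preload script that runs against the
already-created 'sc'. The names and paths below are placeholders:

// preload.scala -- hypothetical init script with application-specific RDDs,
// launched with something like: bin/spark-shell -i preload.scala
val users  = sc.textFile("hdfs:///app/users")            // placeholder path
val events = sc.textFile("hdfs:///app/events").cache()   // cached for interactive use
println("Preloaded application RDDs: users, events")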


Re: Need some tutorials and examples about customized partitioner

2014-02-25 Thread Matei Zaharia
Take a look at the “advanced Spark features” talk here too: 
http://ampcamp.berkeley.edu/amp-camp-one-berkeley-2012/.

Matei

On Feb 25, 2014, at 6:22 PM, Tao Xiao  wrote:

> Thank you Mayur, I think that will help me a lot 
> 
> 
> Best,
> Tao
> 
> 
> 2014-02-26 8:56 GMT+08:00 Mayur Rustagi :
> Type of Shuffling is best explained by Matei in Spark Internals . 
> http://www.youtube.com/watch?v=49Hr5xZyTEA#t=2203
> Why dont you look at that & then if you have follow up questions ask here, 
> also would be good to watch this whole talk as it talks about Spark job flows 
> in a lot more detail. 
> 
> SCALA
> import org.apache.spark.RangePartitioner;
> var file=sc.textFile("")
> var partitionedFile=file.map(x=>(x,1))
> var data= partitionedFile.partitionBy(new RangePartitioner(3, 
> partitionedFile))
> data.glom().collect()(0).length
> data.glom().collect()(1).length
> data.glom().collect()(2).length
> This will sample the RDD partitionedFile & then try to partition 
> partitionedFile in almost equal sizes. 
> Do not do collect if your data size is huge as this may OOM the driver, write 
> it to disk in that case. 
> 
> 
> 
> Scala 
> 
> Mayur Rustagi
> Ph: +919632149971
> http://www.sigmoidanalytics.com
> https://twitter.com/mayur_rustagi
> 
> 
> 
> On Tue, Feb 25, 2014 at 1:19 AM, Tao Xiao  wrote:
> I am a newbie to Spark and I need to know how RDD partitioning can be 
> controlled in the process of shuffling. I have googled for examples but 
> haven't found much concrete examples, in contrast with the fact that there 
> are many good tutorials about Hadoop's shuffling and partitioner.
> 
> Can anybody show me good tutorials explaining the process of shuffling in 
> Spark, as well as examples of how to use a customized partitioner.?
> 
> 
> Best,
> Tao
> 
> 



Re: Sharing SparkContext

2014-02-25 Thread Ognen Duzlevski
In that case, I must have misunderstood the following (from 
http://spark.incubator.apache.org/docs/0.8.1/job-scheduling.html). 
Apologies. Ognen


"Inside a given Spark application (SparkContext instance), multiple 
parallel jobs can run simultaneously if they were submitted from 
separate threads. By “job”, in this section, we mean a Spark action 
(e.g.|save|,|collect|) and any tasks that need to run to evaluate that 
action. Spark’s scheduler is fully thread-safe and supports this use 
case to enable applications that serve multiple requests (e.g. queries 
for multiple users).


By default, Spark’s scheduler runs jobs in FIFO fashion. Each job is 
divided into “stages” (e.g. map and reduce phases), and the first job 
gets priority on all available resources while its stages have tasks to 
launch, then the second job gets priority, etc. If the jobs at the head 
of the queue don’t need to use the whole cluster, later jobs can start 
to run right away, but if the jobs at the head of the queue are large, 
then later jobs may be delayed significantly.


Starting in Spark 0.8, it is also possible to configure fair sharing 
between jobs. Under fair sharing, Spark assigns tasks between jobs in a 
“round robin” fashion, so that all jobs get a roughly equal share of 
cluster resources. This means that short jobs submitted while a long job 
is running can start receiving resources right away and still get good 
response times, without waiting for the long job to finish. This mode is 
best for multi-user settings.


To enable the fair scheduler, simply set
the spark.scheduler.mode property to FAIR before creating a SparkContext:"
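For concreteness, a minimal sketch of that last step with the SparkConf API (the
master URL and app name are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://master:7077")        // placeholder master URL
  .setAppName("fair-scheduling-example")
  .set("spark.scheduler.mode", "FAIR")     // default is FIFO
val sc = new SparkContext(conf)
// Jobs submitted from separate threads on this context now share resources fairly.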


On 2/25/14, 12:30 PM, Mayur Rustagi wrote:
fair scheduler merely reorders tasks .. I think he is looking to run 
multiple pieces of code on a single context on demand from 
customers...if the code & order is decided then fair scheduler will 
ensure that all tasks get equal cluster time :)




Mayur Rustagi
Ph: +919632149971 
http://www.sigmoidanalytics.com


https://twitter.com/mayur_rustagi



On Tue, Feb 25, 2014 at 10:24 AM, Ognen Duzlevski 
mailto:og...@nengoiksvelzud.com>> wrote:


Doesn't the fair scheduler solve this?
Ognen


On 2/25/14, 12:08 PM, abhinav chowdary wrote:

Sorry for not being clear earlier
how do you want to pass the operations to the spark context?
this is partly what i am looking for . How to access the active
spark context and possible ways to pass operations

Thanks



On Tue, Feb 25, 2014 at 10:02 AM, Mayur Rustagi
mailto:mayur.rust...@gmail.com>> wrote:

how do you want to pass the operations to the spark context?


Mayur Rustagi
Ph: +919632149971 
http://www.sigmoidanalytics.com

https://twitter.com/mayur_rustagi



On Tue, Feb 25, 2014 at 9:59 AM, abhinav chowdary
mailto:abhinav.chowd...@gmail.com>> wrote:

Hi,
   I am looking for ways to share the sparkContext,
meaning i need to be able to perform multiple operations
on the same spark context.

Below is code of a simple app i am testing

 def main(args: Array[String]) {
println("Welcome to example application!")

val sc = new
SparkContext("spark://10.128.228.142:7077
", "Simple App")

println("Spark context created!")

println("Creating RDD!")

Now once this context is created i want to access  this
to submit multiple jobs/operations

Any help is much appreciated

Thanks







-- 
Warm Regards

Abhinav Chowdary







NullPointerException from 'Count' on DStream

2014-02-25 Thread anoldbrain
Dear all,

I encountered NullPointerException running a simple program like below:


> val sparkconf = new SparkConf()
> .setMaster(master)
> .setAppName("myapp")
> // and other setups
> 
> val ssc = new StreamingContext(sparkconf, Seconds(30))
> val flume = new FlumeInputDStream(ssc, flume_sink_ip, flume_sink_port,
> StorageLevel.MEMORY_AND_DISK_SER_2)
> 
> val messages = flume.map(x => {
> val charset = Charset.forName("UTF-8")
> val decoder = charset.newDecoder()
> val msg = decoder.decode(x.event.getBody()).toString()
> msg
> })
> 
> // "messages.count" does not throw NullPointerException
> messages.count.foreachRDD(rdd => {
> ()
> })
> messages.print()

If I call 'messages.count' or 'messages.count.map', no exception is thrown.
If I use 'messages.count.foreachRDD', a NullPointerException is thrown.
Console output snippets:

> 
> 14/02/26 10:36:51 INFO storage.BlockManagerMasterActor$BlockManagerInfo:
> Registering block manager node-005:36924 with 294.4 MB RAM
> 14/02/26 10:37:00 ERROR scheduler.JobScheduler: Error generating jobs for
> time 139338222 ms
> java.lang.NullPointerException
>   at
> org.apache.spark.streaming.dstream.DStream$$anonfun$count$3.apply(DStream.scala:487)
>   at
> org.apache.spark.streaming.dstream.DStream$$anonfun$count$3.apply(DStream.scala:487)
>   at
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1.apply(DStream.scala:536)
>   at
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1.apply(DStream.scala:536)
>   at
> org.apache.spark.streaming.dstream.DStream$$anonfun$4.apply(DStream.scala:547)
>   at
> org.apache.spark.streaming.dstream.DStream$$anonfun$4.apply(DStream.scala:545)
>   at
> org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:41)
>   at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:292)
>   at
> org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
>   at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:292)
>   at
> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>   at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:292)
>   at
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>   at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
>   at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
>   at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>   at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>   at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>   at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>   at
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:115)
>   at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:161)
>   at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:161)
>   at scala.util.Try$.apply(Try.scala:161)
>   at
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:161)
>   at
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:105)
>   at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:70)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>   at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [error] (run-main-0) java.lang.NullPointerException
> java.lang.NullPointerException
>   at
> org.apache.spark.streaming.dstream.DStream$$anonfun$count$3.apply(DStream.scala:487)
>   at
> org.apache.spark.streaming.dstream.DStream$$anonfun$count$3.apply(DStream.scala:487)
>   at
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1.apply(DStream.scala:536)
>   at
> org.apache.spark.streaming.dstream.D

Re: Sharing SparkContext

2014-02-25 Thread Ognen Duzlevski


On 2/25/14, 12:24 PM, Mayur Rustagi wrote:

So there is no way to share context currently,
1. you can try jobserver by Ooyala but I havnt used it & frankly 
nobody has shared feedback on it.


One of the major show-stoppers for me is that the Ooyala standalone server
from the jobserver branch does not work when compiled with Hadoop 2.2.0.
If you are OK staying with Hadoop 1.0.4, it does work.


Ognen


Re: [HELP] ask for some information about public data set

2014-02-25 Thread Evan R. Sparks
Hi hyqgod,

This is probably a better question for the Spark user list than the dev
list (cc'ing user and bcc'ing dev on this reply).

To answer your question, though:

Amazon's Public Datasets page is a nice place to start:
http://aws.amazon.com/datasets/ - these work well with Spark because
they're often stored on S3 (which Spark can read from natively), and it's
very easy to spin up a Spark cluster on EC2 to begin experimenting with the
data.

There's also a pretty good list of (mostly big) datasets that google has
released over the years here:
http://svonava.com/post/62186512058/datasets-released-by-google

- Evan
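As a concrete starting point, a minimal sketch of reading an S3-hosted dataset into
Spark; the bucket, path and credentials are placeholders, and the s3n:// scheme assumes
the Hadoop S3 native filesystem support is on the classpath:

import org.apache.spark.SparkContext

val sc = new SparkContext("local[4]", "public-dataset-demo")

// Hypothetical credentials and bucket -- replace with your own.
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "YOUR_ACCESS_KEY")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_KEY")

val lines = sc.textFile("s3n://some-public-bucket/some/dataset/*.gz")
println(lines.count())   // a simple sanity check before heavier analysis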

On Tue, Feb 25, 2014 at 6:33 PM, 黄远强  wrote:

> Hi all:
> I am a newcomer to the Spark community, and I dream of becoming an expert in
> the field of big data.  But I have no idea where to start after going through
> the published documents on the Spark website and the examples in the Spark
> source code.  I want to know if there are some public data sets on the
> internet that can be used to learn Spark and to test some new ideas of mine
> based on Spark.
>   Thanks a lot.
>
>
> ---
> Best regards
> hyqgod


Help with building and running examples with GraphX from the REPL

2014-02-25 Thread Soumya Simanta
I'm not able to run the GraphX examples from the Scala REPL. Can anyone
point me to the correct documentation that covers the configuration
and/or how to build GraphX for the REPL?

Thanks
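For reference, a hedged sketch of exercising GraphX from the shell once a Spark build
that includes GraphX is on the classpath; the edge-list path is a placeholder and the
calls follow the 0.9 GraphX programming guide:

// Inside spark-shell, with 'sc' already defined.
import org.apache.spark.graphx._

// Load a graph from a whitespace-separated "srcId dstId" edge list.
val graph = GraphLoader.edgeListFile(sc, "hdfs:///data/edges.txt")

println("vertices: " + graph.vertices.count())
println("edges:    " + graph.edges.count())
graph.triplets.take(3).foreach(println)   // a quick peek at a few edge triplets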


Re: Need some tutorials and examples about customized partitioner

2014-02-25 Thread Tao Xiao
Thank you Mayur, I think that will help me a lot


Best,
Tao


2014-02-26 8:56 GMT+08:00 Mayur Rustagi :

> Type of Shuffling is best explained by Matei in Spark Internals .
> http://www.youtube.com/watch?v=49Hr5xZyTEA#t=2203
> Why dont you look at that & then if you have follow up questions ask here,
> also would be good to watch this whole talk as it talks about Spark job
> flows in a lot more detail.
>
> SCALA
> import org.apache.spark.RangePartitioner;
> var file=sc.textFile("")
> var partitionedFile=file.map(x=>(x,1))
> var data= partitionedFile.partitionBy(new
> RangePartitioner(3, partitionedFile))
> data.glom().collect()(0).length
> data.glom().collect()(1).length
> data.glom().collect()(2).length
> This will sample the RDD partitionedFile & then try to partition
> partitionedFile in almost equal sizes.
> Do not do collect if your data size is huge as this may OOM the driver,
> write it to disk in that case.
>
>
>
> Scala
>
> Mayur Rustagi
> Ph: +919632149971
> h ttp://www.sigmoidanalytics.com
> https://twitter.com/mayur_rustagi
>
>
>
> On Tue, Feb 25, 2014 at 1:19 AM, Tao Xiao wrote:
>
>> I am a newbie to Spark and I need to know how RDD partitioning can be
>> controlled in the process of shuffling. I have googled for examples but
>> haven't found much concrete examples, in contrast with the fact that there
>> are many good tutorials about Hadoop's shuffling and partitioner.
>>
>> Can anybody show me good tutorials explaining the process of shuffling in
>> Spark, as well as examples of how to use a customized partitioner.?
>>
>>
>> Best,
>> Tao
>>
>
>


Re: Need some tutorials and examples about customized partitioner

2014-02-25 Thread Mayur Rustagi
The type of shuffling is best explained by Matei in the Spark Internals talk:
http://www.youtube.com/watch?v=49Hr5xZyTEA#t=2203
Why don't you look at that and then ask any follow-up questions here? It would
also be good to watch the whole talk, as it covers Spark job flows in a lot
more detail.

Scala:
import org.apache.spark.RangePartitioner

val file = sc.textFile("")
val partitionedFile = file.map(x => (x, 1))
val data = partitionedFile.partitionBy(new RangePartitioner(3, partitionedFile))
data.glom().collect()(0).length
data.glom().collect()(1).length
data.glom().collect()(2).length

This will sample the RDD partitionedFile and then try to partition
partitionedFile into almost equal sizes.
Do not call collect if your data size is huge, as this may OOM the driver;
write it to disk in that case.



Mayur Rustagi
Ph: +919632149971
http://www.sigmoidanalytics.com
https://twitter.com/mayur_rustagi



On Tue, Feb 25, 2014 at 1:19 AM, Tao Xiao  wrote:

> I am a newbie to Spark and I need to know how RDD partitioning can be
> controlled in the process of shuffling. I have googled for examples but
> haven't found much concrete examples, in contrast with the fact that there
> are many good tutorials about Hadoop's shuffling and partitioner.
>
> Can anybody show me good tutorials explaining the process of shuffling in
> Spark, as well as examples of how to use a customized partitioner.?
>
>
> Best,
> Tao
>
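Since the original question also asks for a customized partitioner example, a minimal
sketch is below. The class, key scheme and input path are made up for illustration; the
Partitioner contract is just numPartitions plus getPartition:

import org.apache.spark.Partitioner

// A made-up partitioner that routes string keys by their first character.
class FirstLetterPartitioner(parts: Int) extends Partitioner {
  require(parts > 0)
  override def numPartitions: Int = parts
  override def getPartition(key: Any): Int = key match {
    case s: String if s.nonEmpty => s.charAt(0).toInt % parts
    case _                       => 0
  }
}

// Usage on a pair RDD:
val pairs = sc.textFile("hdfs:///data/words.txt").map(w => (w, 1))
val partitioned = pairs.partitionBy(new FirstLetterPartitioner(3))
partitioned.glom().map(_.length).collect().foreach(println)   // rows per partition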


Re: How to get well-distribute partition

2014-02-25 Thread Mayur Rustagi
It seems you are already using partitionBy; you can simply plug in your
custom function instead of lambda x: x, and it should use that to
partition. RangePartitioner is available in Scala; I am not sure if it is
exposed directly in Python.
Regards
Mayur

Mayur Rustagi
Ph: +919632149971
http://www.sigmoidanalytics.com
https://twitter.com/mayur_rustagi



On Tue, Feb 25, 2014 at 11:00 AM, Mayur Rustagi wrote:

> okay you caught me on this.. I havnt used python api.
> Lets try
> http://www.cs.berkeley.edu/~pwendell/strataconf/api/pyspark/pyspark.rdd.RDD-class.html#partitionByon
>  the rdd & customize the partitioner instead of hash to a custom function.
> Please update on the list if it works, it seems to be a common problem.
>
> Mayur Rustagi
> Ph: +919632149971
> h ttp://www.sigmoidanalytics.com
> https://twitter.com/mayur_rustagi
>
>
>
> On Mon, Feb 24, 2014 at 9:23 PM, zhaoxw12 
> wrote:
>
>> Thanks for your reply.
>> For some reasons, I have to use python in my program. I can't find the API
>> of RangePartitioner. Could you tell me more details?
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-well-distribute-partition-tp2002p2013.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>


Re: Sharing SparkContext

2014-02-25 Thread abhinav chowdary
Thank you Mayur,

I will try the Ooyala job server to begin with. Is there a way to load an RDD
created via SparkContext into Shark? The only reason I ask is that my RDD is
being created from Cassandra (not Hadoop; we are trying to get Shark to work
with Cassandra as well, but are having trouble with it when running in
distributed mode).


On Tue, Feb 25, 2014 at 10:30 AM, Mayur Rustagi wrote:

> fair scheduler merely reorders tasks .. I think he is looking to run
> multiple pieces of code on a single context on demand from customers...if
> the code & order is decided then fair scheduler will ensure that all tasks
> get equal cluster time :)
>
>
>
> Mayur Rustagi
> Ph: +919632149971
> h ttp://www.sigmoidanalytics.com
> https://twitter.com/mayur_rustagi
>
>
>
> On Tue, Feb 25, 2014 at 10:24 AM, Ognen Duzlevski <
> og...@nengoiksvelzud.com> wrote:
>
>>  Doesn't the fair scheduler solve this?
>> Ognen
>>
>>
>> On 2/25/14, 12:08 PM, abhinav chowdary wrote:
>>
>> Sorry for not being clear earlier
>> how do you want to pass the operations to the spark context?
>> this is partly what i am looking for . How to access the active spark
>> context and possible ways to pass operations
>>
>>  Thanks
>>
>>
>>
>>  On Tue, Feb 25, 2014 at 10:02 AM, Mayur Rustagi > > wrote:
>>
>>> how do you want to pass the operations to the spark context?
>>>
>>>
>>>  Mayur Rustagi
>>> Ph: +919632149971
>>> h ttp://www.sigmoidanalytics.com
>>>  https://twitter.com/mayur_rustagi
>>>
>>>
>>>
>>> On Tue, Feb 25, 2014 at 9:59 AM, abhinav chowdary <
>>> abhinav.chowd...@gmail.com> wrote:
>>>
 Hi,
I am looking for ways to share the sparkContext, meaning i need
 to be able to perform multiple operations on the same spark context.

  Below is code of a simple app i am testing

   def main(args: Array[String]) {
 println("Welcome to example application!")

  val sc = new SparkContext("spark://10.128.228.142:7077", "Simple
 App")

  println("Spark context created!")

  println("Creating RDD!")

  Now once this context is created i want to access  this to submit
 multiple jobs/operations

  Any help is much appreciated

  Thanks




>>>
>>
>>
>>  --
>> Warm Regards
>> Abhinav Chowdary
>>
>>
>>
>


-- 
Warm Regards
Abhinav Chowdary


RE: ETL on pyspark

2014-02-25 Thread Adrian Mocanu
Matei, one more follow-up:
What if you write the stream data to your file by iterating through each RDD? I
know foreach is not idempotent. Can this rewrite some tuples or RDDs twice?

stream.foreachRDD(rdd => rdd.foreach {
  tuple => appendToFileTuple(tuple)
})

Thanks a lot!
-A
From: Matei Zaharia [mailto:matei.zaha...@gmail.com]
Sent: February-25-14 3:02 PM
To: user@spark.apache.org
Cc: u...@spark.incubator.apache.org
Subject: Re: ETL on pyspark

It will only move a file to the final directory when it's successfully finished 
writing it, so the file shouldn't have any duplicates. Old attempts will just 
be deleted.

Matei

On Feb 25, 2014, at 9:19 AM, Adrian Mocanu 
mailto:amoc...@verticalscope.com>> wrote:


Hi Matei
If Spark crashes while writing the file, after recovery from the failure does 
it continue where it left off or will there be duplicates in the file?

-A
From: Matei Zaharia [mailto:matei.zaha...@gmail.com]
Sent: February-24-14 4:20 PM
To: u...@spark.incubator.apache.org
Subject: Re: ETL on pyspark

collect() means to bring all the data back to the master node, and there might 
just be too much of it for that. How big is your file? If you can't bring it 
back to the master node try saveAsTextFile to write it out to a filesystem (in 
parallel).

Matei

On Feb 24, 2014, at 1:08 PM, Chengi Liu 
mailto:chengi.liu...@gmail.com>> wrote:



Hi,
  A newbie here. I am trying to do etl on spark. Few questions.

I have csv file with header.
1) How do I parse this file (as it has a header..)
2) I was trying to follow the tutorial here: 
http://ampcamp.berkeley.edu/3/exercises/data-exploration-using-spark.html
3) I am trying to do a frequency count..
 rows_key_value = rows.map(lambda x:(x[1],1)).reduceByKey(lambda 
x,y:x+y,1).collect()


After waiting for like few minutes I see this error:
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2271)
at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:178)
at 
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:50)
at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:223)
at 
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
14/02/24 13:06:45 INFO TaskSetManager: Starting task 13.0:0 as TID 331 on 
executor 2: node07 (PROCESS_LOCAL)
14/02/24 13:06:45 INFO TaskSetManager: Serialized task 13.0:0 as 3809 bytes in 
0 ms


How do i fix this?
Thanks



Re: Size of RDD larger than Size of data on disk

2014-02-25 Thread Matei Zaharia
The problem is that Java objects can take more space than the underlying data, 
but there are options in Spark to store data in serialized form to get around 
this. Take a look at https://spark.incubator.apache.org/docs/latest/tuning.html.

Matei
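A minimal sketch of the serialized-storage option mentioned above, with a placeholder
input path; the Kryo property is optional and must be set before the context is created:

import org.apache.spark.storage.StorageLevel

// Optional: System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val lines = sc.textFile("hdfs:///data/input")
lines.persist(StorageLevel.MEMORY_ONLY_SER)   // store partitions as serialized byte arrays
println(lines.count())                        // the first action materializes the cache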

On Feb 25, 2014, at 12:01 PM, Suraj Satishkumar Sheth  
wrote:

> Hi Mayur,
> Thanks for replying. Is it usually double the size of data on disk?
> I have observed this many times. Storage section of Spark is telling me that 
> 100% of RDD is cached using 97 GB of RAM while the data in HDFS is only 47 GB.
>  
> Thanks and Regards,
> Suraj Sheth
>  
> From: Mayur Rustagi [mailto:mayur.rust...@gmail.com] 
> Sent: Tuesday, February 25, 2014 11:19 PM
> To: user@spark.apache.org
> Cc: u...@spark.incubator.apache.org
> Subject: Re: Size of RDD larger than Size of data on disk
>  
> Spark may take more RAM than reqiured by RDD, can you look at storage section 
> of Spark & see how much space RDD is taking in memory. It may still take more 
> storage than disk as Java objects have some overhead. 
> Consider enabling compression in RDD. 
> 
> Mayur Rustagi
> Ph: +919632149971
> http://www.sigmoidanalytics.com
> https://twitter.com/mayur_rustagi
>  
>  
> 
> On Tue, Feb 25, 2014 at 6:47 AM, Suraj Satishkumar Sheth  
> wrote:
> Hi All,
> I have a folder in HDFS which has files with size of 47GB. I am loading this 
> in Spark as RDD[String] and caching it. The total amount of RAM that Spark 
> uses to cache it is around 97GB. I want to know why Spark is taking up so 
> much of Space for the RDD? Can we reduce the RDD size in Spark and make it 
> similar to it’s size on disk?
>  
> Thanks and Regards,
> Suraj Sheth



RE: Size of RDD larger than Size of data on disk

2014-02-25 Thread Suraj Satishkumar Sheth
Hi Mayur,
Thanks for replying. Is it usually double the size of data on disk?
I have observed this many times. Storage section of Spark is telling me that 
100% of RDD is cached using 97 GB of RAM while the data in HDFS is only 47 GB.

Thanks and Regards,
Suraj Sheth

From: Mayur Rustagi [mailto:mayur.rust...@gmail.com]
Sent: Tuesday, February 25, 2014 11:19 PM
To: user@spark.apache.org
Cc: u...@spark.incubator.apache.org
Subject: Re: Size of RDD larger than Size of data on disk

Spark may take more RAM than reqiured by RDD, can you look at storage section 
of Spark & see how much space RDD is taking in memory. It may still take more 
storage than disk as Java objects have some overhead.
Consider enabling compression in RDD.

Mayur Rustagi
Ph: +919632149971
http://www.sigmoidanalytics.com
https://twitter.com/mayur_rustagi


On Tue, Feb 25, 2014 at 6:47 AM, Suraj Satishkumar Sheth 
mailto:suraj...@adobe.com>> wrote:
Hi All,
I have a folder in HDFS which has files with size of 47GB. I am loading this in 
Spark as RDD[String] and caching it. The total amount of RAM that Spark uses to 
cache it is around 97GB. I want to know why Spark is taking up so much of Space 
for the RDD? Can we reduce the RDD size in Spark and make it similar to it’s 
size on disk?

Thanks and Regards,
Suraj Sheth



Re: ETL on pyspark

2014-02-25 Thread Matei Zaharia
It will only move a file to the final directory when it’s successfully finished 
writing it, so the file shouldn’t have any duplicates. Old attempts will just 
be deleted.

Matei

On Feb 25, 2014, at 9:19 AM, Adrian Mocanu  wrote:

> Hi Matei
> If Spark crashes while writing the file, after recovery from the failure does 
> it continue where it left off or will there be duplicates in the file?
>  
> -A
> From: Matei Zaharia [mailto:matei.zaha...@gmail.com] 
> Sent: February-24-14 4:20 PM
> To: u...@spark.incubator.apache.org
> Subject: Re: ETL on pyspark
>  
> collect() means to bring all the data back to the master node, and there 
> might just be too much of it for that. How big is your file? If you can’t 
> bring it back to the master node try saveAsTextFile to write it out to a 
> filesystem (in parallel).
>  
> Matei
>  
> On Feb 24, 2014, at 1:08 PM, Chengi Liu  wrote:
> 
> 
> Hi,
>   A newbie here. I am trying to do etl on spark. Few questions.
> 
> I have csv file with header.
> 1) How do I parse this file (as it has a header..)
> 2) I was trying to follow the tutorial here: 
> http://ampcamp.berkeley.edu/3/exercises/data-exploration-using-spark.html
> 
> 3) I am trying to do a frequency count.. 
>  rows_key_value = rows.map(lambda x:(x[1],1)).reduceByKey(lambda 
> x,y:x+y,1).collect()
> 
> 
> After waiting for like few minutes I see this error:
> java.lang.OutOfMemoryError: Java heap space
> at java.util.Arrays.copyOf(Arrays.java:2271)
> at 
> java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:178)
> at 
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:50)
> at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:223)
> at 
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)
> 14/02/24 13:06:45 INFO TaskSetManager: Starting task 13.0:0 as TID 331 on 
> executor 2: node07 (PROCESS_LOCAL)
> 14/02/24 13:06:45 INFO TaskSetManager: Serialized task 13.0:0 as 3809 bytes 
> in 0 ms
> 
> 
> How do i fix this?
> Thanks
> 



Re: How to get well-distribute partition

2014-02-25 Thread Mayur Rustagi
Okay, you caught me on this... I haven't used the Python API.
Let's try
http://www.cs.berkeley.edu/~pwendell/strataconf/api/pyspark/pyspark.rdd.RDD-class.html#partitionBy
on the RDD and customize the partitioner from hash to a custom function.
Please update the list if it works; it seems to be a common problem.

Mayur Rustagi
Ph: +919632149971
http://www.sigmoidanalytics.com
https://twitter.com/mayur_rustagi



On Mon, Feb 24, 2014 at 9:23 PM, zhaoxw12 wrote:

> Thanks for your reply.
> For some reasons, I have to use python in my program. I can't find the API
> of RangePartitioner. Could you tell me more details?
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-well-distribute-partition-tp2002p2013.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Sharing SparkContext

2014-02-25 Thread Mayur Rustagi
The fair scheduler merely reorders tasks... I think he is looking to run
multiple pieces of code on a single context, on demand from customers. If
the code and its order are decided, then the fair scheduler will ensure that
all tasks get equal cluster time :)



Mayur Rustagi
Ph: +919632149971
http://www.sigmoidanalytics.com
https://twitter.com/mayur_rustagi



On Tue, Feb 25, 2014 at 10:24 AM, Ognen Duzlevski
wrote:

>  Doesn't the fair scheduler solve this?
> Ognen
>
>
> On 2/25/14, 12:08 PM, abhinav chowdary wrote:
>
> Sorry for not being clear earlier
> how do you want to pass the operations to the spark context?
> this is partly what i am looking for . How to access the active spark
> context and possible ways to pass operations
>
>  Thanks
>
>
>
>  On Tue, Feb 25, 2014 at 10:02 AM, Mayur Rustagi 
> wrote:
>
>> how do you want to pass the operations to the spark context?
>>
>>
>>  Mayur Rustagi
>> Ph: +919632149971
>> h ttp://www.sigmoidanalytics.com
>>  https://twitter.com/mayur_rustagi
>>
>>
>>
>> On Tue, Feb 25, 2014 at 9:59 AM, abhinav chowdary <
>> abhinav.chowd...@gmail.com> wrote:
>>
>>> Hi,
>>>I am looking for ways to share the sparkContext, meaning i need
>>> to be able to perform multiple operations on the same spark context.
>>>
>>>  Below is code of a simple app i am testing
>>>
>>>   def main(args: Array[String]) {
>>> println("Welcome to example application!")
>>>
>>>  val sc = new SparkContext("spark://10.128.228.142:7077", "Simple
>>> App")
>>>
>>>  println("Spark context created!")
>>>
>>>  println("Creating RDD!")
>>>
>>>  Now once this context is created i want to access  this to submit
>>> multiple jobs/operations
>>>
>>>  Any help is much appreciated
>>>
>>>  Thanks
>>>
>>>
>>>
>>>
>>
>
>
>  --
> Warm Regards
> Abhinav Chowdary
>
>
>


Re: Sharing SparkContext

2014-02-25 Thread Ognen Duzlevski

Doesn't the fair scheduler solve this?
Ognen

On 2/25/14, 12:08 PM, abhinav chowdary wrote:

Sorry for not being clear earlier
how do you want to pass the operations to the spark context?
this is partly what i am looking for . How to access the active spark 
context and possible ways to pass operations


Thanks



On Tue, Feb 25, 2014 at 10:02 AM, Mayur Rustagi 
mailto:mayur.rust...@gmail.com>> wrote:


how do you want to pass the operations to the spark context?


Mayur Rustagi
Ph: +919632149971 
http://www.sigmoidanalytics.com

https://twitter.com/mayur_rustagi



On Tue, Feb 25, 2014 at 9:59 AM, abhinav chowdary
mailto:abhinav.chowd...@gmail.com>>
wrote:

Hi,
   I am looking for ways to share the sparkContext,
meaning i need to be able to perform multiple operations on
the same spark context.

Below is code of a simple app i am testing

 def main(args: Array[String]) {
println("Welcome to example application!")

val sc = new SparkContext("spark://10.128.228.142:7077
", "Simple App")

println("Spark context created!")

println("Creating RDD!")

Now once this context is created i want to access  this to
submit multiple jobs/operations

Any help is much appreciated

Thanks







--
Warm Regards
Abhinav Chowdary




Re: Sharing SparkContext

2014-02-25 Thread Mayur Rustagi
So there is no way to share a context currently.
1. You can try the jobserver by Ooyala, but I haven't used it and frankly
nobody has shared feedback on it.
2. If you can load that RDD into Shark, then you get a SQL interface on that
RDD plus columnar storage.
3. You can try a crude method of starting a spark shell and passing commands
to it after receiving them through an HTML interface etc., but you'll have to
do the hard work of managing concurrency.
I was wondering about the use case: are you looking to pass a Spark closure
on the RDD and transform it each time, or looking to avoid caching the RDD
again and again?





Mayur Rustagi
Ph: +919632149971
http://www.sigmoidanalytics.com
https://twitter.com/mayur_rustagi
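For the single-context, many-operations use case itself, a minimal sketch of submitting
independent actions from separate threads on one SparkContext (the scheduler is
thread-safe; the master URL and job bodies are placeholders):

import org.apache.spark.SparkContext

val sc = new SparkContext("spark://10.128.228.142:7077", "Shared Context App")
val data = sc.parallelize(1 to 1000000).cache()

// Two independent jobs submitted concurrently on the same context.
val t1 = new Thread(new Runnable {
  def run() { println("sum   = " + data.reduce(_ + _)) }
})
val t2 = new Thread(new Runnable {
  def run() { println("evens = " + data.filter(_ % 2 == 0).count()) }
})
t1.start(); t2.start()
t1.join();  t2.join()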



On Tue, Feb 25, 2014 at 10:08 AM, abhinav chowdary <
abhinav.chowd...@gmail.com> wrote:

> Sorry for not being clear earlier
>
> how do you want to pass the operations to the spark context?
> this is partly what i am looking for . How to access the active spark
> context and possible ways to pass operations
>
> Thanks
>
>
>
> On Tue, Feb 25, 2014 at 10:02 AM, Mayur Rustagi 
> wrote:
>
>> how do you want to pass the operations to the spark context?
>>
>>
>> Mayur Rustagi
>> Ph: +919632149971
>> h ttp://www.sigmoidanalytics.com
>> https://twitter.com/mayur_rustagi
>>
>>
>>
>> On Tue, Feb 25, 2014 at 9:59 AM, abhinav chowdary <
>> abhinav.chowd...@gmail.com> wrote:
>>
>>> Hi,
>>>I am looking for ways to share the sparkContext, meaning i need
>>> to be able to perform multiple operations on the same spark context.
>>>
>>> Below is code of a simple app i am testing
>>>
>>>  def main(args: Array[String]) {
>>> println("Welcome to example application!")
>>>
>>> val sc = new SparkContext("spark://10.128.228.142:7077", "Simple
>>> App")
>>>
>>> println("Spark context created!")
>>>
>>> println("Creating RDD!")
>>>
>>> Now once this context is created i want to access  this to submit
>>> multiple jobs/operations
>>>
>>> Any help is much appreciated
>>>
>>> Thanks
>>>
>>>
>>>
>>>
>>
>
>
> --
> Warm Regards
> Abhinav Chowdary
>


Re: Sharing SparkContext

2014-02-25 Thread abhinav chowdary
Sorry for not being clear earlier.
"how do you want to pass the operations to the spark context?"
This is partly what I am looking for: how to access the active Spark
context, and possible ways to pass operations to it.

Thanks



On Tue, Feb 25, 2014 at 10:02 AM, Mayur Rustagi wrote:

> how do you want to pass the operations to the spark context?
>
>
> Mayur Rustagi
> Ph: +919632149971
> h ttp://www.sigmoidanalytics.com
> https://twitter.com/mayur_rustagi
>
>
>
> On Tue, Feb 25, 2014 at 9:59 AM, abhinav chowdary <
> abhinav.chowd...@gmail.com> wrote:
>
>> Hi,
>>I am looking for ways to share the sparkContext, meaning i need to
>> be able to perform multiple operations on the same spark context.
>>
>> Below is code of a simple app i am testing
>>
>>  def main(args: Array[String]) {
>> println("Welcome to example application!")
>>
>> val sc = new SparkContext("spark://10.128.228.142:7077", "Simple
>> App")
>>
>> println("Spark context created!")
>>
>> println("Creating RDD!")
>>
>> Now once this context is created i want to access  this to submit
>> multiple jobs/operations
>>
>> Any help is much appreciated
>>
>> Thanks
>>
>>
>>
>>
>


-- 
Warm Regards
Abhinav Chowdary


Re: Sharing SparkContext

2014-02-25 Thread Mayur Rustagi
how do you want to pass the operations to the spark context?


Mayur Rustagi
Ph: +919632149971
http://www.sigmoidanalytics.com
https://twitter.com/mayur_rustagi



On Tue, Feb 25, 2014 at 9:59 AM, abhinav chowdary <
abhinav.chowd...@gmail.com> wrote:

> Hi,
>I am looking for ways to share the sparkContext, meaning i need to
> be able to perform multiple operations on the same spark context.
>
> Below is code of a simple app i am testing
>
>  def main(args: Array[String]) {
> println("Welcome to example application!")
>
> val sc = new SparkContext("spark://10.128.228.142:7077", "Simple App")
>
> println("Spark context created!")
>
> println("Creating RDD!")
>
> Now once this context is created i want to access  this to submit multiple
> jobs/operations
>
> Any help is much appreciated
>
> Thanks
>
>
>
>


Sharing SparkContext

2014-02-25 Thread abhinav chowdary
Hi,
   I am looking for ways to share the SparkContext, meaning I need to
be able to perform multiple operations on the same Spark context.

Below is the code of a simple app I am testing:

 def main(args: Array[String]) {
println("Welcome to example application!")

val sc = new SparkContext("spark://10.128.228.142:7077", "Simple App")

println("Spark context created!")

println("Creating RDD!")

Now, once this context is created, I want to access it to submit multiple
jobs/operations.

Any help is much appreciated

Thanks


Re: Size of RDD larger than Size of data on disk

2014-02-25 Thread Mayur Rustagi
Spark may take more RAM than required by the RDD. Can you look at the
Storage section of the Spark UI and see how much space the RDD is taking in
memory? It may still take more storage than on disk, as Java objects have
some overhead. Consider enabling compression for the RDD.

Mayur Rustagi
Ph: +919632149971
http://www.sigmoidanalytics.com
https://twitter.com/mayur_rustagi



On Tue, Feb 25, 2014 at 6:47 AM, Suraj Satishkumar Sheth  wrote:

>  Hi All,
>
> I have a folder in HDFS which has files with size of 47GB. I am loading
> this in Spark as RDD[String] and caching it. The total amount of RAM that
> Spark uses to cache it is around 97GB. I want to know why Spark is taking
> up so much of Space for the RDD? Can we reduce the RDD size in Spark and
> make it similar to it’s size on disk?
>
>
>
> Thanks and Regards,
>
> Suraj Sheth
>


Re: HBase row count

2014-02-25 Thread Soumitra Kumar
Found the issue: the splits in HBase were not uniform, so one job was
taking 90% of the time.

BTW, is there a way to save the details available on port 4040 after the job
is finished?


On Tue, Feb 25, 2014 at 7:26 AM, Nick Pentreath wrote:

> It's tricky really since you may not know upfront how much data is in
> there. You could possibly take a look at how much data is in the HBase
> tables to get an idea.
>
> It may take a bit of trial and error, like running out of memory trying to
> cache the dataset, and checking the Spark UI on port 4040 to see how much
> is cached and how much memory still remains available, etc etc. You should
> also take a look at http://spark.apache.org/docs/latest/tuning.html for
> ideas around memory and serialization tuning.
>
> Broadly speaking, what you want to try to do is filter as much data as
> possible first and cache the subset of data on which you'll be performing
> multiple passes or computations. For example, based on your code above, you
> may in fact only wish to cache the data that is the "interesting" fields
> RDD. It all depends on what you're trying to achieve.
>
> If you will only be doing one pass through the data anyway (like running a
> count every time on the full dataset) then caching is not going to help you.
>
>
> On Tue, Feb 25, 2014 at 4:59 PM, Soumitra Kumar 
> wrote:
>
>> Thanks Nick.
>>
>> How do I figure out if the RDD fits in memory?
>>
>>
>> On Tue, Feb 25, 2014 at 1:04 AM, Nick Pentreath > > wrote:
>>
>>> cache only caches the data on the first action (count) - the first time
>>> it still needs to read the data from the source. So the first time you call
>>> count it will take the same amount of time whether cache is enabled or not.
>>> The second time you call count on a cached RDD, you should see that it
>>> takes a lot less time (assuming that the data fit in memory).
>>>
>>>
>>> On Tue, Feb 25, 2014 at 9:38 AM, Soumitra Kumar <
>>> kumar.soumi...@gmail.com> wrote:
>>>
 I did try with 'hBaseRDD.cache()', but don't see any improvement.

 My expectation is that with cache enabled, there should not be any
 penalty of 'hBaseRDD.count' call.



 On Mon, Feb 24, 2014 at 11:29 PM, Nick Pentreath <
 nick.pentre...@gmail.com> wrote:

> Yes, you''re initiating a scan for each count call. The normal way to
> improve this would be to use cache(), which is what you have in your
> commented out line:
> // hBaseRDD.cache()
>
> If you uncomment that line, you should see an improvement overall.
>
> If caching is not an option for some reason (maybe data is too large),
> then you can implement an overall count in your readFields method using
> accumulators:
>
> val count = sc.accumulator(0L)
> ...
> In your flatMap function do count += 1 for each row (regardless of
> whether "interesting" or not).
>
> In your main method after doing an action (e.g. count in your case),
> call val totalCount = count.value.
>
>
>
>
> On Tue, Feb 25, 2014 at 9:15 AM, Soumitra Kumar <
> kumar.soumi...@gmail.com> wrote:
>
>> I have a code which reads an HBase table, and counts number of rows
>> containing a field.
>>
>> def readFields(rdd : RDD[(ImmutableBytesWritable, Result)]) :
>> RDD[List[Array[Byte]]] = {
>> return rdd.flatMap(kv => {
>> // Set of interesting keys for this use case
>> val keys = List ("src")
>> var data = List[Array[Byte]]()
>> var usefulRow = false
>>
>> val cf = Bytes.toBytes ("cf")
>> keys.foreach {key =>
>> val col = kv._2.getValue(cf, Bytes.toBytes(key))
>> if (col != null)
>> usefulRow = true
>> data = data :+ col
>> }
>>
>> if (usefulRow)
>> Some(data)
>> else
>> None
>> })
>> }
>>
>> def main(args: Array[String]) {
>> val hBaseRDD = init(args)
>> // hBaseRDD.cache()
>>
>> println(" Initial row count " + hBaseRDD.count())
>> println(" Rows with interesting fields " +
>> readFields(hBaseRDD).count())
>>   }
>>
>>
>> I am running on a one mode CDH installation.
>>
>> As it is it takes around 2.5 minutes. But if I comment out
>> 'println(" Initial row count " + hBaseRDD.count())', it takes around
>> 1.5 minutes.
>>
>> Is it doing HBase scan twice, for both 'count' calls? How do I
>> improve it?
>>
>> Thanks,
>> -Soumitra.
>>
>
>

>>>
>>
>


RE: ETL on pyspark

2014-02-25 Thread Adrian Mocanu
Hi Matei
If Spark crashes while writing the file, after recovery from the failure does 
it continue where it left off or will there be duplicates in the file?

-A
From: Matei Zaharia [mailto:matei.zaha...@gmail.com]
Sent: February-24-14 4:20 PM
To: u...@spark.incubator.apache.org
Subject: Re: ETL on pyspark

collect() means to bring all the data back to the master node, and there might 
just be too much of it for that. How big is your file? If you can't bring it 
back to the master node try saveAsTextFile to write it out to a filesystem (in 
parallel).

Matei

On Feb 24, 2014, at 1:08 PM, Chengi Liu 
mailto:chengi.liu...@gmail.com>> wrote:


Hi,
  A newbie here. I am trying to do etl on spark. Few questions.

I have csv file with header.
1) How do I parse this file (as it has a header..)
2) I was trying to follow the tutorial here: 
http://ampcamp.berkeley.edu/3/exercises/data-exploration-using-spark.html
3) I am trying to do a frequency count..
 rows_key_value = rows.map(lambda x:(x[1],1)).reduceByKey(lambda 
x,y:x+y,1).collect()

After waiting for like few minutes I see this error:
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2271)
at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:178)
at 
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:50)
at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:223)
at 
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
14/02/24 13:06:45 INFO TaskSetManager: Starting task 13.0:0 as TID 331 on 
executor 2: node07 (PROCESS_LOCAL)
14/02/24 13:06:45 INFO TaskSetManager: Serialized task 13.0:0 as 3809 bytes in 
0 ms

How do i fix this?
Thanks




Kryo serialization does not compress

2014-02-25 Thread pradeeps8
Hi All,

We are currently trying to benchmark the various cache options on RDDs with
respect to speed and efficiency.
The data that we are using is mostly filled with numbers (floating point).

We have noticed that the memory consumption of the RDD for MEMORY_ONLY
(519.1 MB) and for MEMORY_ONLY_SER (511.5 MB), which uses Kryo serialization,
is almost equivalent (519.1 MB vs. 511.5 MB, respectively).

Is this behavior expected?
We were under the impression that Kryo serialization is efficient and were
expecting it to compress further.

Also, we have noticed that when we enable compression (LZ4) on RDDs, the
memory consumption of the RDD for MEMORY_ONLY with compression is the same
as without compression, i.e. 519.1 MB.
But MEMORY_ONLY_SER (Kryo serialization) with compression consumes only
386.5 MB.

Why isn't enabling compression without serialization working for
MEMORY_ONLY?
Is there anything else we need to do for MEMORY_ONLY to get it compressed?

Thanks,
Pradeep
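For comparison, a small sketch of the knobs being benchmarked; as far as I understand,
spark.rdd.compress applies to serialized RDD partitions, which would be consistent with
MEMORY_ONLY showing no change (worth confirming against the configuration guide). Paths
and names are placeholders:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("cache-benchmark")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.rdd.compress", "true")            // compresses serialized partitions
val sc = new SparkContext(conf)

val data = sc.textFile("hdfs:///data/numbers")  // placeholder path
data.persist(StorageLevel.MEMORY_ONLY_SER)      // the level the compression applies to
data.count()                                    // materialize, then check the Storage tab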



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-serialization-does-not-compress-tp2042.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark in YARN HDP problem

2014-02-25 Thread aecc
Hi, I'm trying to run the Spark examples in YARN and I get the following
error:

appDiagnostics: Application application_1390483691679_0124 failed 2 times
due to AM Container for appattempt_1390483691679_0124_02 exited with 
exitCode: 1 due to: Exception from container-launch: 
org.apache.hadoop.util.Shell$ExitCodeException: 
at org.apache.hadoop.util.Shell.runCommand(Shell.java:464)
at org.apache.hadoop.util.Shell.run(Shell.java:379)
at
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
at
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:195)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:283)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:79)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)

This is the command to run:
SPARK_JAR=/opt/spark/assembly/target/spark-assembly_2.10-0.9.0-incubating.jar
/opt/spark/bin/spark-class org.apache.spark.deploy.yarn.Client --jar
/opt/spark/examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar
--class org.apache.spark.examples.SparkPi --args yarn-standalone

And my spark-env.sh file:
export SCALA_HOME=/opt/scala
export SPARK_LIBRARY_PATH="/usr/lib/hadoop/lib/native:$SPARK_LIBRARY_PATH"
export HADOOP_CONF_DIR="/etc/hadoop/conf/"

Am I doing something wrong?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-in-YARN-HDP-problem-tp2041.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Akka Connection refused - standalone cluster using spark-0.9.0

2014-02-25 Thread Akhil Das
Hi Rui,

If you are getting a "Connection refused" exception, you can resolve it by
checking:

=> The master is running on the specific host

   - netstat -at | grep 7077

You will get something similar to:

   - tcp    0    0 akhldz.master.io:7077    *:*    LISTEN

If that is the case, then from your worker machine do a

   - host akhldz.master.io      (replace akhldz.master.io with your master
     host; if something goes wrong, add a host entry in your /etc/hosts file)
   - telnet akhldz.master.io 7077   (if this does not connect, then your
     worker won't connect either)

=> Adding a host entry in /etc/hosts

Open /etc/hosts on your worker machine and add the following entry
(example):

192.168.100.20   akhldz.master.io

PS: In the above case Pillis had two IP addresses with the same host name,
e.g.:

192.168.100.40  s1.machine.org
192.168.100.41  s1.machine.org

Hope that helps. Please do post your stack trace if that doesn't solve your
problem.



On Tue, Feb 25, 2014 at 7:33 PM, Li, Rui  wrote:

>  Hi Pillis,
>
>
>
> I met with the same problem here. Could you share how you solved the issue
> more specifically?
>
> I added an entry in /etc/hosts, but it doesn't help.
>
>
>
> *From:* Pillis W [mailto:pillis.w...@gmail.com]
> *Sent:* Sunday, February 09, 2014 4:49 AM
> *To:* u...@spark.incubator.apache.org
> *Subject:* Re: Akka Connection refused - standalone cluster using
> spark-0.9.0
>
>
>
> I fixed my issue - two IP addresses had the same hostname.
>
> Regards
>
>
>
>
>
>
>
> On Fri, Feb 7, 2014 at 12:59 PM, Soumya Simanta 
> wrote:
>
> I see similar logs but only when I try to run a standalone Scala program.
> The whole setup works just fine if I'm using the spark-shell/REPL.
>
>
>
>
>
>
>
> On Fri, Feb 7, 2014 at 3:05 PM, mohankreddy 
> wrote:
>
> Here's more information. I have the master up but when I try to get the
> workers up I am getting the following error.
>
>
> log4j:WARN No appenders could be found for logger
> (akka.event.slf4j.Slf4jLogger).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
> more info.
>
> 14/02/07 15:01:17 INFO Worker: Using Spark's default log4j profile:
> org/apache/spark/log4j-defaults.properties
> 14/02/07 15:01:17 INFO Worker: Starting Spark worker yyy:58020 with 16
> cores, 67.0 GB RAM
> 14/02/07 15:01:17 INFO Worker: Spark home: /opt/spark
> 14/02/07 15:01:17 INFO WorkerWebUI: Started Worker web UI at
> http://y:8081
> 14/02/07 15:01:17 INFO Worker: Connecting to master spark://x/:7077...
> 14/02/07 15:01:17 INFO RemoteActorRefProvider$RemoteDeadLetterActorRef:
> Message [org.apache.spark.deploy.DeployMessages$RegisterWorker] from
> Actor[akka://sparkWorker/user/Worker#2037095035] to
>
> Actor[akka://sparkWorker/deadLetters] was not delivered. [1] dead letters
> encountered. This logging can be turned off or adjusted with configuration
> settings 'akka.log-dead-letters' and
> 'akka.log-dead-letters-during-shutdown'.
>
> 14/02/07 15:01:37 INFO Worker: Connecting to master spark://x/:7077...
> 14/02/07 15:01:37 INFO RemoteActorRefProvider$RemoteDeadLetterActorRef:
> Message [org.apache.spark.deploy.DeployMessages$RegisterWorker] from
> Actor[akka://sparkWorker/user/Worker#2037095035] to
> Actor[akka://sparkWorker/deadLetters] was not delivered. [2] dead letters
>
> encountered. This logging can be turned off or adjusted with configuration
> settings 'akka.log-dead-letters' and
> 'akka.log-dead-letters-during-shutdown'.
>
> 14/02/07 15:01:57 INFO Worker: Connecting to master spark:///:7077...
> 14/02/07 15:01:57 INFO RemoteActorRefProvider$RemoteDeadLetterActorRef:
> Message [org.apache.spark.deploy.DeployMessages$RegisterWorker] from
> Actor[akka://sparkWorker/user/Worker#2037095035] to
> Actor[akka://sparkWorker/deadLetters] was not delivered. [3] dead letters
>
> encountered. This logging can be turned off or adjusted with configuration
> settings 'akka.log-dead-letters' and
> 'akka.log-dead-letters-during-shutdown'.
>
> 14/02/07 15:02:17 ERROR Worker: All masters are unresponsive! Giving up.
>
>
>
> PS: I masked the IPs
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Akka-Connection-refused-standalone-cluster-using-spark-0-9-0-tp1297p1311.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>
>
>



-- 
Thanks
Best Regards


Re: HBase row count

2014-02-25 Thread Nick Pentreath
It's tricky really since you may not know upfront how much data is in
there. You could possibly take a look at how much data is in the HBase
tables to get an idea.

It may take a bit of trial and error, like running out of memory trying to
cache the dataset, and checking the Spark UI on port 4040 to see how much
is cached and how much memory still remains available, etc etc. You should
also take a look at http://spark.apache.org/docs/latest/tuning.html for
ideas around memory and serialization tuning.

Broadly speaking, what you want to try to do is filter as much data as
possible first and cache the subset of data on which you'll be performing
multiple passes or computations. For example, based on your code above, you
may in fact only wish to cache the data that is the "interesting" fields
RDD. It all depends on what you're trying to achieve.

If you will only be doing one pass through the data anyway (like running a
count every time on the full dataset) then caching is not going to help you.
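Putting the two suggestions from this thread together (count all rows with an
accumulator during the single filtering pass, and cache only the filtered subset), a
rough sketch with assumed names; note that accumulator updates inside transformations
can over-count if tasks are re-executed:

import org.apache.hadoop.hbase.util.Bytes

// Assumes 'sc' and an existing hBaseRDD: RDD[(ImmutableBytesWritable, Result)].
val totalRows = sc.accumulator(0L)

val interesting = hBaseRDD.flatMap { kv =>
  totalRows += 1L                               // counts every row in the same pass
  val col = kv._2.getValue(Bytes.toBytes("cf"), Bytes.toBytes("src"))
  if (col != null) Some(List(col)) else None
}
interesting.cache()                             // cache only the filtered subset

println("Rows with interesting fields: " + interesting.count())
println("Initial row count:            " + totalRows.value)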


On Tue, Feb 25, 2014 at 4:59 PM, Soumitra Kumar wrote:

> Thanks Nick.
>
> How do I figure out if the RDD fits in memory?
>
>
> On Tue, Feb 25, 2014 at 1:04 AM, Nick Pentreath 
> wrote:
>
>> cache only caches the data on the first action (count) - the first time
>> it still needs to read the data from the source. So the first time you call
>> count it will take the same amount of time whether cache is enabled or not.
>> The second time you call count on a cached RDD, you should see that it
>> takes a lot less time (assuming that the data fit in memory).
>>
>>
>> On Tue, Feb 25, 2014 at 9:38 AM, Soumitra Kumar > > wrote:
>>
>>> I did try with 'hBaseRDD.cache()', but don't see any improvement.
>>>
>>> My expectation is that with cache enabled, there should not be any
>>> penalty of 'hBaseRDD.count' call.
>>>
>>>
>>>
>>> On Mon, Feb 24, 2014 at 11:29 PM, Nick Pentreath <
>>> nick.pentre...@gmail.com> wrote:
>>>
 Yes, you''re initiating a scan for each count call. The normal way to
 improve this would be to use cache(), which is what you have in your
 commented out line:
 // hBaseRDD.cache()

 If you uncomment that line, you should see an improvement overall.

 If caching is not an option for some reason (maybe data is too large),
 then you can implement an overall count in your readFields method using
 accumulators:

 val count = sc.accumulator(0L)
 ...
 In your flatMap function do count += 1 for each row (regardless of
 whether "interesting" or not).

 In your main method after doing an action (e.g. count in your case),
 call val totalCount = count.value.




 On Tue, Feb 25, 2014 at 9:15 AM, Soumitra Kumar <
 kumar.soumi...@gmail.com> wrote:

> I have a code which reads an HBase table, and counts number of rows
> containing a field.
>
> def readFields(rdd : RDD[(ImmutableBytesWritable, Result)]) :
> RDD[List[Array[Byte]]] = {
> return rdd.flatMap(kv => {
> // Set of interesting keys for this use case
> val keys = List ("src")
> var data = List[Array[Byte]]()
> var usefulRow = false
>
> val cf = Bytes.toBytes ("cf")
> keys.foreach {key =>
> val col = kv._2.getValue(cf, Bytes.toBytes(key))
> if (col != null)
> usefulRow = true
> data = data :+ col
> }
>
> if (usefulRow)
> Some(data)
> else
> None
> })
> }
>
> def main(args: Array[String]) {
> val hBaseRDD = init(args)
> // hBaseRDD.cache()
>
> println(" Initial row count " + hBaseRDD.count())
> println(" Rows with interesting fields " +
> readFields(hBaseRDD).count())
>   }
>
>
> I am running on a one mode CDH installation.
>
> As it is it takes around 2.5 minutes. But if I comment out
> 'println(" Initial row count " + hBaseRDD.count())', it takes around
> 1.5 minutes.
>
> Is it doing HBase scan twice, for both 'count' calls? How do I improve
> it?
>
> Thanks,
> -Soumitra.
>


>>>
>>
>


Re: HBase row count

2014-02-25 Thread Soumitra Kumar
Thanks Nick.

How do I figure out if the RDD fits in memory?


On Tue, Feb 25, 2014 at 1:04 AM, Nick Pentreath wrote:

> cache only caches the data on the first action (count) - the first time it
> still needs to read the data from the source. So the first time you call
> count it will take the same amount of time whether cache is enabled or not.
> The second time you call count on a cached RDD, you should see that it
> takes a lot less time (assuming that the data fit in memory).
>
>
> On Tue, Feb 25, 2014 at 9:38 AM, Soumitra Kumar 
> wrote:
>
>> I did try with 'hBaseRDD.cache()', but don't see any improvement.
>>
>> My expectation is that with cache enabled, there should not be any
>> penalty of 'hBaseRDD.count' call.
>>
>>
>>
>> On Mon, Feb 24, 2014 at 11:29 PM, Nick Pentreath <
>> nick.pentre...@gmail.com> wrote:
>>
>>> Yes, you''re initiating a scan for each count call. The normal way to
>>> improve this would be to use cache(), which is what you have in your
>>> commented out line:
>>> // hBaseRDD.cache()
>>>
>>> If you uncomment that line, you should see an improvement overall.
>>>
>>> If caching is not an option for some reason (maybe data is too large),
>>> then you can implement an overall count in your readFields method using
>>> accumulators:
>>>
>>> val count = sc.accumulator(0L)
>>> ...
>>> In your flatMap function do count += 1 for each row (regardless of
>>> whether "interesting" or not).
>>>
>>> In your main method after doing an action (e.g. count in your case),
>>> call val totalCount = count.value.
>>>
>>>
>>>
>>>
>>> On Tue, Feb 25, 2014 at 9:15 AM, Soumitra Kumar <
>>> kumar.soumi...@gmail.com> wrote:
>>>
 I have a code which reads an HBase table, and counts number of rows
 containing a field.

 def readFields(rdd : RDD[(ImmutableBytesWritable, Result)]) :
 RDD[List[Array[Byte]]] = {
 return rdd.flatMap(kv => {
 // Set of interesting keys for this use case
 val keys = List ("src")
 var data = List[Array[Byte]]()
 var usefulRow = false

 val cf = Bytes.toBytes ("cf")
 keys.foreach {key =>
 val col = kv._2.getValue(cf, Bytes.toBytes(key))
 if (col != null)
 usefulRow = true
 data = data :+ col
 }

 if (usefulRow)
 Some(data)
 else
 None
 })
 }

 def main(args: Array[String]) {
 val hBaseRDD = init(args)
 // hBaseRDD.cache()

 println(" Initial row count " + hBaseRDD.count())
 println(" Rows with interesting fields " +
 readFields(hBaseRDD).count())
   }


 I am running on a one-node CDH installation.

 As it is it takes around 2.5 minutes. But if I comment out
 'println(" Initial row count " + hBaseRDD.count())', it takes around
 1.5 minutes.

 Is it doing HBase scan twice, for both 'count' calls? How do I improve
 it?

 Thanks,
 -Soumitra.

>>>
>>>
>>
>


Size of RDD larger than Size of data on disk

2014-02-25 Thread Suraj Satishkumar Sheth
Hi All,
I have a folder in HDFS whose files total 47GB in size. I am loading this in
Spark as an RDD[String] and caching it. The total amount of RAM that Spark uses to
cache it is around 97GB. Why is Spark taking up so much space for the RDD? Can we
reduce the RDD size in Spark and make it similar to its size on disk?

Thanks and Regards,
Suraj Sheth
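
Part of the gap is expected: cached RDDs are stored as deserialized Java objects by
default, and Java strings (two bytes per character plus per-object overhead) easily take
around twice their on-disk size. A rough sketch of one way to shrink the footprint, using
a serialized storage level and Kryo (the path and app name below are placeholders, not
taken from the original post):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("serialized-cache-sketch")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  // more compact records
val sc = new SparkContext(conf)

val lines = sc.textFile("hdfs:///path/to/folder")   // placeholder for the 47GB input folder
lines.persist(StorageLevel.MEMORY_ONLY_SER)         // store serialized blocks, closer to on-disk size
lines.count()                                       // first action populates the cache

The trade-off is CPU: every access to a MEMORY_ONLY_SER RDD has to deserialize the
partitions it touches.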


RE: Akka Connection refused - standalone cluster using spark-0.9.0

2014-02-25 Thread Li, Rui
Hi Pillis,

I ran into the same problem here. Could you share more specifically how you solved the
issue?
I added an entry in /etc/hosts, but it doesn't help.

From: Pillis W [mailto:pillis.w...@gmail.com]
Sent: Sunday, February 09, 2014 4:49 AM
To: u...@spark.incubator.apache.org
Subject: Re: Akka Connection refused - standalone cluster using spark-0.9.0

I fixed my issue - two IP addresses had the same hostname.
Regards



On Fri, Feb 7, 2014 at 12:59 PM, Soumya Simanta <soumya.sima...@gmail.com> wrote:
I see similar logs but only when I try to run a standalone Scala program. The 
whole setup works just fine if I'm using the spark-shell/REPL.



On Fri, Feb 7, 2014 at 3:05 PM, mohankreddy <mre...@beanatomics.com> wrote:
Here's more information. I have the master up but when I try to get the
workers up I am getting the following error.

log4j:WARN No appenders could be found for logger
(akka.event.slf4j.Slf4jLogger).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
more info.
14/02/07 15:01:17 INFO Worker: Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
14/02/07 15:01:17 INFO Worker: Starting Spark worker yyy:58020 with 16
cores, 67.0 GB RAM
14/02/07 15:01:17 INFO Worker: Spark home: /opt/spark
14/02/07 15:01:17 INFO WorkerWebUI: Started Worker web UI at
http://y:8081
14/02/07 15:01:17 INFO Worker: Connecting to master spark://x/:7077...
14/02/07 15:01:17 INFO RemoteActorRefProvider$RemoteDeadLetterActorRef:
Message [org.apache.spark.deploy.DeployMessages$RegisterWorker] from
Actor[akka://sparkWorker/user/Worker#2037095035] to
Actor[akka://sparkWorker/deadLetters] was not delivered. [1] dead letters
encountered. This logging can be turned off or adjusted with configuration
settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.
14/02/07 15:01:37 INFO Worker: Connecting to master spark://x/:7077...
14/02/07 15:01:37 INFO RemoteActorRefProvider$RemoteDeadLetterActorRef:
Message [org.apache.spark.deploy.DeployMessages$RegisterWorker] from
Actor[akka://sparkWorker/user/Worker#2037095035] to
Actor[akka://sparkWorker/deadLetters] was not delivered. [2] dead letters
encountered. This logging can be turned off or adjusted with configuration
settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.
14/02/07 15:01:57 INFO Worker: Connecting to master spark:///:7077...
14/02/07 15:01:57 INFO RemoteActorRefProvider$RemoteDeadLetterActorRef:
Message [org.apache.spark.deploy.DeployMessages$RegisterWorker] from
Actor[akka://sparkWorker/user/Worker#2037095035] to
Actor[akka://sparkWorker/deadLetters] was not delivered. [3] dead letters
encountered. This logging can be turned off or adjusted with configuration
settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.
14/02/07 15:02:17 ERROR Worker: All masters are unresponsive! Giving up.



PS: I masked the IPs



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Akka-Connection-refused-standalone-cluster-using-spark-0-9-0-tp1297p1311.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: HBase row count

2014-02-25 Thread Koert Kuipers
I find them both somewhat confusing, actually:
* RDD.cache is lazy, and mutates the RDD in place
* RDD.unpersist takes effect immediately, unloading the data, and also mutates the RDD
in place to disable future lazy caching

I have found that if I need to unload an RDD from memory, but still want it
to be cached again in the future, I need to do:
rdd.unpersist.cache
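
A minimal sketch of that unload-then-recache pattern (sc is an existing SparkContext and
the RDD is just a stand-in):

val rdd = sc.parallelize(1 to 1000000)   // any RDD will do
rdd.cache()
rdd.count()       // first action computes the RDD and populates the cache
rdd.unpersist()   // takes effect immediately: drops the cached blocks and disables caching
rdd.cache()       // re-enables lazy caching for future actions
rdd.count()       // recomputes the RDD and re-populates the cache

Both calls mutate the RDD and return it, so rdd.unpersist().cache() chains exactly as in
the one-liner above.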






On Tue, Feb 25, 2014 at 6:50 AM, Cheng Lian  wrote:

> BTW, unlike RDD.cache(), the reverse operation RDD.unpersist() is not
> lazy, which is somewhat confusing...
>
>
> On Tue, Feb 25, 2014 at 7:48 PM, Cheng Lian  wrote:
>
>> RDD.cache() is a lazy operation, the method itself doesn't perform the
>> cache operation, it just asks Spark runtime to cache the content of the RDD
>> when the first action is invoked.  In your case, the first action is the
>> first count() call, which conceptually does 3 things:
>>
>>1. Performs the HBase scan
>>    2. Counts all the elements
>>3. Caches the RDD content
>>
>>
>>
>> On Tue, Feb 25, 2014 at 3:38 PM, Soumitra Kumar > > wrote:
>>
>>> I did try with 'hBaseRDD.cache()', but don't see any improvement.
>>>
>>> My expectation is that with cache enabled, there should not be any
>>> penalty of 'hBaseRDD.count' call.
>>>
>>>
>>>
>>> On Mon, Feb 24, 2014 at 11:29 PM, Nick Pentreath <
>>> nick.pentre...@gmail.com> wrote:
>>>
 Yes, you're initiating a scan for each count call. The normal way to
 improve this would be to use cache(), which is what you have in your
 commented out line:
 // hBaseRDD.cache()

 If you uncomment that line, you should see an improvement overall.

 If caching is not an option for some reason (maybe data is too large),
 then you can implement an overall count in your readFields method using
 accumulators:

 val count = sc.accumulator(0L)
 ...
 In your flatMap function do count += 1 for each row (regardless of
 whether "interesting" or not).

 In your main method after doing an action (e.g. count in your case),
 call val totalCount = count.value.




 On Tue, Feb 25, 2014 at 9:15 AM, Soumitra Kumar <
 kumar.soumi...@gmail.com> wrote:

> I have a code which reads an HBase table, and counts number of rows
> containing a field.
>
> def readFields(rdd : RDD[(ImmutableBytesWritable, Result)]) :
> RDD[List[Array[Byte]]] = {
> return rdd.flatMap(kv => {
> // Set of interesting keys for this use case
> val keys = List ("src")
> var data = List[Array[Byte]]()
> var usefulRow = false
>
> val cf = Bytes.toBytes ("cf")
> keys.foreach {key =>
> val col = kv._2.getValue(cf, Bytes.toBytes(key))
> if (col != null)
> usefulRow = true
> data = data :+ col
> }
>
> if (usefulRow)
> Some(data)
> else
> None
> })
> }
>
> def main(args: Array[String]) {
> val hBaseRDD = init(args)
> // hBaseRDD.cache()
>
> println(" Initial row count " + hBaseRDD.count())
> println(" Rows with interesting fields " +
> readFields(hBaseRDD).count())
>   }
>
>
> I am running on a one-node CDH installation.
>
> As it is it takes around 2.5 minutes. But if I comment out
> 'println(" Initial row count " + hBaseRDD.count())', it takes around
> 1.5 minutes.
>
> Is it doing HBase scan twice, for both 'count' calls? How do I improve
> it?
>
> Thanks,
> -Soumitra.
>


>>>
>>
>


Re: HBase row count

2014-02-25 Thread Cheng Lian
BTW, unlike RDD.cache(), the reverse operation RDD.unpersist() is not lazy,
which is somewhat confusing...


On Tue, Feb 25, 2014 at 7:48 PM, Cheng Lian  wrote:

> RDD.cache() is a lazy operation, the method itself doesn't perform the
> cache operation, it just asks Spark runtime to cache the content of the RDD
> when the first action is invoked.  In your case, the first action is the
> first count() call, which conceptually does 3 things:
>
>1. Performs the HBase scan
>    2. Counts all the elements
>3. Caches the RDD content
>
>
>
> On Tue, Feb 25, 2014 at 3:38 PM, Soumitra Kumar 
> wrote:
>
>> I did try with 'hBaseRDD.cache()', but don't see any improvement.
>>
>> My expectation is that with cache enabled, there should not be any
>> penalty of 'hBaseRDD.count' call.
>>
>>
>>
>> On Mon, Feb 24, 2014 at 11:29 PM, Nick Pentreath <
>> nick.pentre...@gmail.com> wrote:
>>
>>> Yes, you're initiating a scan for each count call. The normal way to
>>> improve this would be to use cache(), which is what you have in your
>>> commented out line:
>>> // hBaseRDD.cache()
>>>
>>> If you uncomment that line, you should see an improvement overall.
>>>
>>> If caching is not an option for some reason (maybe data is too large),
>>> then you can implement an overall count in your readFields method using
>>> accumulators:
>>>
>>> val count = sc.accumulator(0L)
>>> ...
>>> In your flatMap function do count += 1 for each row (regardless of
>>> whether "interesting" or not).
>>>
>>> In your main method after doing an action (e.g. count in your case),
>>> call val totalCount = count.value.
>>>
>>>
>>>
>>>
>>> On Tue, Feb 25, 2014 at 9:15 AM, Soumitra Kumar <
>>> kumar.soumi...@gmail.com> wrote:
>>>
 I have a code which reads an HBase table, and counts number of rows
 containing a field.

 def readFields(rdd : RDD[(ImmutableBytesWritable, Result)]) :
 RDD[List[Array[Byte]]] = {
 return rdd.flatMap(kv => {
 // Set of interesting keys for this use case
 val keys = List ("src")
 var data = List[Array[Byte]]()
 var usefulRow = false

 val cf = Bytes.toBytes ("cf")
 keys.foreach {key =>
 val col = kv._2.getValue(cf, Bytes.toBytes(key))
 if (col != null)
 usefulRow = true
 data = data :+ col
 }

 if (usefulRow)
 Some(data)
 else
 None
 })
 }

 def main(args: Array[String]) {
 val hBaseRDD = init(args)
 // hBaseRDD.cache()

 println(" Initial row count " + hBaseRDD.count())
 println(" Rows with interesting fields " +
 readFields(hBaseRDD).count())
   }


 I am running on a one-node CDH installation.

 As it is it takes around 2.5 minutes. But if I comment out
 'println(" Initial row count " + hBaseRDD.count())', it takes around
 1.5 minutes.

 Is it doing HBase scan twice, for both 'count' calls? How do I improve
 it?

 Thanks,
 -Soumitra.

>>>
>>>
>>
>


Re: HBase row count

2014-02-25 Thread Cheng Lian
RDD.cache() is a lazy operation: the method itself doesn't perform the
caching, it just asks the Spark runtime to cache the content of the RDD
when the first action is invoked.  In your case, the first action is the
first count() call, which conceptually does 3 things (see the sketch below):

   1. Performs the HBase scan
   2. Counts all the elements
   3. Caches the RDD content
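
A small sketch that makes the laziness visible (hBaseRDD as elsewhere in this thread; the
timing helper is only for illustration):

// Rough wall-clock timing helper.
def timed[A](label: String)(body: => A): A = {
  val start = System.nanoTime()
  val result = body
  println(label + ": " + (System.nanoTime() - start) / 1e9 + " s")
  result
}

hBaseRDD.cache()                                          // returns immediately; nothing is scanned yet
timed("first count (HBase scan + cache)") { hBaseRDD.count() }
timed("second count (served from cache)") { hBaseRDD.count() }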



On Tue, Feb 25, 2014 at 3:38 PM, Soumitra Kumar wrote:

> I did try with 'hBaseRDD.cache()', but don't see any improvement.
>
> My expectation is that with cache enabled, there should not be any penalty
> of 'hBaseRDD.count' call.
>
>
>
> On Mon, Feb 24, 2014 at 11:29 PM, Nick Pentreath  > wrote:
>
>> Yes, you're initiating a scan for each count call. The normal way to
>> improve this would be to use cache(), which is what you have in your
>> commented out line:
>> // hBaseRDD.cache()
>>
>> If you uncomment that line, you should see an improvement overall.
>>
>> If caching is not an option for some reason (maybe data is too large),
>> then you can implement an overall count in your readFields method using
>> accumulators:
>>
>> val count = sc.accumulator(0L)
>> ...
>> In your flatMap function do count += 1 for each row (regardless of
>> whether "interesting" or not).
>>
>> In your main method after doing an action (e.g. count in your case), call val
>> totalCount = count.value.
>>
>>
>>
>>
>> On Tue, Feb 25, 2014 at 9:15 AM, Soumitra Kumar > > wrote:
>>
>>> I have a code which reads an HBase table, and counts number of rows
>>> containing a field.
>>>
>>> def readFields(rdd : RDD[(ImmutableBytesWritable, Result)]) :
>>> RDD[List[Array[Byte]]] = {
>>> return rdd.flatMap(kv => {
>>> // Set of interesting keys for this use case
>>> val keys = List ("src")
>>> var data = List[Array[Byte]]()
>>> var usefulRow = false
>>>
>>> val cf = Bytes.toBytes ("cf")
>>> keys.foreach {key =>
>>> val col = kv._2.getValue(cf, Bytes.toBytes(key))
>>> if (col != null)
>>> usefulRow = true
>>> data = data :+ col
>>> }
>>>
>>> if (usefulRow)
>>> Some(data)
>>> else
>>> None
>>> })
>>> }
>>>
>>> def main(args: Array[String]) {
>>> val hBaseRDD = init(args)
>>> // hBaseRDD.cache()
>>>
>>> println(" Initial row count " + hBaseRDD.count())
>>> println(" Rows with interesting fields " +
>>> readFields(hBaseRDD).count())
>>>   }
>>>
>>>
>>> I am running on a one-node CDH installation.
>>>
>>> As it is it takes around 2.5 minutes. But if I comment out
>>> 'println(" Initial row count " + hBaseRDD.count())', it takes around
>>> 1.5 minutes.
>>>
>>> Is it doing HBase scan twice, for both 'count' calls? How do I improve
>>> it?
>>>
>>> Thanks,
>>> -Soumitra.
>>>
>>
>>
>


Re:

2014-02-25 Thread Cheng Lian
RDD.count() is an action, which triggers a distributed job whether the
RDD is cached or not.  If the RDD is cached, there won't be a duplicated
HBase scan.

How do you want to improve the performance?  Are you trying to reduce
unnecessary distributed jobs, or to improve the performance of the second job?
If the former, just use a variable to hold the result of the first count()
call; if the latter, hBaseRDD.cache() can help.  Either way, the extra
HBase scan can be eliminated.
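
A minimal sketch combining the two options above (names as in the code quoted below):

hBaseRDD.cache()                                     // latter option: speeds up the second job
val initialCount = hBaseRDD.count()                  // former option: run the job once...
val interestingCount = readFields(hBaseRDD).count()
println(" Initial row count " + initialCount)        // ...and reuse the stored value afterwards
println(" Rows with interesting fields " + interestingCount)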


On Tue, Feb 25, 2014 at 3:14 PM, Soumitra Kumar wrote:

> I have a code which reads an HBase table, and counts number of rows
> containing a field.
>
> def readFields(rdd : RDD[(ImmutableBytesWritable, Result)]) :
> RDD[List[Array[Byte]]] = {
> return rdd.flatMap(kv => {
> // Set of interesting keys for this use case
> val keys = List ("src")
> var data = List[Array[Byte]]()
> var usefulRow = false
>
> val cf = Bytes.toBytes ("cf")
> keys.foreach {key =>
> val col = kv._2.getValue(cf, Bytes.toBytes(key))
> if (col != null)
> usefulRow = true
> data = data :+ col
> }
>
> if (usefulRow)
> Some(data)
> else
> None
> })
> }
>
> def main(args: Array[String]) {
> val hBaseRDD = init(args)
> // hBaseRDD.cache()
>
> println(" Initial row count " + hBaseRDD.count())
> println(" Rows with interesting fields " +
> readFields(hBaseRDD).count())
>   }
>
>
> I am running on a one-node CDH installation.
>
> As it is it takes around 2.5 minutes. But if I comment out 'println("
> Initial row count " + hBaseRDD.count())', it takes around 1.5 minutes.
>
> Is it doing HBase scan twice, for both 'count' calls? How do I improve it?
>
> Thanks,
> -Soumitra.
>
>


Need some tutorials and examples about customized partitioner

2014-02-25 Thread Tao Xiao
I am a newbie to Spark and I need to know how RDD partitioning can be
controlled in the process of shuffling. I have googled for examples but
haven't found many concrete ones, in contrast with Hadoop, for which there
are many good tutorials about shuffling and partitioners.

Can anybody show me good tutorials explaining the process of shuffling in
Spark, as well as examples of how to use a customized partitioner?


Best,
Tao
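
For the second part of the question, here is a minimal sketch of a customized partitioner
(the class name, input path and partition count are illustrative, not taken from any post
in this thread): subclass org.apache.spark.Partitioner and pass an instance to
partitionBy; getPartition then decides where each key lands during the shuffle.

import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair-RDD functions such as partitionBy

// Routes each key to a partition based on its first character.
class FirstLetterPartitioner(numParts: Int) extends Partitioner {
  override def numPartitions: Int = numParts
  override def getPartition(key: Any): Int = {
    val k = key.toString
    if (k.isEmpty) 0 else math.abs(k.charAt(0).toInt) % numParts
  }
  // Spark compares partitioners to decide whether two RDDs are co-partitioned.
  override def equals(other: Any): Boolean = other match {
    case p: FirstLetterPartitioner => p.numPartitions == numPartitions
    case _ => false
  }
  override def hashCode: Int = numParts
}

object CustomPartitionerSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("custom-partitioner-sketch"))
    val pairs = sc.textFile("hdfs:///some/input").map(line => (line, 1))  // placeholder input
    val partitioned = pairs.partitionBy(new FirstLetterPartitioner(3))    // shuffle with our partitioner
    // Print how many records landed in each partition.
    partitioned.mapPartitionsWithIndex((i, it) => Iterator((i, it.size))).collect().foreach(println)
    sc.stop()
  }
}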


Re:

2014-02-25 Thread Eugen Cepoi
Yes, it is doing it twice. Try caching the initial RDD.


2014-02-25 8:14 GMT+01:00 Soumitra Kumar :

> I have a code which reads an HBase table, and counts number of rows
> containing a field.
>
> def readFields(rdd : RDD[(ImmutableBytesWritable, Result)]) :
> RDD[List[Array[Byte]]] = {
> return rdd.flatMap(kv => {
> // Set of interesting keys for this use case
> val keys = List ("src")
> var data = List[Array[Byte]]()
> var usefulRow = false
>
> val cf = Bytes.toBytes ("cf")
> keys.foreach {key =>
> val col = kv._2.getValue(cf, Bytes.toBytes(key))
> if (col != null)
> usefulRow = true
> data = data :+ col
> }
>
> if (usefulRow)
> Some(data)
> else
> None
> })
> }
>
> def main(args: Array[String]) {
> val hBaseRDD = init(args)
> // hBaseRDD.cache()
>
> println(" Initial row count " + hBaseRDD.count())
> println(" Rows with interesting fields " +
> readFields(hBaseRDD).count())
>   }
>
>
> I am running on a one-node CDH installation.
>
> As it is it takes around 2.5 minutes. But if I comment out 'println("
> Initial row count " + hBaseRDD.count())', it takes around 1.5 minutes.
>
> Is it doing HBase scan twice, for both 'count' calls? How do I improve it?
>
> Thanks,
> -Soumitra.
>
>


Re: HBase row count

2014-02-25 Thread Nick Pentreath
cache only caches the data on the first action (count) - the first time it
still needs to read the data from the source. So the first time you call
count it will take the same amount of time whether cache is enabled or not.
The second time you call count on a cached RDD, you should see that it
takes a lot less time (assuming that the data fit in memory).


On Tue, Feb 25, 2014 at 9:38 AM, Soumitra Kumar wrote:

> I did try with 'hBaseRDD.cache()', but don't see any improvement.
>
> My expectation is that with cache enabled, there should not be any penalty
> of 'hBaseRDD.count' call.
>
>
>
> On Mon, Feb 24, 2014 at 11:29 PM, Nick Pentreath  > wrote:
>
>> Yes, you're initiating a scan for each count call. The normal way to
>> improve this would be to use cache(), which is what you have in your
>> commented out line:
>> // hBaseRDD.cache()
>>
>> If you uncomment that line, you should see an improvement overall.
>>
>> If caching is not an option for some reason (maybe data is too large),
>> then you can implement an overall count in your readFields method using
>> accumulators:
>>
>> val count = sc.accumulator(0L)
>> ...
>> In your flatMap function do count += 1 for each row (regardless of
>> whether "interesting" or not).
>>
>> In your main method after doing an action (e.g. count in your case), call val
>> totalCount = count.value.
>>
>>
>>
>>
>> On Tue, Feb 25, 2014 at 9:15 AM, Soumitra Kumar > > wrote:
>>
>>> I have a code which reads an HBase table, and counts number of rows
>>> containing a field.
>>>
>>> def readFields(rdd : RDD[(ImmutableBytesWritable, Result)]) :
>>> RDD[List[Array[Byte]]] = {
>>> return rdd.flatMap(kv => {
>>> // Set of interesting keys for this use case
>>> val keys = List ("src")
>>> var data = List[Array[Byte]]()
>>> var usefulRow = false
>>>
>>> val cf = Bytes.toBytes ("cf")
>>> keys.foreach {key =>
>>> val col = kv._2.getValue(cf, Bytes.toBytes(key))
>>> if (col != null)
>>> usefulRow = true
>>> data = data :+ col
>>> }
>>>
>>> if (usefulRow)
>>> Some(data)
>>> else
>>> None
>>> })
>>> }
>>>
>>> def main(args: Array[String]) {
>>> val hBaseRDD = init(args)
>>> // hBaseRDD.cache()
>>>
>>> println(" Initial row count " + hBaseRDD.count())
>>> println(" Rows with interesting fields " +
>>> readFields(hBaseRDD).count())
>>>   }
>>>
>>>
>>> I am running on a one-node CDH installation.
>>>
>>> As it is it takes around 2.5 minutes. But if I comment out
>>> 'println(" Initial row count " + hBaseRDD.count())', it takes around
>>> 1.5 minutes.
>>>
>>> Is it doing HBase scan twice, for both 'count' calls? How do I improve
>>> it?
>>>
>>> Thanks,
>>> -Soumitra.
>>>
>>
>>
>


Re: Nothing happens when executing on cluster

2014-02-25 Thread Anders Bennehag
I believe I solved my problem. The worker-node didn't know where to return
the answers. I set SPARK_LOCAL_IP and the program runs as it should.


On Mon, Feb 24, 2014 at 3:55 PM, Anders Bennehag  wrote:

> Hello there,
>
> I'm having some trouble with my spark-cluster consisting of
>
> master.censored.dev and
> spark-worker-0
>
> Reading from the output of pyspark, master, and worker-node it seems like
> the cluster is formed correctly and pyspark connects to it. But for some
> reason, nothing happens after "TaskSchedulerImpl: Adding task set". Why is
> this and how can I investigate it further?
>
> I haven't really seen any clues in the web-ui.
>
> The program output is as follows:
> pyspark:
> https://gist.githubusercontent.com/PureW/ebe1b95b9b4814fc2533/raw/e2d08b7b6288afad3cb03238acc3d172291166d8/pyspark+log
> master:
> https://gist.githubusercontent.com/PureW/9889bc9b57a8406599df/raw/4b1faeda8bacff06b5c3a32d75e74ef114933504/Spark-master
> worker:
> https://gist.githubusercontent.com/PureW/7451cd5ed6780f4d1e33/raw/f45971bd1e6cba620db566998a9afd035ea8d529/spark-worker
>
> The code I am running through pyspark can be seen at
> https://gist.github.com/PureW/2c9603bdf1ef2ae772f3
> When the worker-node couldn't access the data, it raised an exception, but
> now there's nothing at all. I've run the code locally and it only takes
> ~15s to finish.
>
>
> Thanks for any help!
> /Anders
>