Re: Using Cassandra as an input stream from Java

2013-12-05 Thread Pulasthi Supun Wickramasinghe
Hi Lucas,

That did the trick. I just had to change JavaPairRDD<ByteBuffer,
SortedMap<ByteBuffer, IColumn>> to JavaPairRDD<ByteBuffer, ? extends
SortedMap<ByteBuffer, IColumn>>. Thanks for the help.

Regards,
Pulasthi



On Thu, Dec 5, 2013 at 10:40 AM, Lucas Fernandes Brunialti 
lbrunia...@igcorp.com.br wrote:

 Hi all,

 This should work:

 JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> casRdd =
 context.newAPIHadoopRDD(job.getConfiguration(),
 ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
 ByteBuffer.class, SortedMap.class);

 I have translated the word count written in Scala to Java; I just can't
 send it right now...

 Best Regards.

 Lucas.
 On Dec 5, 2013 1:51 AM, Pulasthi Supun Wickramasinghe 
 pulasthi...@gmail.com wrote:

 Hi Tal,

 Just checking if you have added your code to GitHub :). If you have, could
 you point me to it?

 Best Regards,
 Pulasthi


 On Thu, Nov 28, 2013 at 11:54 PM, Patrick Wendell pwend...@gmail.comwrote:

 Tal - that would be great to have open sourced if you can do it!

 On Thu, Nov 28, 2013 at 1:00 AM, Pulasthi Supun Wickramasinghe
 pulasthi...@gmail.com wrote:
  Hi Tal,
 
  Thanks for the info will try it out and see how it goes.
 
 
  On Thu, Nov 28, 2013 at 2:19 PM, Tal Sliwowicz ta...@taboola.com
 wrote:
 
  Hi Pulasthi,
 
  I couldn't make it work, so what I ended up doing was to implement 3 Java
  classes - one that extends org.apache.hadoop.mapreduce.InputFormat,
 another
  that extends org.apache.hadoop.mapreduce.InputSplit and a 3rd that
 extends
  org.apache.hadoop.mapreduce.RecordReader and used them to load data
 from
  Cassandra to an RDD (using the newAPIHadoopRDD() method). It works
 great!
  I'm cleaning up the code a bit and will upload it to GitHub as open
 source
  (after the summit).
 
  That's great, looking forward to checking it out after you publish on GitHub
 :).
 
 
  Thanks,
  Pulasthi
 
  I hope this helps for now,
 
  Tal
 
 
  On Thu, Nov 28, 2013 at 10:21 AM, Pulasthi Supun Wickramasinghe
  pulasthi...@gmail.com wrote:
 
  Hi Tal,
 
  I also tried doing this by converting the Scala sample into Java, but I am
  getting a compile-time error. Below is the code:
 
   JavaSparkContext sc = new JavaSparkContext("local[3]", "casDemo");

   // Build the job configuration with ConfigHelper provided by Cassandra
   Job job = null;
   try {
       job = new Job();
   } catch (IOException e) {
       e.printStackTrace();  // To change body of catch statement use File | Settings | File Templates.
   }
   job.setInputFormatClass(ColumnFamilyInputFormat.class);

   String host = args[1];
   String port = args[2];

   ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
   ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
   ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
   ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
   ConfigHelper.setInputColumnFamily(job.getConfiguration(), "casDemo", "Words");
   ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "casDemo", "WordCount");

   SlicePredicate predicate = new SlicePredicate();
   SliceRange sliceRange = new SliceRange();
   sliceRange.setStart(new byte[0]);
   sliceRange.setFinish(new byte[0]);
   predicate.setSlice_range(sliceRange);
   ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);

   ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
   ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");

   // Make a new Hadoop RDD
   final SortedMap<ByteBuffer, IColumn> byteBufferIColumnSortedMap =
       new TreeMap<ByteBuffer, IColumn>();
   JavaPairRDD<ByteBuffer, ? extends SortedMap> casRdd =
       sc.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
           ByteBuffer.class, byteBufferIColumnSortedMap.getClass());
 
 
  I also tried the code segment that you provided, but I keep getting
  the following error:
 
  java:
  /home/pulasthi/work/spark/sparkcasstest/src/main/java/org/wso2/spark/CassSpark.java:66:
  <K,V,F>newAPIHadoopRDD(org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>)
  in org.apache.spark.api.java.JavaSparkContext cannot be applied to
  (org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.ColumnFamilyInputFormat>,java.lang.Class<java.nio.ByteBuffer>,java.lang.Class<capture#92 of ? extends java.util.SortedMap>)
 
  Did you encounter this? If so, any help would be appreciated.
 
  Best Regards,
  Pulasthi
 
 
  On Tue, Nov 19, 2013 at 9:42 PM, Tal Sliwowicz ta...@taboola.com
 wrote:
 
  Hi,
 
 
  I'm trying to use data stored in Cassandra (v1.2) and need some help.
  I've translated the Scala example - CassandraTest.scala - to Java, but I
  keep 

Re: Persisting MatrixFactorizationModel

2013-12-05 Thread Aslan Bekirov
Thanks a lot Evan...


On Wed, Dec 4, 2013 at 8:31 PM, Evan R. Sparks evan.spa...@gmail.comwrote:

 Ah, actually - I just remembered that the user and product features of the
 model are RDDs, so you might be better off saving those components to
 HDFS and then at load time reading them back in and creating a new
 MatrixFactorizationModel. Sorry for the confusion!

 Note, the above solution only works if you want to deploy your model to a
 spark cluster. If the model is small enough and you really want to deploy
 it to several hosts, you could consider calling collect() on its components
 and then serializing the results as I suggested before. In general these
 models are usually pretty small (order of MB), so that's not such a bad
 option - when you get to 10s of millions of users or products, then you
 might consider pre-materializing some pieces of it (e.g. calculate top 100
 predictions for all users or something) and save those intermediate results
 to serve up.

 - Evan
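
 A minimal sketch of the approach Evan describes above, assuming the
 MatrixFactorizationModel constructor (rank, userFeatures, productFeatures) is
 accessible in your Spark version; the master URL, rank, and HDFS paths below
 are placeholders:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}

    object PersistAlsModel {
      def main(args: Array[String]) {
        val sc = new SparkContext("spark://master:7077", "PersistAlsModel")

        // Train as usual. Keep the rank around (or store it next to the files),
        // since it is needed to rebuild the model later.
        val ratings = sc.textFile("hdfs:///data/ratings.txt").map { line =>
          val fields = line.split(',')
          Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
        }
        val rank = 10
        val model = ALS.train(ratings, rank, 20, 0.01)

        // Persist the two RDD components of the model to HDFS.
        model.userFeatures.saveAsObjectFile("hdfs:///models/als/userFeatures")
        model.productFeatures.saveAsObjectFile("hdfs:///models/als/productFeatures")

        // A different Spark client can later rebuild an equivalent model.
        val userFeatures = sc.objectFile[(Int, Array[Double])]("hdfs:///models/als/userFeatures")
        val productFeatures = sc.objectFile[(Int, Array[Double])]("hdfs:///models/als/productFeatures")
        val reloaded = new MatrixFactorizationModel(rank, userFeatures, productFeatures)
        println(reloaded.predict(1, 1))
      }
    }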


 On Wed, Dec 4, 2013 at 9:54 AM, Aslan Bekirov aslanbeki...@gmail.comwrote:

 I thought to convert model to RDD and save to HDFS, and then load it.

 I will try your method. Thanks a lot.



 On Wed, Dec 4, 2013 at 7:41 PM, Evan R. Sparks evan.spa...@gmail.comwrote:

 The model is serializable - so you should be able to write it out to
 disk and load it up in another program.

 See, e.g. - https://gist.github.com/ramn/5566596 (Note, I haven't
 tested this particular example, but it looks alright).

 Spark makes use of this type of scala (and kryo, etc.) serialization
 internally, so you can check the Spark codebase for more examples.


 On Wed, Dec 4, 2013 at 9:34 AM, Aslan Bekirov aslanbeki...@gmail.comwrote:

 Hi All,

 I am creating a model by calling train method of ALS.

 val model = ALS.train(ratings.)

 I need to persist this model, use it from different clients, and enable
 clients to make predictions using it. In other words, persist and
 reload this model.

 Any suggestions, please?

 BR,
 Aslan







Re: How to balance task load

2013-12-05 Thread Andrew Ash
Hi Hao,

Where tasks go is influenced by where the data they operate on resides.  If
the data is on one executor, it may make more sense to do all the
computation on that node rather than ship data across the network.  How was
the data distributed across your cluster?

Andrew


On Mon, Dec 2, 2013 at 7:52 AM, Hao REN julien19890...@gmail.com wrote:

 Sorry for the spam.

 To complete my previous post:

 The map action sometimes creates 4 tasks which are all executed by the
 same executor.

 I believe that a task dispatch like:
 executor_0 : 1 task;
 executor_1 : 1 task;
 executor_2 : 2 tasks;
 would give better performance.

 Can we force this kind of schedule in Spark ?

 Thank you.



 2013/12/2 Hao REN julien19890...@gmail.com

 Hi,

 When running some tests on EC2 with Spark, I noticed that the tasks are
 not fairly distributed to executors.

 For example, a map action produces 4 tasks, but they all go to the same
 executor:


 Executors (3)

- *Memory:* 0.0 B Used (19.0 GB Total)
- *Disk:* 0.0 B Used

  Executor ID | Address | RDD blocks | Memory used | Disk used | Active tasks | Failed tasks | Complete tasks | Total tasks
  0 | ip-10-10-141-143.ec2.internal:52816 | 0 | 0.0 B / 6.3 GB | 0.0 B | 4 | 0 | 0 | 4
  1 | ip-10-40-38-190.ec2.internal:60314 | 0 | 0.0 B / 6.3 GB | 0.0 B | 0 | 0 | 0 | 0
  2 | ip-10-62-35-223.ec2.internal:40... | | ... / 6.3 GB | 0.0 B | | | |









 --
 REN Hao

 Data Engineer @ ClaraVista

 Paris, France

 Tel:  +33 06 14 54 57 24



Re: How to balance task load

2013-12-05 Thread Hao REN
Hi Andrew,

My data was loaded in HDFS. Actually, I got the answer from the spark-user
google group.

Patrick said:

All cores in the cluster are considered fungible since the tasks are
completely parallel. So until you run out of cores on any given node, it
might get all the tasks.

In some cases this provides *better* performance because you aren't moving
data around as much.

Thank you for your reply. =)
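
Not from the thread, but as a rough sketch of the knob being discussed: if you do
want more tasks than any single node can hold at once, raising the number of
partitions is usually enough. The master URL, input path, and partition counts
below are placeholders.

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._

    object SpreadTasks {
      def main(args: Array[String]) {
        val sc = new SparkContext("spark://master:7077", "SpreadTasks")

        // Ask for more input splits up front, so there are more tasks than any
        // one node has free cores and the scheduler has to use other executors...
        val data = sc.textFile("hdfs:///data/input.txt", 24)

        // ...or push an existing RDD into more partitions with a shuffle.
        val spread = data.coalesce(24, shuffle = true)

        println(spread.map(_.length).reduce(_ + _))
      }
    }

With only 4 tasks and a node that has 4 free cores, the scheduler can legally put
everything on that node; with 24 partitions the other executors normally get work
too, at the cost of moving some data.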


2013/12/5 Andrew Ash and...@andrewash.com

 Hi Hao,

 Where tasks go is influenced by where the data they operate on resides.
  If the data is on one executor, it may make more sense to do all the
 computation on that node rather than ship data across the network.  How was
 the data distributed across your cluster?

 Andrew


 On Mon, Dec 2, 2013 at 7:52 AM, Hao REN julien19890...@gmail.com wrote:

  Sorry for the spam.

  To complete my previous post:

 The map action sometimes creates 4 tasks which are all executed by the
 same executor.

  I believe that a task dispatch like:
  executor_0 : 1 task;
  executor_1 : 1 task;
  executor_2 : 2 tasks;
  would give better performance.

 Can we force this kind of schedule in Spark ?

 Thank you.



 2013/12/2 Hao REN julien19890...@gmail.com

 Hi,

  When running some tests on EC2 with Spark, I noticed that the tasks are
  not fairly distributed to executors.

  For example, a map action produces 4 tasks, but they all go to the same
  executor:


 Executors (3)

- *Memory:* 0.0 B Used (19.0 GB Total)
- *Disk:* 0.0 B Used

  Executor ID | Address | RDD blocks | Memory used | Disk used | Active tasks | Failed tasks | Complete tasks | Total tasks
  0 | ip-10-10-141-143.ec2.internal:52816 | 0 | 0.0 B / 6.3 GB | 0.0 B | 4 | 0 | 0 | 4
  1 | ip-10-40-38-190.ec2.internal:60314 | 0 | 0.0 B / 6.3 GB | 0.0 B | 0 | 0 | 0 | 0
  2 | ip-10-62-35-223.ec2.internal:40... | | ... / 6.3 GB | 0.0 B | | | |









 --
 REN Hao

 Data Engineer @ ClaraVista

 Paris, France

 Tel:  +33 06 14 54 57 24





-- 
REN Hao

Data Engineer @ ClaraVista

Paris, France

Tel:  +33 06 14 54 57 24


Re: Bagel caching issues

2013-12-05 Thread huangjay
Hi,

Maybe you need to check those nodes. It's very slow.


3487  SUCCESS  PROCESS_LOCAL  ip-10-60-150-111.ec2.internal  2013/12/01 02:11:38  17.7 m  16.3 m  23.3 MB
3447  SUCCESS  PROCESS_LOCAL  ip-10-12-54-63.ec2.internal    2013/12/01 02:11:26  20.1 m  13.9 m  50.9 MB


 On Dec 1, 2013, at 10:59 AM, Mayuresh Kunjir mayuresh.kun...@gmail.com wrote:
 
 I tried passing DISK_ONLY storage level to Bagel's run method. It's running 
 without any error (so far) but is too slow. I am attaching details for a 
 stage corresponding to second iteration of my algorithm. (foreach at 
 Bagel.scala:237) It's been running for more than 35 minutes. I am noticing 
 very high GC time for some tasks. Listing below the setup parameters. 
 
 #nodes = 16
 SPARK_WORKER_MEMORY = 13G
 SPARK_MEM = 13G
 RDD storage fraction = 0.5
 degree of parallelism = 192 (16 nodes * 4 cores each * 3)
 Serializer = Kryo
 Vertex data size after serialization = ~12G (probably too high, but it's the 
 bare minimum required for the algorithm.)
 
 I would be grateful if you could suggest some further optimizations or point 
 out reasons why/if Bagel is not suitable for this data size. I need to 
 further scale my cluster and not feeling confident at all looking at this.
 
 Thanks and regards,
 ~Mayuresh
 
 
 On Sat, Nov 30, 2013 at 3:07 PM, Mayuresh Kunjir mayuresh.kun...@gmail.com 
 wrote:
 Hi Spark users,
 
 I am running a pagerank-style algorithm on Bagel and bumping into out of 
 memory issues with that. 
 
 Referring to the following table, rdd_120 is the rdd of vertices, serialized 
 and compressed in memory. On each iteration, Bagel deserializes the 
 compressed rdd. e.g. rdd_126 shows the uncompressed version of rdd_120 
 persisted in memory and disk. As iterations keep piling on, the cached 
 partitions start getting evicted. The moment a rdd_120 partition gets 
 evicted, it necessitates a recomputation and the performance goes for a 
 toss. Although we don't need uncompressed rdds from previous iterations, 
 they are the last ones to get evicted thanks to LRU policy. 
 
 Should I make Bagel use DISK_ONLY persistence? How much of a performance hit 
 would that be? Or maybe there is a better solution here.
 
  Storage
  RDD Name  Storage Level                           Cached Partitions  Fraction Cached  Size in Memory  Size on Disk
  rdd_83    Memory Serialized 1x Replicated         23                 12%              83.7 MB         0.0 B
  rdd_95    Memory Serialized 1x Replicated         23                 12%              2.5 MB          0.0 B
  rdd_120   Memory Serialized 1x Replicated         25                 13%              761.1 MB        0.0 B
  rdd_126   Disk Memory Deserialized 1x Replicated  192                100%             77.9 GB         1016.5 MB
  rdd_134   Disk Memory Deserialized 1x Replicated  185                96%              60.8 GB         475.4 MB
 Thanks and regards,
 ~Mayuresh
 
 BigFrame - Details for Stage 23.htm
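
For the RDDs the application creates itself (such as the vertex RDD), a serialized
storage level with a disk fallback is one middle ground between eviction-and-recompute
and going straight to DISK_ONLY. A minimal sketch, with a placeholder vertex file and
parsing; whether Bagel's own intermediate RDDs can use the same level depends on the
storage level argument passed to Bagel.run:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.storage.StorageLevel

    object PersistVertices {
      def main(args: Array[String]) {
        val sc = new SparkContext("spark://master:7077", "PersistVertices")

        val vertices = sc.textFile("hdfs:///graph/vertices.txt")
          .map(line => (line.split("\t")(0), line))

        // MEMORY_AND_DISK_SER keeps the compact serialized form in memory and
        // spills partitions to local disk under pressure, instead of dropping
        // them (LRU) and recomputing them on a later iteration.
        vertices.persist(StorageLevel.MEMORY_AND_DISK_SER)

        println(vertices.count())
      }
    }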


RE: Pre-build Spark for Windows 8.1

2013-12-05 Thread Adrian Bonar
Excellent! Thank you, Matei.

From: Matei Zaharia [mailto:matei.zaha...@gmail.com]
Sent: Wednesday, December 4, 2013 4:26 PM
To: user@spark.incubator.apache.org
Subject: Re: Pre-build Spark for Windows 8.1

Hey Adrian,

Ideally you shouldn't use Cygwin to run on Windows - use the .cmd scripts we 
provide instead. Cygwin might be made to work but we haven't tried to do this 
so far so it's not supported. If you can fix it, that would of course be 
welcome.

Also, the deploy scripts don't work on Windows - we assumed that people would 
mostly use Windows for local development and testing. However, if you do want 
to launch a cluster, you can use spark-class.cmd manually to launch a Master 
and then Workers. Use the commands in the first part of 
http://spark.incubator.apache.org/docs/latest/spark-standalone.html.

Matei

On Dec 4, 2013, at 2:31 PM, Adrian Bonar adrian.bo...@microsoft.com wrote:


Separate from my previous thread about building a distribution of Spark on 
Win8, I am also trying to run the pre-built binaries with little success. I 
downloaded and extract spark-0.8.0-incubating-bin-hadoop1 to d:\spark and 
attempted to start a master with the following error:

$ sh bin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to 
/cygdrive/d/spark/bin/../logs/spark-adribona-org.apache.spark.deploy.master.Master-1-ADRIBONA-DEV-1.out
failed to launch org.apache.spark.deploy.master.Master:

  Error: Could not find or load main class org.apache.spark.deploy.master.Master
full log in 
/cygdrive/d/spark/bin/../logs/spark-adribona-org.apache.spark.deploy.master.Master-1-ADRIBONA-DEV-1.out

--Adrian (again. :))



RE: Pre-build Spark for Windows 8.1

2013-12-05 Thread Adrian Bonar
The master starts up now as expected but the workers are unable to connect to 
the master. It looks like the master is refusing the connection messages but 
I'm not sure why. The first two error lines below are from trying to connect a 
worker from a separate machine and the last two error lines are from trying to 
connect a worker on the same machine as the master. I verified that the workers 
do not show up in the master's web ui.

MASTER:
D:\spark>spark-class org.apache.spark.deploy.master.Master
13/12/05 08:08:34 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started
13/12/05 08:08:34 INFO master.Master: Starting Spark master at 
spark://ADRIBONA-DEV-1:7077
13/12/05 08:08:34 INFO server.Server: jetty-7.x.y-SNAPSHOT
13/12/05 08:08:34 INFO handler.ContextHandler: started 
o.e.j.s.h.ContextHandler{/metrics/master/json,null}
13/12/05 08:08:34 INFO handler.ContextHandler: started 
o.e.j.s.h.ContextHandler{/metrics/applications/json,null}
13/12/05 08:08:34 INFO handler.ContextHandler: started 
o.e.j.s.h.ContextHandler{/static,null}
13/12/05 08:08:34 INFO handler.ContextHandler: started 
o.e.j.s.h.ContextHandler{/app/json,null}
13/12/05 08:08:34 INFO handler.ContextHandler: started 
o.e.j.s.h.ContextHandler{/app,null}
13/12/05 08:08:34 INFO handler.ContextHandler: started 
o.e.j.s.h.ContextHandler{/json,null}
13/12/05 08:08:34 INFO handler.ContextHandler: started 
o.e.j.s.h.ContextHandler{*,null}
13/12/05 08:08:34 INFO server.AbstractConnector: Started 
SelectChannelConnector@0.0.0.0:8088
13/12/05 08:08:34 INFO ui.MasterWebUI: Started Master web UI at 
http://ADRIBONA-DEV-1:8088
13/12/05 08:09:15 ERROR NettyRemoteTransport(null): dropping message 
RegisterWorker(worker-20131205080912-ADRIBONA-SFC-1-19280,ADRIBONA-SFC-1,19280,2,512,8081,ADRIBONA-SFC-1)
 for non-local recipient akka://sparkMaster@adribona-dev-1:7077/user/Master at 
akka://sparkMaster@ADRIBONA-DEV-1:7077 local is 
akka://sparkMaster@ADRIBONA-DEV-1:7077
13/12/05 08:09:15 ERROR NettyRemoteTransport(null): dropping message 
DaemonMsgWatch(Actor[akka://sparkWorker@ADRIBONA-SFC-1:19280/user/Worker],Actor[akka://sparkMaster@adribona-dev-1:7077/user/Master])
 for non-local recipient akka://sparkMaster@adribona-dev-1:7077/remote at 
akka://sparkMaster@ADRIBONA-DEV-1:7077 local is 
akka://sparkMaster@ADRIBONA-DEV-1:7077
13/12/05 08:18:46 ERROR NettyRemoteTransport(null): dropping message 
RegisterWorker(worker-20131205081846-ADRIBONA-DEV-1-13521,ADRIBONA-DEV-1,13521,8,31670,8081,ADRIBONA-DEV-1)
 for non-local recipient akka://sparkMaster@adribona-dev-1:7077/user/Master at 
akka://sparkMaster@ADRIBONA-DEV-1:7077 local is 
akka://sparkMaster@ADRIBONA-DEV-1:7077
13/12/05 08:18:46 ERROR NettyRemoteTransport(null): dropping message 
DaemonMsgWatch(Actor[akka://sparkWorker@ADRIBONA-DEV-1:13521/user/Worker],Actor[akka://sparkMaster@adribona-dev-1:7077/user/Master])
 for non-local recipient akka://sparkMaster@adribona-dev-1:7077/remote at 
akka://sparkMaster@ADRIBONA-DEV-1:7077 local is 
akka://sparkMaster@ADRIBONA-DEV-1:7077

WORKER:
D:\spark>spark-class.cmd org.apache.spark.deploy.worker.Worker 
spark://adribona-dev-1:7077
13/12/05 08:18:46 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started
13/12/05 08:18:46 INFO worker.Worker: Starting Spark worker 
ADRIBONA-DEV-1:13521 with 8 cores, 30.9 GB RAM
13/12/05 08:18:46 INFO worker.Worker: Spark home: D:\spark
13/12/05 08:18:46 INFO server.Server: jetty-7.x.y-SNAPSHOT
13/12/05 08:18:46 INFO handler.ContextHandler: started 
o.e.j.s.h.ContextHandler{/metrics/json,null}
13/12/05 08:18:46 INFO handler.ContextHandler: started 
o.e.j.s.h.ContextHandler{/static,null}
13/12/05 08:18:46 INFO handler.ContextHandler: started 
o.e.j.s.h.ContextHandler{/log,null}
13/12/05 08:18:46 INFO handler.ContextHandler: started 
o.e.j.s.h.ContextHandler{/logPage,null}
13/12/05 08:18:46 INFO handler.ContextHandler: started 
o.e.j.s.h.ContextHandler{/json,null}
13/12/05 08:18:46 INFO handler.ContextHandler: started 
o.e.j.s.h.ContextHandler{*,null}
13/12/05 08:18:46 INFO server.AbstractConnector: Started 
SelectChannelConnector@0.0.0.0:8081
13/12/05 08:18:46 INFO ui.WorkerWebUI: Started Worker web UI at 
http://ADRIBONA-DEV-1:8081
13/12/05 08:18:46 INFO worker.Worker: Connecting to master 
spark://adribona-dev-1:7077


--Adrian

From: Adrian Bonar [mailto:adrian.bo...@microsoft.com]
Sent: Thursday, December 5, 2013 7:49 AM
To: user@spark.incubator.apache.org
Subject: RE: Pre-build Spark for Windows 8.1

Excellent! Thank you, Matei.

From: Matei Zaharia [mailto:matei.zaha...@gmail.com]
Sent: Wednesday, December 4, 2013 4:26 PM
To: user@spark.incubator.apache.org
Subject: Re: Pre-build Spark for Windows 8.1

Hey Adrian,

Ideally you shouldn't use Cygwin to run on Windows - use the .cmd scripts we 
provide instead. Cygwin might be made to work but we haven't tried to do this 
so far so it's not supported. If you can fix it, that would of course be 
welcome.

Also, the deploy scripts don't work on 

Writing to HBase

2013-12-05 Thread Benjamin Kim
Does anyone have an example or some sort of starting point code when writing
from Spark Streaming into HBase?

We currently stream ad server event log data using Flume-NG to tail log
entries, collect them, and put them directly into an HBase table. We would like
to do the same with Spark Streaming, but we would like to do the data
massaging and simple data analysis before. This will cut down the steps in
prepping data and the number of tables for our data scientists and real-time
feedback systems.

Thanks,
Ben

Re: Writing to HBase

2013-12-05 Thread Philip Ogren

Here's a good place to start:

http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201311.mbox/%3ccacyzca3askwd-tujhqi1805bn7sctguaoruhd5xtxcsul1a...@mail.gmail.com%3E
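
For a rough idea of the shape such code usually takes (a sketch only, not the
example from the linked thread): open an HBase connection per partition inside the
stream's output operation. The host, table, and column names are made up, and the
exact DStream output method (foreachRDD here) and HBase client calls (HTable,
Put.add) depend on your Spark and HBase versions.

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{HTable, Put}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._

    object StreamToHBase {
      def main(args: Array[String]) {
        val ssc = new StreamingContext("spark://master:7077", "StreamToHBase", Seconds(10))
        val lines = ssc.socketTextStream("loghost", 9999)

        // Massage the raw log lines first, then write each batch out to HBase.
        val events = lines.map(_.split("\t")).map(f => (f(0), f(1)))

        events.foreachRDD { rdd =>
          rdd.foreachPartition { part =>
            val conf = HBaseConfiguration.create()
            val table = new HTable(conf, "ad_events")   // one connection per partition
            part.foreach { case (rowKey, value) =>
              val put = new Put(Bytes.toBytes(rowKey))
              put.add(Bytes.toBytes("d"), Bytes.toBytes("v"), Bytes.toBytes(value))
              table.put(put)
            }
            table.close()
          }
        }
        ssc.start()
      }
    }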


On 12/5/2013 10:18 AM, Benjamin Kim wrote:
Does anyone have an example or some sort of starting point code when 
writing from Spark Streaming into HBase?


We currently stream ad server event log data using Flume-NG to tail 
log entries, collect them, and put them directly into a HBase table. 
We would like to do the same with Spark Streaming. But, we would like 
to do the data massaging and simple data analysis before. This will 
cut down the steps in prepping data and the number of tables for our 
data scientists and real-time feedback systems.


Thanks,
Ben




Re: Bagel caching issues

2013-12-05 Thread Josh Rosen
The variability in task completion times could be caused by variability in
the amount of work that those tasks perform rather than slow or faulty
nodes.

For PageRank, consider a link graph that contains a few disproportionately
popular webpages that have many inlinks (such as Yahoo.com).  These
high-degree nodes may cause significant communications imbalances because
they receive and send many messages in a Pregel-like model.  If you look at
the distribution of shuffled data sizes, does it exhibit similar skew to
the task completion times?

The PowerGraph paper gives a good overview of the challenges posed by these
types of large-scale natural-graphs and develops techniques to split up and
parallelize the processing of these high-degree nodes:
http://graphlab.org/powergraph-presented-at-osdi/

On Thu, Dec 5, 2013 at 6:54 AM, Mayuresh Kunjir
mayuresh.kun...@gmail.comwrote:

 Thanks Jay for your response. Stragglers are a big problem here. I am
 seeing such tasks in many stages of the workflow on a consistent basis.
 It's not due to any particular nodes being slow since the slow tasks are
 observed on all the nodes at different points in time.
 The distribution of task completion times is too skewed for my liking.
  GC delays are a possible reason, but I am just speculating.

 ~Mayuresh




 On Thu, Dec 5, 2013 at 5:31 AM, huangjay ja...@live.cn wrote:

 Hi,

 Maybe you need to check those nodes. It's very slow.


  3487  SUCCESS  PROCESS_LOCAL  ip-10-60-150-111.ec2.internal  2013/12/01 02:11:38  17.7 m  16.3 m  23.3 MB
  3447  SUCCESS  PROCESS_LOCAL  ip-10-12-54-63.ec2.internal    2013/12/01 02:11:26  20.1 m  13.9 m  50.9 MB

  On Dec 1, 2013, at 10:59 AM, Mayuresh Kunjir mayuresh.kun...@gmail.com wrote:

 I tried passing DISK_ONLY storage level to Bagel's run method. It's
 running without any error (so far) but is too slow. I am attaching details
  for a stage corresponding to second iteration of my algorithm (foreach at
  Bagel.scala:237, http://ec2-54-234-176-171.compute-1.amazonaws.com:4040/stages/stage?id=23).
 It's been running for more than 35 minutes. I am noticing very high GC time
 for some tasks. Listing below the setup parameters.

 #nodes = 16
 SPARK_WORKER_MEMORY = 13G
 SPARK_MEM = 13G
 RDD storage fraction = 0.5
 degree of parallelism = 192 (16 nodes * 4 cores each * 3)
 Serializer = Kryo
 Vertex data size after serialization = ~12G (probably too high, but it's
 the bare minimum required for the algorithm.)

 I would be grateful if you could suggest some further optimizations or
 point out reasons why/if Bagel is not suitable for this data size. I need
 to further scale my cluster and not feeling confident at all looking at
 this.

 Thanks and regards,
 ~Mayuresh


 On Sat, Nov 30, 2013 at 3:07 PM, Mayuresh Kunjir 
 mayuresh.kun...@gmail.com wrote:

 Hi Spark users,

 I am running a pagerank-style algorithm on Bagel and bumping into out
 of memory issues with that.

 Referring to the following table, rdd_120 is the rdd of vertices,
 serialized and compressed in memory. On each iteration, Bagel deserializes
 the compressed rdd. e.g. rdd_126 shows the uncompressed version of rdd_120
 persisted in memory and disk. As iterations keep piling on, the cached
 partitions start getting evicted. The moment a rdd_120 partition gets
  evicted, it necessitates a recomputation and the performance goes for a
 toss. Although we don't need uncompressed rdds from previous iterations,
 they are the last ones to get evicted thanks to LRU policy.

 Should I make Bagel use DISK_ONLY persistence? How much of a performance
 hit would that be? Or maybe there is a better solution here.

  Storage
  RDD Name  Storage Level                           Cached Partitions  Fraction Cached  Size in Memory  Size on Disk
  rdd_83    Memory Serialized 1x Replicated         23                 12%              83.7 MB         0.0 B
  rdd_95    Memory Serialized 1x Replicated         23                 12%              2.5 MB          0.0 B
  rdd_120   Memory Serialized 1x Replicated         25                 13%              761.1 MB        0.0 B
  rdd_126   Disk Memory Deserialized 1x Replicated  192                100%             77.9 GB         1016.5 MB
  rdd_134   Disk Memory Deserialized 1x Replicated  185                96%              60.8 GB         475.4 MB
 Thanks and regards,
 ~Mayuresh


 BigFrame - Details for Stage 23.htm





Re: takeSample() computation

2013-12-05 Thread Matei Zaharia
Hi Matt,

Try using take() instead, which will only begin computing from the start of the 
RDD (first partition) if the number of elements you ask for is small.

Note that if you’re doing any shuffle operations, like groupBy or sort, then 
the stages before that do have to be computed fully.

Matei

On Dec 5, 2013, at 10:13 AM, Matt Cheah mch...@palantir.com wrote:

 Hi everyone,
 
 I have a question about RDD.takeSample(). This is an action, not a 
 transformation – but is any optimization made to reduce the amount of 
 computation that's done, for example only running the transformations over a 
 smaller subset of the data since only a sample will be returned as a result?
 
 The context is, I'm trying to measure the amount of time a set of 
 transformations takes on our dataset without persisting to disk. So I want to 
 stack the operations on the RDD and then invoke an action that doesn't save 
 the result to disk but can still give me a good idea of how long transforming 
 the whole dataset takes.
 
 Thanks,
 
 -Matt Cheah



Re: takeSample() computation

2013-12-05 Thread Matt Cheah
Actually, we want the opposite – we want as much data to be computed as 
possible.

It's only for benchmarking purposes, of course.

-Matt Cheah

From: Matei Zaharia matei.zaha...@gmail.com
Reply-To: user@spark.incubator.apache.org
Date: Thursday, December 5, 2013 10:31 AM
To: user@spark.incubator.apache.org
Cc: Mingyu Kim m...@palantir.com
Subject: Re: takeSample() computation

Hi Matt,

Try using take() instead, which will only begin computing from the start of the 
RDD (first partition) if the number of elements you ask for is small.

Note that if you’re doing any shuffle operations, like groupBy or sort, then 
the stages before that do have to be computed fully.

Matei

On Dec 5, 2013, at 10:13 AM, Matt Cheah mch...@palantir.com wrote:

Hi everyone,

I have a question about RDD.takeSample(). This is an action, not a 
transformation – but is any optimization made to reduce the amount of 
computation that's done, for example only running the transformations over a 
smaller subset of the data since only a sample will be returned as a result?

The context is, I'm trying to measure the amount of time a set of 
transformations takes on our dataset without persisting to disk. So I want to 
stack the operations on the RDD and then invoke an action that doesn't save the 
result to disk but can still give me a good idea of how long transforming the 
whole dataset takes.

Thanks,

-Matt Cheah



Re: Pre-build Spark for Windows 8.1

2013-12-05 Thread Matei Zaharia
Hi,

When you launch the worker, try using spark://ADRIBONA-DEV-1:7077 as the URL 
(uppercase instead of lowercase). Unfortunately Akka is very specific about 
seeing hostnames written in the same way on each node, or else it thinks the 
message is for another machine!

Matei
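
For example, with the hostnames from the logs below, both the worker command line
and any driver program have to use the exact string the master printed (the app
name here is arbitrary):

    import org.apache.spark.SparkContext

    object ConnectTest {
      def main(args: Array[String]) {
        // The master logged "Starting Spark master at spark://ADRIBONA-DEV-1:7077",
        // so the same uppercase form must be used; "spark://adribona-dev-1:7077"
        // is treated as a different host and the registration is dropped.
        val sc = new SparkContext("spark://ADRIBONA-DEV-1:7077", "ConnectTest")
        println(sc.parallelize(1 to 100).count())
      }
    }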

On Dec 5, 2013, at 8:27 AM, Adrian Bonar adrian.bo...@microsoft.com wrote:

 The master starts up now as expected but the workers are unable to connect to 
 the master. It looks like the master is refusing the connection messages but 
 I’m not sure why. The first two error lines below are from trying to connect 
 a worker from a separate machine and the last two error lines are from trying 
 to connect a worker on the same machine as the master. I verified that the 
 workers do not show up in the master’s web ui.
  
 MASTER:
 D:\spark>spark-class org.apache.spark.deploy.master.Master
 13/12/05 08:08:34 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started
 13/12/05 08:08:34 INFO master.Master: Starting Spark master at 
 spark://ADRIBONA-DEV-1:7077
 13/12/05 08:08:34 INFO server.Server: jetty-7.x.y-SNAPSHOT
 13/12/05 08:08:34 INFO handler.ContextHandler: started 
 o.e.j.s.h.ContextHandler{/metrics/master/json,null}
 13/12/05 08:08:34 INFO handler.ContextHandler: started 
 o.e.j.s.h.ContextHandler{/metrics/applications/json,null}
 13/12/05 08:08:34 INFO handler.ContextHandler: started 
 o.e.j.s.h.ContextHandler{/static,null}
 13/12/05 08:08:34 INFO handler.ContextHandler: started 
 o.e.j.s.h.ContextHandler{/app/json,null}
 13/12/05 08:08:34 INFO handler.ContextHandler: started 
 o.e.j.s.h.ContextHandler{/app,null}
 13/12/05 08:08:34 INFO handler.ContextHandler: started 
 o.e.j.s.h.ContextHandler{/json,null}
 13/12/05 08:08:34 INFO handler.ContextHandler: started 
 o.e.j.s.h.ContextHandler{*,null}
 13/12/05 08:08:34 INFO server.AbstractConnector: Started 
 SelectChannelConnector@0.0.0.0:8088
 13/12/05 08:08:34 INFO ui.MasterWebUI: Started Master web UI at 
 http://ADRIBONA-DEV-1:8088
 13/12/05 08:09:15 ERROR NettyRemoteTransport(null): dropping message 
 RegisterWorker(worker-20131205080912-ADRIBONA-SFC-1-19280,ADRIBONA-SFC-1,19280,2,512,8081,ADRIBONA-SFC-1)
  for non-local recipient akka://sparkMaster@adribona-dev-1:7077/user/Master 
 at akka://sparkMaster@ADRIBONA-DEV-1:7077 local is 
 akka://sparkMaster@ADRIBONA-DEV-1:7077
 13/12/05 08:09:15 ERROR NettyRemoteTransport(null): dropping message 
 DaemonMsgWatch(Actor[akka://sparkWorker@ADRIBONA-SFC-1:19280/user/Worker],Actor[akka://sparkMaster@adribona-dev-1:7077/user/Master])
  for non-local recipient akka://sparkMaster@adribona-dev-1:7077/remote at 
 akka://sparkMaster@ADRIBONA-DEV-1:7077 local is 
 akka://sparkMaster@ADRIBONA-DEV-1:7077
 13/12/05 08:18:46 ERROR NettyRemoteTransport(null): dropping message 
 RegisterWorker(worker-20131205081846-ADRIBONA-DEV-1-13521,ADRIBONA-DEV-1,13521,8,31670,8081,ADRIBONA-DEV-1)
  for non-local recipient akka://sparkMaster@adribona-dev-1:7077/user/Master 
 at akka://sparkMaster@ADRIBONA-DEV-1:7077 local is 
 akka://sparkMaster@ADRIBONA-DEV-1:7077
 13/12/05 08:18:46 ERROR NettyRemoteTransport(null): dropping message 
 DaemonMsgWatch(Actor[akka://sparkWorker@ADRIBONA-DEV-1:13521/user/Worker],Actor[akka://sparkMaster@adribona-dev-1:7077/user/Master])
  for non-local recipient akka://sparkMaster@adribona-dev-1:7077/remote at 
 akka://sparkMaster@ADRIBONA-DEV-1:7077 local is 
 akka://sparkMaster@ADRIBONA-DEV-1:7077
  
 WORKER:
 D:\spark>spark-class.cmd org.apache.spark.deploy.worker.Worker 
 spark://adribona-dev-1:7077
 13/12/05 08:18:46 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started
 13/12/05 08:18:46 INFO worker.Worker: Starting Spark worker 
 ADRIBONA-DEV-1:13521 with 8 cores, 30.9 GB RAM
 13/12/05 08:18:46 INFO worker.Worker: Spark home: D:\spark
 13/12/05 08:18:46 INFO server.Server: jetty-7.x.y-SNAPSHOT
 13/12/05 08:18:46 INFO handler.ContextHandler: started 
 o.e.j.s.h.ContextHandler{/metrics/json,null}
 13/12/05 08:18:46 INFO handler.ContextHandler: started 
 o.e.j.s.h.ContextHandler{/static,null}
 13/12/05 08:18:46 INFO handler.ContextHandler: started 
 o.e.j.s.h.ContextHandler{/log,null}
 13/12/05 08:18:46 INFO handler.ContextHandler: started 
 o.e.j.s.h.ContextHandler{/logPage,null}
 13/12/05 08:18:46 INFO handler.ContextHandler: started 
 o.e.j.s.h.ContextHandler{/json,null}
 13/12/05 08:18:46 INFO handler.ContextHandler: started 
 o.e.j.s.h.ContextHandler{*,null}
 13/12/05 08:18:46 INFO server.AbstractConnector: Started 
 SelectChannelConnector@0.0.0.0:8081
 13/12/05 08:18:46 INFO ui.WorkerWebUI: Started Worker web UI at 
 http://ADRIBONA-DEV-1:8081
 13/12/05 08:18:46 INFO worker.Worker: Connecting to master 
 spark://adribona-dev-1:7077
  
  
 --Adrian
  
 From: Adrian Bonar [mailto:adrian.bo...@microsoft.com] 
 Sent: Thursday, December 5, 2013 7:49 AM
 To: user@spark.incubator.apache.org
 Subject: RE: Pre-build Spark for Windows 8.1
  
 Excellent! Thank you, Matei.
  
 From: Matei Zaharia [mailto:matei.zaha...@gmail.com] 
 Sent: 

Re: Pre-build Spark for Windows 8.1

2013-12-05 Thread Andrew Ash
Speaking of akka and host sensitivity...  How much have you hacked on akka
to get it to support all of: myhost.mydomain.int, myhost, and 10.1.1.1?
 It's kind of a pain to get the Spark URL to exactly match.  I'm wondering
if there are usability gains that could be made here or if we're pretty
stuck.


On Thu, Dec 5, 2013 at 2:43 PM, Matei Zaharia matei.zaha...@gmail.comwrote:

 Hi,

 When you launch the worker, try using spark://ADRIBONA-DEV-1:7077 as the
 URL (uppercase instead of lowercase). Unfortunately Akka is very specific
 about seeing hostnames written in the same way on each node, or else it
 thinks the message is for another machine!

 Matei

 On Dec 5, 2013, at 8:27 AM, Adrian Bonar adrian.bo...@microsoft.com
 wrote:

  The master starts up now as expected but the workers are unable to
 connect to the master. It looks like the master is refusing the connection
 messages but I’m not sure why. The first two error lines below are from
 trying to connect a worker from a separate machine and the last two error
 lines are from trying to connect a worker on the same machine as the
 master. I verified that the workers do not show up in the master’s web ui.

 MASTER:
 D:\spark>spark-class org.apache.spark.deploy.master.Master
 13/12/05 08:08:34 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started
 13/12/05 08:08:34 INFO master.Master: Starting Spark master at
 spark://ADRIBONA-DEV-1:7077
 13/12/05 08:08:34 INFO server.Server: jetty-7.x.y-SNAPSHOT
 13/12/05 08:08:34 INFO handler.ContextHandler: started
 o.e.j.s.h.ContextHandler{/metrics/master/json,null}
 13/12/05 08:08:34 INFO handler.ContextHandler: started
 o.e.j.s.h.ContextHandler{/metrics/applications/json,null}
 13/12/05 08:08:34 INFO handler.ContextHandler: started
 o.e.j.s.h.ContextHandler{/static,null}
 13/12/05 08:08:34 INFO handler.ContextHandler: started
 o.e.j.s.h.ContextHandler{/app/json,null}
 13/12/05 08:08:34 INFO handler.ContextHandler: started
 o.e.j.s.h.ContextHandler{/app,null}
 13/12/05 08:08:34 INFO handler.ContextHandler: started
 o.e.j.s.h.ContextHandler{/json,null}
 13/12/05 08:08:34 INFO handler.ContextHandler: started
 o.e.j.s.h.ContextHandler{*,null}
 13/12/05 08:08:34 INFO server.AbstractConnector: Started
 SelectChannelConnector@0.0.0.0:8088
 13/12/05 08:08:34 INFO ui.MasterWebUI: Started Master web UI at
 http://ADRIBONA-DEV-1:8088
 13/12/05 08:09:15 *ERROR* NettyRemoteTransport(null): dropping message
 RegisterWorker(worker-20131205080912-ADRIBONA-SFC-1-19280,ADRIBONA-SFC-1,19280,2,512,8081,ADRIBONA-SFC-1)
 for non-local recipient akka://sparkMaster@adribona-dev-1:7077/user/Master
  at akka://sparkMaster@ADRIBONA-DEV-1:7077 local is
 akka://sparkMaster@ADRIBONA-DEV-1:7077
 13/12/05 08:09:15 *ERROR* NettyRemoteTransport(null): dropping message
 DaemonMsgWatch(Actor[
 akka://sparkWorker@ADRIBONA-SFC-1:19280/user/Worker],Actor[akka://sparkMaster@adribona-dev-1:7077/user/Master])
 for non-local recipient akka://sparkMaster@adribona-dev-1:7077/remote at
 akka://sparkMaster@ADRIBONA-DEV-1:7077 local is
 akka://sparkMaster@ADRIBONA-DEV-1:7077
 13/12/05 08:18:46 *ERROR* NettyRemoteTransport(null): dropping message
 RegisterWorker(worker-20131205081846-ADRIBONA-DEV-1-13521,ADRIBONA-DEV-1,13521,8,31670,8081,ADRIBONA-DEV-1)
 for non-local recipient akka://sparkMaster@adribona-dev-1:7077/user/Master
 at akka://sparkMaster@ADRIBONA-DEV-1:7077 local is
 akka://sparkMaster@ADRIBONA-DEV-1:7077
 13/12/05 08:18:46 *ERROR* NettyRemoteTransport(null): dropping message
 DaemonMsgWatch(Actor[
 akka://sparkWorker@ADRIBONA-DEV-1:13521/user/Worker],Actor[akka://sparkMaster@adribona-dev-1:7077/user/Master])
 for non-local recipient akka://sparkMaster@adribona-dev-1:7077/remote at
 akka://sparkMaster@ADRIBONA-DEV-1:7077 local is
 akka://sparkMaster@ADRIBONA-DEV-1:7077

 WORKER:
 D:\spark>spark-class.cmd org.apache.spark.deploy.worker.Worker
 spark://adribona-dev-1:7077
 13/12/05 08:18:46 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started
 13/12/05 08:18:46 INFO worker.Worker: Starting Spark worker
 ADRIBONA-DEV-1:13521 with 8 cores, 30.9 GB RAM
 13/12/05 08:18:46 INFO worker.Worker: Spark home: D:\spark
 13/12/05 08:18:46 INFO server.Server: jetty-7.x.y-SNAPSHOT
 13/12/05 08:18:46 INFO handler.ContextHandler: started
 o.e.j.s.h.ContextHandler{/metrics/json,null}
 13/12/05 08:18:46 INFO handler.ContextHandler: started
 o.e.j.s.h.ContextHandler{/static,null}
 13/12/05 08:18:46 INFO handler.ContextHandler: started
 o.e.j.s.h.ContextHandler{/log,null}
 13/12/05 08:18:46 INFO handler.ContextHandler: started
 o.e.j.s.h.ContextHandler{/logPage,null}
 13/12/05 08:18:46 INFO handler.ContextHandler: started
 o.e.j.s.h.ContextHandler{/json,null}
 13/12/05 08:18:46 INFO handler.ContextHandler: started
 o.e.j.s.h.ContextHandler{*,null}
 13/12/05 08:18:46 INFO server.AbstractConnector: Started
 SelectChannelConnector@0.0.0.0:8081
 13/12/05 08:18:46 INFO ui.WorkerWebUI: Started Worker web UI at
 http://ADRIBONA-DEV-1:8081 

Re: takeSample() computation

2013-12-05 Thread Matei Zaharia
Ah, got it. Then takeSample is going to do what you want, because it needs a 
uniform sample. If you don’t want any result at all, you can also use 
RDD.foreach() with an empty function.

Matei
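
A minimal sketch of that benchmarking pattern (the master URL, input path, and
transformations are placeholders): force every partition to be computed with an
empty foreach and time it.

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._

    object TimeTransformations {
      def main(args: Array[String]) {
        val sc = new SparkContext("spark://master:7077", "TimeTransformations")

        val transformed = sc.textFile("hdfs:///data/input.txt")
          .map(_.split("\t"))
          .filter(_.length > 2)

        val start = System.currentTimeMillis()
        transformed.foreach(x => ())   // empty function: every partition is computed, nothing is kept
        val elapsed = System.currentTimeMillis() - start
        println("Transformations took " + elapsed + " ms")
      }
    }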

On Dec 5, 2013, at 12:54 PM, Matt Cheah mch...@palantir.com wrote:

 Actually, we want the opposite – we want as much data to be computed as 
 possible.
 
 It's only for benchmarking purposes, of course.
 
 -Matt Cheah
 
 From: Matei Zaharia matei.zaha...@gmail.com
 Reply-To: user@spark.incubator.apache.org user@spark.incubator.apache.org
 Date: Thursday, December 5, 2013 10:31 AM
 To: user@spark.incubator.apache.org user@spark.incubator.apache.org
 Cc: Mingyu Kim m...@palantir.com
 Subject: Re: takeSample() computation
 
 Hi Matt,
 
 Try using take() instead, which will only begin computing from the start of 
 the RDD (first partition) if the number of elements you ask for is small.
 
 Note that if you’re doing any shuffle operations, like groupBy or sort, then 
 the stages before that do have to be computed fully.
 
 Matei
 
 On Dec 5, 2013, at 10:13 AM, Matt Cheah mch...@palantir.com wrote:
 
 Hi everyone,
 
 I have a question about RDD.takeSample(). This is an action, not a 
 transformation – but is any optimization made to reduce the amount of 
 computation that's done, for example only running the transformations over a 
 smaller subset of the data since only a sample will be returned as a result?
 
 The context is, I'm trying to measure the amount of time a set of 
 transformations takes on our dataset without persisting to disk. So I want 
 to stack the operations on the RDD and then invoke an action that doesn't 
 save the result to disk but can still give me a good idea of how long 
 transforming the whole dataset takes.
 
 Thanks,
 
 -Matt Cheah
 



RE: Pre-build Spark for Windows 8.1

2013-12-05 Thread Adrian Bonar
Strange, but that definitely did the trick. Thanks again!

From: Matei Zaharia [mailto:matei.zaha...@gmail.com]
Sent: Thursday, December 5, 2013 2:44 PM
To: user@spark.incubator.apache.org
Subject: Re: Pre-build Spark for Windows 8.1

Hi,

When you launch the worker, try using spark://ADRIBONA-DEV-1:7077 as the URL 
(uppercase instead of lowercase). Unfortunately Akka is very specific about 
seeing hostnames written in the same way on each node, or else it thinks the 
message is for another machine!

Matei

On Dec 5, 2013, at 8:27 AM, Adrian Bonar adrian.bo...@microsoft.com wrote:


The master starts up now as expected but the workers are unable to connect to 
the master. It looks like the master is refusing the connection messages but 
I'm not sure why. The first two error lines below are from trying to connect a 
worker from a separate machine and the last two error lines are from trying to 
connect a worker on the same machine as the master. I verified that the workers 
do not show up in the master's web ui.

MASTER:
D:\spark>spark-class org.apache.spark.deploy.master.Master
13/12/05 08:08:34 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started
13/12/05 08:08:34 INFO master.Master: Starting Spark master at 
spark://ADRIBONA-DEV-1:7077
13/12/05 08:08:34 INFO server.Server: jetty-7.x.y-SNAPSHOT
13/12/05 08:08:34 INFO handler.ContextHandler: started 
o.e.j.s.h.ContextHandler{/metrics/master/json,null}
13/12/05 08:08:34 INFO handler.ContextHandler: started 
o.e.j.s.h.ContextHandler{/metrics/applications/json,null}
13/12/05 08:08:34 INFO handler.ContextHandler: started 
o.e.j.s.h.ContextHandler{/static,null}
13/12/05 08:08:34 INFO handler.ContextHandler: started 
o.e.j.s.h.ContextHandler{/app/json,null}
13/12/05 08:08:34 INFO handler.ContextHandler: started 
o.e.j.s.h.ContextHandler{/app,null}
13/12/05 08:08:34 INFO handler.ContextHandler: started 
o.e.j.s.h.ContextHandler{/json,null}
13/12/05 08:08:34 INFO handler.ContextHandler: started 
o.e.j.s.h.ContextHandler{*,null}
13/12/05 08:08:34 INFO server.AbstractConnector: Started 
SelectChannelConnector@0.0.0.0:8088
13/12/05 08:08:34 INFO ui.MasterWebUI: Started Master web UI at 
http://ADRIBONA-DEV-1:8088
13/12/05 08:09:15 ERROR NettyRemoteTransport(null): dropping message 
RegisterWorker(worker-20131205080912-ADRIBONA-SFC-1-19280,ADRIBONA-SFC-1,19280,2,512,8081,ADRIBONA-SFC-1)
 for non-local recipient akka://sparkMaster@adribona-dev-1:7077/user/Master at 
akka://sparkMaster@ADRIBONA-DEV-1:7077 local is 
akka://sparkMaster@ADRIBONA-DEV-1:7077
13/12/05 08:09:15 ERROR NettyRemoteTransport(null): dropping message 
DaemonMsgWatch(Actor[akka://sparkWorker@ADRIBONA-SFC-1:19280/user/Worker],Actor[akka://sparkMaster@adribona-dev-1:7077/user/Master])
 for non-local recipient akka://sparkMaster@adribona-dev-1:7077/remote at 
akka://sparkMaster@ADRIBONA-DEV-1:7077 local is 
akka://sparkMaster@ADRIBONA-DEV-1:7077
13/12/05 08:18:46 ERROR NettyRemoteTransport(null): dropping message 
RegisterWorker(worker-20131205081846-ADRIBONA-DEV-1-13521,ADRIBONA-DEV-1,13521,8,31670,8081,ADRIBONA-DEV-1)
 for non-local recipient akka://sparkMaster@adribona-dev-1:7077/user/Master 
at akka://sparkMaster@ADRIBONA-DEV-1:7077 local is 
akka://sparkMaster@ADRIBONA-DEV-1:7077
13/12/05 08:18:46 ERROR NettyRemoteTransport(null): dropping message 
DaemonMsgWatch(Actor[akka://sparkWorker@ADRIBONA-DEV-1:13521/user/Worker],Actor[akka://sparkMaster@adribona-dev-1:7077/user/Master])
 for non-local recipient akka://sparkMaster@adribona-dev-1:7077/remote at 
akka://sparkMaster@ADRIBONA-DEV-1:7077 local is 
akka://sparkMaster@ADRIBONA-DEV-1:7077

WORKER:
D:\spark>spark-class.cmd org.apache.spark.deploy.worker.Worker 
spark://adribona-dev-1:7077
13/12/05 08:18:46 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started
13/12/05 08:18:46 INFO worker.Worker: Starting Spark worker 
ADRIBONA-DEV-1:13521 with 8 cores, 30.9 GB RAM
13/12/05 08:18:46 INFO worker.Worker: Spark home: D:\spark
13/12/05 08:18:46 INFO server.Server: jetty-7.x.y-SNAPSHOT
13/12/05 08:18:46 INFO handler.ContextHandler: started 
o.e.j.s.h.ContextHandler{/metrics/json,null}
13/12/05 08:18:46 INFO handler.ContextHandler: started 
o.e.j.s.h.ContextHandler{/static,null}
13/12/05 08:18:46 INFO handler.ContextHandler: started 
o.e.j.s.h.ContextHandler{/log,null}
13/12/05 08:18:46 INFO handler.ContextHandler: started 
o.e.j.s.h.ContextHandler{/logPage,null}
13/12/05 08:18:46 INFO handler.ContextHandler: started 
o.e.j.s.h.ContextHandler{/json,null}
13/12/05 08:18:46 INFO handler.ContextHandler: started 
o.e.j.s.h.ContextHandler{*,null}
13/12/05 08:18:46 INFO server.AbstractConnector: Started 

Re: Spark heap issues

2013-12-05 Thread purav aggarwal
Try allocating some more resources to your application.
You seem to be using 512 MB for your worker node (you can verify that from
the master UI).

Try putting the following settings into your code and see if it helps:

System.setProperty("spark.executor.memory", "15g")   // Will allocate more memory
System.setProperty("spark.akka.frameSize", "2000")
System.setProperty("spark.akka.threads", "16")       // Dependent upon number of cores with your worker machine
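
A sketch of where such settings go, assuming a standalone program like the one in
the original post: they must be set before the SparkContext is created, and the
executor memory has to fit within what the worker machine actually has (the values
below are placeholders).

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._

    object TunedApp {
      def main(args: Array[String]) {
        // These properties are read when the SparkContext is constructed, so set them first.
        System.setProperty("spark.executor.memory", "4g")     // placeholder; must fit the worker's RAM
        System.setProperty("spark.akka.frameSize", "2000")

        val sc = new SparkContext("spark://localhost:7077", "TunedApp",
          "/spark-0.8.0-incubating-bin-cdh4",
          List("target/scala-2.9.3/simple-project_2.9.3-1.0.jar"))

        // ... build the RDDs and run the join as in the original program ...
        println(sc.parallelize(1 to 10).count())
      }
    }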


On Fri, Dec 6, 2013 at 1:06 AM, learner1014 all learner1...@gmail.comwrote:

 Hi,

 Trying to do a join operation on an RDD, my input is pipe delimited data
 and there are 2 files.
 One file is 24MB and the other file is 285MB.
 Setup being used is the single node (server) setup: SPARK_MEM set to 512m

 Master
 /pkg/java/jdk1.7.0_11/bin/java -cp
 :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar
 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
 -Dspark.boundedMemoryCache.memoryFraction=0.4
 -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC
 -Djava.library.path= -Xms512m -Xmx512m
 org.apache.spark.deploy.master.Master --ip localhost --port 7077
 --webui-port 8080

 Worker
 /pkg/java/jdk1.7.0_11/bin/java -cp
 :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar
 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
 -Dspark.boundedMemoryCache.memoryFraction=0.4
 -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC
 -Djava.library.path= -Xms512m -Xmx512m
 org.apache.spark.deploy.worker.Worker spark://localhost:7077


 App
 /pkg/java/jdk1.7.0_11/bin/java -cp
 :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar:/spark-0.8.0-incubating-bin-cdh4/core/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/repl/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/mllib/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/bagel/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/streaming/target/scala-2.9.3/test-classes
 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
 -Dspark.boundedMemoryCache.memoryFraction=0.4
 -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC
 -Xms512M -Xmx512M org.apache.spark.executor.StandaloneExecutorBackend
 akka://spark@localhost:33024/user/StandaloneScheduler 1 localhost 4


  Here is the code:

  import org.apache.spark.SparkContext
  import org.apache.spark.SparkContext._
  import org.apache.spark.storage.StorageLevel

  object SimpleApp {

    def main(args: Array[String]) {

      System.setProperty("spark.local.dir", "/spark-0.8.0-incubating-bin-cdh4/tmp")
      System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      System.setProperty("spark.akka.timeout", "30")  // in seconds

      val dataFile2 = "/tmp_data/data1.txt"
      val dataFile1 = "/tmp_data/data2.txt"
      val sc = new SparkContext("spark://localhost:7077", "Simple App",
        "/spark-0.8.0-incubating-bin-cdh4",
        List("target/scala-2.9.3/simple-project_2.9.3-1.0.jar"))

      val data10 = sc.textFile(dataFile1, 128)
      val data11 = data10.map(x => x.split("\\|"))
      val data12 = data11.map(x => (x(1).toInt -> x))

      val data20 = sc.textFile(dataFile2, 128)
      val data21 = data20.map(x => x.split("\\|"))
      val data22 = data21.map(x => (x(1).toInt -> x))

      val data3 = data12.join(data22, 128)
      val data4 = data3.distinct(4)
      val numAs = data10.count()
      val numBs = data20.count()
      val numCs = data3.count()
      val numDs = data4.count()
      println("Lines in 1: %s, Lines in 2: %s Lines in 3: %s Lines in 4: %s".format(numAs, numBs, numCs, numDs))
      data4.foreach(println)
    }
  }

 I see the following errors
 13/12/04 10:53:55 WARN storage.BlockManagerMaster: Error sending message
 to BlockManagerMaster in 1 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [1]
 milliseconds
 at akka.dispatch.DefaultPromise.ready(Future.scala:870)
 at akka.dispatch.DefaultPromise.result(Future.scala:874)
 at akka.dispatch.Await$.result(Future.scala:74)

 and
 13/12/04 10:53:55 ERROR executor.Executor: Exception in task ID 517
 java.lang.OutOfMemoryError: Java heap space
 at com.esotericsoftware.kryo.io.Input.readString(Input.java:448)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$StringArraySerializer.read(DefaultArraySerializers.java:282)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$StringArraySerializer.read(DefaultArraySerializers.java:262)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 at
 com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
 at