Re: Using Cassandra as an input stream from Java
Hi Lucas,

That did the trick. I just had to change JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> to JavaPairRDD<ByteBuffer, ? extends SortedMap<ByteBuffer, IColumn>>. Thanks for the help.

Regards, Pulasthi

On Thu, Dec 5, 2013 at 10:40 AM, Lucas Fernandes Brunialti lbrunia...@igcorp.com.br wrote:

Hi all, This should work:

    JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> casRdd = context.newAPIHadoopRDD(
        job.getConfiguration(),
        ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
        ByteBuffer.class,
        SortedMap.class);

I have translated the word count written in Scala to Java, I just can't send it right now... Best Regards, Lucas.

On Dec 5, 2013 1:51 AM, Pulasthi Supun Wickramasinghe pulasthi...@gmail.com wrote:

Hi Tal, Just checking if you have added your code to github :). If you have, could you point me to it? Best Regards, Pulasthi

On Thu, Nov 28, 2013 at 11:54 PM, Patrick Wendell pwend...@gmail.com wrote:

Tal - that would be great to have open sourced if you can do it!

On Thu, Nov 28, 2013 at 1:00 AM, Pulasthi Supun Wickramasinghe pulasthi...@gmail.com wrote:

Hi Tal, Thanks for the info, will try it out and see how it goes.

On Thu, Nov 28, 2013 at 2:19 PM, Tal Sliwowicz ta...@taboola.com wrote:

Hi Pulasthi, I couldn't make it work, so what I ended up doing was implement 3 Java classes: one that extends org.apache.hadoop.mapreduce.InputFormat, another that extends org.apache.hadoop.mapreduce.InputSplit, and a third that extends org.apache.hadoop.mapreduce.RecordReader, and used them to load data from Cassandra into an RDD (using the newAPIHadoopRDD() method). It works great! I'm cleaning up the code a bit and will upload it to github as open source (after the summit).

That's great, looking forward to checking it out after you publish it on github :). Thanks, Pulasthi

I hope this helps for now, Tal

On Thu, Nov 28, 2013 at 10:21 AM, Pulasthi Supun Wickramasinghe pulasthi...@gmail.com wrote:

Hi Tal, I also tried doing this by converting the Scala sample into Java, but I am getting a compile-time error. Below is the code:

    JavaSparkContext sc = new JavaSparkContext("local[3]", "casDemo");

    // Build the job configuration with ConfigHelper provided by Cassandra
    Job job = null;
    try {
        job = new Job();
    } catch (IOException e) {
        e.printStackTrace();
    }
    job.setInputFormatClass(ColumnFamilyInputFormat.class);

    String host = args[1];
    String port = args[2];
    ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
    ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
    ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
    ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
    ConfigHelper.setInputColumnFamily(job.getConfiguration(), "casDemo", "Words");
    ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "casDemo", "WordCount");

    SlicePredicate predicate = new SlicePredicate();
    SliceRange sliceRange = new SliceRange();
    sliceRange.setStart(new byte[0]);
    sliceRange.setFinish(new byte[0]);
    predicate.setSlice_range(sliceRange);
    ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);

    ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
    ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");

    // Make a new Hadoop RDD
    final SortedMap<ByteBuffer, IColumn> byteBufferIColumnSortedMap = new TreeMap<ByteBuffer, IColumn>();
    JavaPairRDD<ByteBuffer, ? extends SortedMap> casRdd = sc.newAPIHadoopRDD(
        job.getConfiguration(),
        ColumnFamilyInputFormat.class,
        ByteBuffer.class,
        byteBufferIColumnSortedMap.getClass());

I also tried the code segment that you have provided, but I keep getting the following error:

    java: /home/pulasthi/work/spark/sparkcasstest/src/main/java/org/wso2/spark/CassSpark.java:66:
    <K,V,F>newAPIHadoopRDD(org.apache.hadoop.conf.Configuration, java.lang.Class<F>, java.lang.Class<K>, java.lang.Class<V>)
    in org.apache.spark.api.java.JavaSparkContext cannot be applied to
    (org.apache.hadoop.conf.Configuration, java.lang.Class<org.apache.cassandra.hadoop.ColumnFamilyInputFormat>,
    java.lang.Class<java.nio.ByteBuffer>, java.lang.Class<capture#92 of ? extends java.util.SortedMap>)

Did you encounter this? If so, any help would be appreciated. Best Regards, Pulasthi

On Tue, Nov 19, 2013 at 9:42 PM, Tal Sliwowicz ta...@taboola.com wrote:

Hi, I'm trying to use data stored in cassandra (v1.2) and need some help. I've translated the Scala example - CassandraTest.scala - to Java, but I keep
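For reference, the corresponding call in the original Scala example reads roughly as below (a sketch along the lines of the CassandraTest.scala example the thread refers to). In Scala the classOf arguments pin the generics down, so the wildcard capture problem above does not arise.

    import java.nio.ByteBuffer
    import java.util.SortedMap
    import org.apache.cassandra.db.IColumn
    import org.apache.cassandra.hadoop.ColumnFamilyInputFormat

    // Key type, value type, and input format are all fixed by classOf,
    // so no "? extends" capture is involved.
    val casRdd = sc.newAPIHadoopRDD(
      job.getConfiguration(),
      classOf[ColumnFamilyInputFormat],
      classOf[ByteBuffer],
      classOf[SortedMap[ByteBuffer, IColumn]])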
Re: Persisting MatrixFactorizationModel
Thanks a lot Evan...

On Wed, Dec 4, 2013 at 8:31 PM, Evan R. Sparks evan.spa...@gmail.com wrote:

Ah, actually - I just remembered that the user and product features of the model are RDDs, so you might be better off saving those components to HDFS and then, at load time, reading them back in and creating a new MatrixFactorizationModel. Sorry for the confusion!

Note, the above solution only works if you want to deploy your model to a Spark cluster. If the model is small enough and you really want to deploy it to several hosts, you could consider calling collect() on its components and then serializing the results as I suggested before. In general these models are usually pretty small (on the order of MBs), so that's not such a bad option - when you get to 10s of millions of users or products, then you might consider pre-materializing some pieces of it (e.g. calculate the top 100 predictions for all users) and saving those intermediate results to serve up. - Evan

On Wed, Dec 4, 2013 at 9:54 AM, Aslan Bekirov aslanbeki...@gmail.com wrote:

I thought to convert the model to an RDD and save it to HDFS, and then load it. I will try your method. Thanks a lot.

On Wed, Dec 4, 2013 at 7:41 PM, Evan R. Sparks evan.spa...@gmail.com wrote:

The model is serializable - so you should be able to write it out to disk and load it up in another program. See, e.g., https://gist.github.com/ramn/5566596 (note, I haven't tested this particular example, but it looks alright). Spark makes use of this type of Scala (and Kryo, etc.) serialization internally, so you can check the Spark codebase for more examples.

On Wed, Dec 4, 2013 at 9:34 AM, Aslan Bekirov aslanbeki...@gmail.com wrote:

Hi All, I am creating a model by calling the train method of ALS:

    val model = ALS.train(ratings...)

I need to persist this model: use it from different clients, and enable clients to make predictions using it. In other words, persist and reload this model. Any suggestions, please? BR, Aslan
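A minimal sketch of the HDFS approach Evan describes, assuming the MatrixFactorizationModel constructor (rank, userFeatures, productFeatures) is accessible in your Spark version; the HDFS paths are hypothetical.

    import org.apache.spark.SparkContext
    import org.apache.spark.mllib.recommendation.MatrixFactorizationModel

    // In the training job: save the model's two feature RDDs to HDFS.
    model.userFeatures.saveAsObjectFile("hdfs:///models/als/userFeatures")
    model.productFeatures.saveAsObjectFile("hdfs:///models/als/productFeatures")

    // In a client application: read them back and rebuild the model.
    def loadModel(sc: SparkContext, rank: Int): MatrixFactorizationModel = {
      val userFeatures = sc.objectFile[(Int, Array[Double])]("hdfs:///models/als/userFeatures")
      val productFeatures = sc.objectFile[(Int, Array[Double])]("hdfs:///models/als/productFeatures")
      new MatrixFactorizationModel(rank, userFeatures, productFeatures)
    }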
Re: How to balance task load
Hi Hao, Where tasks go is influenced by where the data they operate on resides. If the data is on one executor, it may make more sense to do all the computation on that node rather than ship data across the network. How was the data distributed across your cluster? Andrew

On Mon, Dec 2, 2013 at 7:52 AM, Hao REN julien19890...@gmail.com wrote:

Sorry for the spam. To complete my previous post: the map action sometimes creates 4 tasks which are all executed by the same executor. I believe that a task dispatch like executor_0: 1 task; executor_1: 1 task; executor_2: 2 tasks would give better performance. Can we force this kind of schedule in Spark? Thank you.

2013/12/2 Hao REN julien19890...@gmail.com

Hi, When running some tests on EC2 with Spark, I noticed that the tasks are not fairly distributed to executors. For example, a map action produces 4 tasks, but they all go to the same executor:

Executors (3) - Memory: 0.0 B Used (19.0 GB Total) - Disk: 0.0 B Used

    Executor ID | Address                             | RDD blocks | Memory used    | Disk used | Active tasks | Failed tasks | Complete tasks | Total tasks
    0           | ip-10-10-141-143.ec2.internal:52816 | 0          | 0.0 B / 6.3 GB | 0.0 B     | 4            | 0            | 0              | 4
    1           | ip-10-40-38-190.ec2.internal:60314  | 0          | 0.0 B / 6.3 GB | 0.0 B     | 0            | 0            | 0              | 0
    2           | ip-10-62-35-223.ec2.internal:40...  |            | 5.0 B / 6.3 GB | 0.0 B     |              |              |                |

-- REN Hao Data Engineer @ ClaraVista Paris, France Tel: +33 06 14 54 57 24
Re: How to balance task load
Hi Andrew, My data was loaded in HDFS. Actually, I got the answer from the spark-user Google group. Patrick said: "All cores in the cluster are considered fungible since the tasks are completely parallel. So until you run out of cores on any given node, it might get all the tasks. In some cases this provides *better* performance because you aren't moving data around as much." Thank you for your reply. =)

2013/12/5 Andrew Ash and...@andrewash.com

Hi Hao, Where tasks go is influenced by where the data they operate on resides. If the data is on one executor, it may make more sense to do all the computation on that node rather than ship data across the network. How was the data distributed across your cluster? Andrew

On Mon, Dec 2, 2013 at 7:52 AM, Hao REN julien19890...@gmail.com wrote:

Sorry for the spam. To complete my previous post: the map action sometimes creates 4 tasks which are all executed by the same executor. I believe that a task dispatch like executor_0: 1 task; executor_1: 1 task; executor_2: 2 tasks would give better performance. Can we force this kind of schedule in Spark? Thank you.

2013/12/2 Hao REN julien19890...@gmail.com

Hi, When running some tests on EC2 with Spark, I noticed that the tasks are not fairly distributed to executors. For example, a map action produces 4 tasks, but they all go to the same executor:

Executors (3) - Memory: 0.0 B Used (19.0 GB Total) - Disk: 0.0 B Used

    Executor ID | Address                             | RDD blocks | Memory used    | Disk used | Active tasks | Failed tasks | Complete tasks | Total tasks
    0           | ip-10-10-141-143.ec2.internal:52816 | 0          | 0.0 B / 6.3 GB | 0.0 B     | 4            | 0            | 0              | 4
    1           | ip-10-40-38-190.ec2.internal:60314  | 0          | 0.0 B / 6.3 GB | 0.0 B     | 0            | 0            | 0              | 0
    2           | ip-10-62-35-223.ec2.internal:40...  |            | 5.0 B / 6.3 GB | 0.0 B     |              |              |                |

-- REN Hao Data Engineer @ ClaraVista Paris, France Tel: +33 06 14 54 57 24

-- REN Hao Data Engineer @ ClaraVista Paris, France Tel: +33 06 14 54 57 24
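If you do want tasks spread out regardless of where the data sits, one knob to experiment with is the scheduler's locality wait, which controls how long Spark holds a task for a data-local slot before launching it elsewhere. A minimal sketch (the master URL is hypothetical; the value is in milliseconds):

    import org.apache.spark.SparkContext

    // 0 means "launch on any free executor immediately", trading data
    // locality for an even task spread across the cluster.
    System.setProperty("spark.locality.wait", "0")
    val sc = new SparkContext("spark://master:7077", "BalancedApp")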
Re: Bagel caching issues
Hi, Maybe you need to check those nodes. They look very slow.

    3487 | SUCCESS | PROCESS_LOCAL | ip-10-60-150-111.ec2.internal | 2013/12/01 02:11:38 | 17.7 m | 16.3 m | 23.3 MB
    3447 | SUCCESS | PROCESS_LOCAL | ip-10-12-54-63.ec2.internal   | 2013/12/01 02:11:26 | 20.1 m | 13.9 m | 50.9 MB

On Dec 1, 2013, at 10:59 AM, Mayuresh Kunjir mayuresh.kun...@gmail.com wrote:

I tried passing the DISK_ONLY storage level to Bagel's run method. It's running without any error (so far) but is too slow. I am attaching details for a stage corresponding to the second iteration of my algorithm (foreach at Bagel.scala:237). It's been running for more than 35 minutes. I am noticing very high GC time for some tasks. Listing the setup parameters below:

#nodes = 16
SPARK_WORKER_MEMORY = 13G
SPARK_MEM = 13G
RDD storage fraction = 0.5
degree of parallelism = 192 (16 nodes * 4 cores each * 3)
Serializer = Kryo
Vertex data size after serialization = ~12G (probably too high, but it's the bare minimum required for the algorithm.)

I would be grateful if you could suggest some further optimizations or point out reasons why/if Bagel is not suitable for this data size. I need to further scale my cluster and am not feeling confident at all looking at this. Thanks and regards, ~Mayuresh

On Sat, Nov 30, 2013 at 3:07 PM, Mayuresh Kunjir mayuresh.kun...@gmail.com wrote:

Hi Spark users, I am running a pagerank-style algorithm on Bagel and bumping into out-of-memory issues with that. Referring to the following table, rdd_120 is the RDD of vertices, serialized and compressed in memory. On each iteration, Bagel deserializes the compressed RDD; e.g. rdd_126 shows the uncompressed version of rdd_120 persisted in memory and disk. As iterations keep piling on, the cached partitions start getting evicted. The moment an rdd_120 partition gets evicted, it necessitates a recomputation and the performance goes for a toss. Although we don't need uncompressed RDDs from previous iterations, they are the last ones to get evicted thanks to the LRU policy. Should I make Bagel use DISK_ONLY persistence? How much of a performance hit would that be? Or maybe there is a better solution here.

    RDD Name | Storage Level                          | Cached Partitions | Fraction Cached | Size in Memory | Size on Disk
    rdd_83   | Memory Serialized 1x Replicated        | 23                | 12%             | 83.7 MB        | 0.0 B
    rdd_95   | Memory Serialized 1x Replicated        | 23                | 12%             | 2.5 MB         | 0.0 B
    rdd_120  | Memory Serialized 1x Replicated        | 25                | 13%             | 761.1 MB       | 0.0 B
    rdd_126  | Disk Memory Deserialized 1x Replicated | 192               | 100%            | 77.9 GB        | 1016.5 MB
    rdd_134  | Disk Memory Deserialized 1x Replicated | 185               | 96%             | 60.8 GB        | 475.4 MB

Thanks and regards, ~Mayuresh

[Attachment: BigFrame - Details for Stage 23.htm]
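For reference, passing the storage level into Bagel looks roughly like the sketch below. It assumes the Bagel.run overload that accepts a StorageLevel, and the vertices, messages, and compute values from the surrounding program; the exact signature may differ between Spark versions.

    import org.apache.spark.bagel.Bagel
    import org.apache.spark.storage.StorageLevel

    // Keep Bagel's intermediate vertex RDDs on disk instead of memory:
    // slower per iteration, but avoids LRU evictions of the hot RDDs.
    val result = Bagel.run(sc, vertices, messages, 192, StorageLevel.DISK_ONLY)(compute)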
RE: Pre-build Spark for Windows 8.1
Excellent! Thank you, Matei.

From: Matei Zaharia [mailto:matei.zaha...@gmail.com] Sent: Wednesday, December 4, 2013 4:26 PM To: user@spark.incubator.apache.org Subject: Re: Pre-build Spark for Windows 8.1

Hey Adrian, Ideally you shouldn't use Cygwin to run on Windows - use the .cmd scripts we provide instead. Cygwin might be made to work, but we haven't tried to do this so far, so it's not supported. If you can fix it, that would of course be welcome. Also, the deploy scripts don't work on Windows - we assumed that people would mostly use Windows for local development and testing. However, if you do want to launch a cluster, you can use spark-class.cmd manually to launch a Master and then Workers. Use the commands in the first part of http://spark.incubator.apache.org/docs/latest/spark-standalone.html. Matei

On Dec 4, 2013, at 2:31 PM, Adrian Bonar adrian.bo...@microsoft.com wrote:

Separate from my previous thread about building a distribution of Spark on Win8, I am also trying to run the pre-built binaries, with little success. I downloaded and extracted spark-0.8.0-incubating-bin-hadoop1 to d:\spark and attempted to start a master, with the following error:

    $ sh bin/start-master.sh
    starting org.apache.spark.deploy.master.Master, logging to /cygdrive/d/spark/bin/../logs/spark-adribona-org.apache.spark.deploy.master.Master-1-ADRIBONA-DEV-1.out
    failed to launch org.apache.spark.deploy.master.Master:
    Error: Could not find or load main class org.apache.spark.deploy.master.Master
    full log in /cygdrive/d/spark/bin/../logs/spark-adribona-org.apache.spark.deploy.master.Master-1-ADRIBONA-DEV-1.out

--Adrian (again. :))
RE: Pre-build Spark for Windows 8.1
The master starts up now as expected, but the workers are unable to connect to the master. It looks like the master is refusing the connection messages, but I'm not sure why. The first two error lines below are from trying to connect a worker from a separate machine, and the last two error lines are from trying to connect a worker on the same machine as the master. I verified that the workers do not show up in the master's web UI.

MASTER:

    D:\spark> spark-class org.apache.spark.deploy.master.Master
    13/12/05 08:08:34 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started
    13/12/05 08:08:34 INFO master.Master: Starting Spark master at spark://ADRIBONA-DEV-1:7077
    13/12/05 08:08:34 INFO server.Server: jetty-7.x.y-SNAPSHOT
    13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/master/json,null}
    13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/applications/json,null}
    13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
    13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/app/json,null}
    13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/app,null}
    13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/json,null}
    13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{*,null}
    13/12/05 08:08:34 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:8088
    13/12/05 08:08:34 INFO ui.MasterWebUI: Started Master web UI at http://ADRIBONA-DEV-1:8088
    13/12/05 08:09:15 ERROR NettyRemoteTransport(null): dropping message RegisterWorker(worker-20131205080912-ADRIBONA-SFC-1-19280,ADRIBONA-SFC-1,19280,2,512,8081,ADRIBONA-SFC-1) for non-local recipient akka://sparkMaster@adribona-dev-1:7077/user/Master at akka://sparkMaster@ADRIBONA-DEV-1:7077 local is akka://sparkMaster@ADRIBONA-DEV-1:7077
    13/12/05 08:09:15 ERROR NettyRemoteTransport(null): dropping message DaemonMsgWatch(Actor[akka://sparkWorker@ADRIBONA-SFC-1:19280/user/Worker],Actor[akka://sparkMaster@adribona-dev-1:7077/user/Master]) for non-local recipient akka://sparkMaster@adribona-dev-1:7077/remote at akka://sparkMaster@ADRIBONA-DEV-1:7077 local is akka://sparkMaster@ADRIBONA-DEV-1:7077
    13/12/05 08:18:46 ERROR NettyRemoteTransport(null): dropping message RegisterWorker(worker-20131205081846-ADRIBONA-DEV-1-13521,ADRIBONA-DEV-1,13521,8,31670,8081,ADRIBONA-DEV-1) for non-local recipient akka://sparkMaster@adribona-dev-1:7077/user/Master at akka://sparkMaster@ADRIBONA-DEV-1:7077 local is akka://sparkMaster@ADRIBONA-DEV-1:7077
    13/12/05 08:18:46 ERROR NettyRemoteTransport(null): dropping message DaemonMsgWatch(Actor[akka://sparkWorker@ADRIBONA-DEV-1:13521/user/Worker],Actor[akka://sparkMaster@adribona-dev-1:7077/user/Master]) for non-local recipient akka://sparkMaster@adribona-dev-1:7077/remote at akka://sparkMaster@ADRIBONA-DEV-1:7077 local is akka://sparkMaster@ADRIBONA-DEV-1:7077

WORKER:

    D:\spark> spark-class.cmd org.apache.spark.deploy.worker.Worker spark://adribona-dev-1:7077
    13/12/05 08:18:46 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started
    13/12/05 08:18:46 INFO worker.Worker: Starting Spark worker ADRIBONA-DEV-1:13521 with 8 cores, 30.9 GB RAM
    13/12/05 08:18:46 INFO worker.Worker: Spark home: D:\spark
    13/12/05 08:18:46 INFO server.Server: jetty-7.x.y-SNAPSHOT
    13/12/05 08:18:46 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}
    13/12/05 08:18:46 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
    13/12/05 08:18:46 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/log,null}
    13/12/05 08:18:46 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/logPage,null}
    13/12/05 08:18:46 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/json,null}
    13/12/05 08:18:46 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{*,null}
    13/12/05 08:18:46 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:8081
    13/12/05 08:18:46 INFO ui.WorkerWebUI: Started Worker web UI at http://ADRIBONA-DEV-1:8081
    13/12/05 08:18:46 INFO worker.Worker: Connecting to master spark://adribona-dev-1:7077

--Adrian

From: Adrian Bonar [mailto:adrian.bo...@microsoft.com] Sent: Thursday, December 5, 2013 7:49 AM To: user@spark.incubator.apache.org Subject: RE: Pre-build Spark for Windows 8.1

Excellent! Thank you, Matei.

From: Matei Zaharia [mailto:matei.zaha...@gmail.com] Sent: Wednesday, December 4, 2013 4:26 PM To: user@spark.incubator.apache.org Subject: Re: Pre-build Spark for Windows 8.1

Hey Adrian, Ideally you shouldn't use Cygwin to run on Windows - use the .cmd scripts we provide instead. Cygwin might be made to work, but we haven't tried to do this so far, so it's not supported. If you can fix it, that would of course be welcome. Also, the deploy scripts don't work on
Writing to HBase
Does anyone have an example or some sort of starting-point code for writing from Spark Streaming into HBase? We currently stream ad server event log data using Flume-NG to tail log entries, collect them, and put them directly into an HBase table. We would like to do the same with Spark Streaming, but we would like to do the data massaging and simple data analysis before writing. This will cut down the steps in prepping data and the number of tables for our data scientists and real-time feedback systems. Thanks, Ben
Re: Writing to HBase
Here's a good place to start: http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201311.mbox/%3ccacyzca3askwd-tujhqi1805bn7sctguaoruhd5xtxcsul1a...@mail.gmail.com%3E

On 12/5/2013 10:18 AM, Benjamin Kim wrote:

Does anyone have an example or some sort of starting-point code for writing from Spark Streaming into HBase? We currently stream ad server event log data using Flume-NG to tail log entries, collect them, and put them directly into an HBase table. We would like to do the same with Spark Streaming, but we would like to do the data massaging and simple data analysis before writing. This will cut down the steps in prepping data and the number of tables for our data scientists and real-time feedback systems. Thanks, Ben
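A rough sketch of the usual shape of such a writer is below. The table and column names are hypothetical, the HBase calls are the 0.94-era client API, and the DStream output operation was named foreach in Spark 0.8 (later renamed foreachRDD); adjust foreachPartition to your Spark version if it is not available.

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{HTable, Put}
    import org.apache.hadoop.hbase.util.Bytes

    // events: DStream[(String, String)] of (rowKey, value) pairs produced
    // by the massaging/analysis steps upstream.
    events.foreach { rdd =>
      rdd.foreachPartition { partition =>
        // Open the table inside the partition so the (non-serializable)
        // client is created on the worker, not shipped from the driver.
        val table = new HTable(HBaseConfiguration.create(), "ad_events")
        partition.foreach { case (rowKey, value) =>
          val put = new Put(Bytes.toBytes(rowKey))
          put.add(Bytes.toBytes("d"), Bytes.toBytes("payload"), Bytes.toBytes(value))
          table.put(put)
        }
        table.close()
      }
    }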
Re: Bagel caching issues
The variability in task completion times could be caused by variability in the amount of work that those tasks perform, rather than by slow or faulty nodes. For PageRank, consider a link graph that contains a few disproportionately popular webpages with many inlinks (such as Yahoo.com). These high-degree nodes may cause significant communication imbalances because they receive and send many messages in a Pregel-like model. If you look at the distribution of shuffled data sizes, does it exhibit similar skew to the task completion times? The PowerGraph paper gives a good overview of the challenges posed by these types of large-scale natural graphs and develops techniques to split up and parallelize the processing of high-degree nodes: http://graphlab.org/powergraph-presented-at-osdi/

On Thu, Dec 5, 2013 at 6:54 AM, Mayuresh Kunjir mayuresh.kun...@gmail.com wrote:

Thanks Jay for your response. Stragglers are a big problem here. I am seeing such tasks in many stages of the workflow on a consistent basis. It's not due to any particular nodes being slow, since the slow tasks are observed on all the nodes at different points in time. The distribution of task completion times is too skewed for my liking. GC delays are a possible reason, but I am just speculating. ~Mayuresh

On Thu, Dec 5, 2013 at 5:31 AM, huangjay ja...@live.cn wrote:

Hi, Maybe you need to check those nodes. They look very slow.

    3487 | SUCCESS | PROCESS_LOCAL | ip-10-60-150-111.ec2.internal | 2013/12/01 02:11:38 | 17.7 m | 16.3 m | 23.3 MB
    3447 | SUCCESS | PROCESS_LOCAL | ip-10-12-54-63.ec2.internal   | 2013/12/01 02:11:26 | 20.1 m | 13.9 m | 50.9 MB

On Dec 1, 2013, at 10:59 AM, Mayuresh Kunjir mayuresh.kun...@gmail.com wrote:

I tried passing the DISK_ONLY storage level to Bagel's run method. It's running without any error (so far) but is too slow. I am attaching details for a stage corresponding to the second iteration of my algorithm (foreach at Bagel.scala:237). It's been running for more than 35 minutes. I am noticing very high GC time for some tasks. Listing the setup parameters below:

#nodes = 16
SPARK_WORKER_MEMORY = 13G
SPARK_MEM = 13G
RDD storage fraction = 0.5
degree of parallelism = 192 (16 nodes * 4 cores each * 3)
Serializer = Kryo
Vertex data size after serialization = ~12G (probably too high, but it's the bare minimum required for the algorithm.)

I would be grateful if you could suggest some further optimizations or point out reasons why/if Bagel is not suitable for this data size. I need to further scale my cluster and am not feeling confident at all looking at this. Thanks and regards, ~Mayuresh

On Sat, Nov 30, 2013 at 3:07 PM, Mayuresh Kunjir mayuresh.kun...@gmail.com wrote:

Hi Spark users, I am running a pagerank-style algorithm on Bagel and bumping into out-of-memory issues with that. Referring to the following table, rdd_120 is the RDD of vertices, serialized and compressed in memory. On each iteration, Bagel deserializes the compressed RDD; e.g. rdd_126 shows the uncompressed version of rdd_120 persisted in memory and disk. As iterations keep piling on, the cached partitions start getting evicted. The moment an rdd_120 partition gets evicted, it necessitates a recomputation and the performance goes for a toss. Although we don't need uncompressed RDDs from previous iterations, they are the last ones to get evicted thanks to the LRU policy. Should I make Bagel use DISK_ONLY persistence? How much of a performance hit would that be? Or maybe there is a better solution here.

    RDD Name | Storage Level                          | Cached Partitions | Fraction Cached | Size in Memory | Size on Disk
    rdd_83   | Memory Serialized 1x Replicated        | 23                | 12%             | 83.7 MB        | 0.0 B
    rdd_95   | Memory Serialized 1x Replicated        | 23                | 12%             | 2.5 MB         | 0.0 B
    rdd_120  | Memory Serialized 1x Replicated        | 25                | 13%             | 761.1 MB       | 0.0 B
    rdd_126  | Disk Memory Deserialized 1x Replicated | 192               | 100%            | 77.9 GB        | 1016.5 MB
    rdd_134  | Disk Memory Deserialized 1x Replicated | 185               | 96%             | 60.8 GB        | 475.4 MB

Thanks and regards, ~Mayuresh

[Attachment: BigFrame - Details for Stage 23.htm]
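One way to check for that skew directly, as a hedged sketch: assuming the link graph is available as an RDD of (vertexId, outEdges) pairs (called links here, which is not shown in this thread), the out-degree distribution can be printed from the driver.

    import org.apache.spark.SparkContext._

    // Histogram of out-degrees; a long tail of very high-degree vertices
    // is exactly the skew pattern described above.
    val degreeHistogram = links
      .map { case (_, outEdges) => (outEdges.size, 1) }
      .reduceByKey(_ + _)
      .sortByKey()
    degreeHistogram.collect().foreach { case (degree, count) =>
      println(degree + " -> " + count + " vertices")
    }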
Re: takeSample() computation
Hi Matt, Try using take() instead, which will only begin computing from the start of the RDD (first partition) if the number of elements you ask for is small. Note that if you’re doing any shuffle operations, like groupBy or sort, then the stages before that do have to be computed fully. Matei On Dec 5, 2013, at 10:13 AM, Matt Cheah mch...@palantir.com wrote: Hi everyone, I have a question about RDD.takeSample(). This is an action, not a transformation – but is any optimization made to reduce the amount of computation that's done, for example only running the transformations over a smaller subset of the data since only a sample will be returned as a result? The context is, I'm trying to measure the amount of time a set of transformations takes on our dataset without persisting to disk. So I want to stack the operations on the RDD and then invoke an action that doesn't save the result to disk but can still give me a good idea of how long transforming the whole dataset takes. Thanks, -Matt Cheah
Re: takeSample() computation
Actually, we want the opposite – we want as much data to be computed as possible. It's only for benchmarking purposes, of course. -Matt Cheah

From: Matei Zaharia matei.zaha...@gmail.com Reply-To: user@spark.incubator.apache.org Date: Thursday, December 5, 2013 10:31 AM To: user@spark.incubator.apache.org Cc: Mingyu Kim m...@palantir.com Subject: Re: takeSample() computation

Hi Matt, Try using take() instead, which will only begin computing from the start of the RDD (first partition) if the number of elements you ask for is small. Note that if you’re doing any shuffle operations, like groupBy or sort, then the stages before that do have to be computed fully. Matei

On Dec 5, 2013, at 10:13 AM, Matt Cheah mch...@palantir.com wrote:

Hi everyone, I have a question about RDD.takeSample(). This is an action, not a transformation – but is any optimization made to reduce the amount of computation that's done, for example only running the transformations over a smaller subset of the data since only a sample will be returned as a result? The context is, I'm trying to measure the amount of time a set of transformations takes on our dataset without persisting to disk. So I want to stack the operations on the RDD and then invoke an action that doesn't save the result to disk but can still give me a good idea of how long transforming the whole dataset takes. Thanks, -Matt Cheah
Re: Pre-build Spark for Windows 8.1
Hi, When you launch the worker, try using spark://ADRIBONA-DEV-1:7077 as the URL (uppercase instead of lowercase). Unfortunately Akka is very specific about seeing hostnames written in the same way on each node, or else it thinks the message is for another machine! Matei

On Dec 5, 2013, at 8:27 AM, Adrian Bonar adrian.bo...@microsoft.com wrote:

The master starts up now as expected but the workers are unable to connect to the master. It looks like the master is refusing the connection messages but I'm not sure why. The first two error lines below are from trying to connect a worker from a separate machine and the last two error lines are from trying to connect a worker on the same machine as the master. I verified that the workers do not show up in the master's web UI.

MASTER: D:\spark> spark-class org.apache.spark.deploy.master.Master 13/12/05 08:08:34 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started 13/12/05 08:08:34 INFO master.Master: Starting Spark master at spark://ADRIBONA-DEV-1:7077 13/12/05 08:08:34 INFO server.Server: jetty-7.x.y-SNAPSHOT 13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/master/json,null} 13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/applications/json,null} 13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null} 13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/app/json,null} 13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/app,null} 13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/json,null} 13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{*,null} 13/12/05 08:08:34 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:8088 13/12/05 08:08:34 INFO ui.MasterWebUI: Started Master web UI at http://ADRIBONA-DEV-1:8088 13/12/05 08:09:15 ERROR NettyRemoteTransport(null): dropping message RegisterWorker(worker-20131205080912-ADRIBONA-SFC-1-19280,ADRIBONA-SFC-1,19280,2,512,8081,ADRIBONA-SFC-1) for non-local recipient akka://sparkMaster@adribona-dev-1:7077/user/Master at akka://sparkMaster@ADRIBONA-DEV-1:7077 local is akka://sparkMaster@ADRIBONA-DEV-1:7077 13/12/05 08:09:15 ERROR NettyRemoteTransport(null): dropping message DaemonMsgWatch(Actor[akka://sparkWorker@ADRIBONA-SFC-1:19280/user/Worker],Actor[akka://sparkMaster@adribona-dev-1:7077/user/Master]) for non-local recipient akka://sparkMaster@adribona-dev-1:7077/remote at akka://sparkMaster@ADRIBONA-DEV-1:7077 local is akka://sparkMaster@ADRIBONA-DEV-1:7077 13/12/05 08:18:46 ERROR NettyRemoteTransport(null): dropping message RegisterWorker(worker-20131205081846-ADRIBONA-DEV-1-13521,ADRIBONA-DEV-1,13521,8,31670,8081,ADRIBONA-DEV-1) for non-local recipient akka://sparkMaster@adribona-dev-1:7077/user/Master at akka://sparkMaster@ADRIBONA-DEV-1:7077 local is akka://sparkMaster@ADRIBONA-DEV-1:7077 13/12/05 08:18:46 ERROR NettyRemoteTransport(null): dropping message DaemonMsgWatch(Actor[akka://sparkWorker@ADRIBONA-DEV-1:13521/user/Worker],Actor[akka://sparkMaster@adribona-dev-1:7077/user/Master]) for non-local recipient akka://sparkMaster@adribona-dev-1:7077/remote at akka://sparkMaster@ADRIBONA-DEV-1:7077 local is akka://sparkMaster@ADRIBONA-DEV-1:7077

WORKER: D:\spark> spark-class.cmd org.apache.spark.deploy.worker.Worker spark://adribona-dev-1:7077 13/12/05 08:18:46 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started 13/12/05 08:18:46 INFO worker.Worker: Starting Spark worker ADRIBONA-DEV-1:13521 with 8 cores, 30.9 GB RAM 13/12/05 08:18:46 INFO worker.Worker: Spark home: D:\spark 13/12/05 08:18:46 INFO server.Server: jetty-7.x.y-SNAPSHOT 13/12/05 08:18:46 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null} 13/12/05 08:18:46 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null} 13/12/05 08:18:46 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/log,null} 13/12/05 08:18:46 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/logPage,null} 13/12/05 08:18:46 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/json,null} 13/12/05 08:18:46 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{*,null} 13/12/05 08:18:46 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:8081 13/12/05 08:18:46 INFO ui.WorkerWebUI: Started Worker web UI at http://ADRIBONA-DEV-1:8081 13/12/05 08:18:46 INFO worker.Worker: Connecting to master spark://adribona-dev-1:7077

--Adrian

From: Adrian Bonar [mailto:adrian.bo...@microsoft.com] Sent: Thursday, December 5, 2013 7:49 AM To: user@spark.incubator.apache.org Subject: RE: Pre-build Spark for Windows 8.1

Excellent! Thank you, Matei.

From: Matei Zaharia [mailto:matei.zaha...@gmail.com] Sent:
Re: Pre-build Spark for Windows 8.1
Speaking of akka and host sensitivity... How much have you hacked on akka to get it to support all of: myhost.mydomain.int, myhost, and 10.1.1.1? It's kind of a pain to get the Spark URL to exactly match. I'm wondering if there are usability gains that could be made here or if we're pretty stuck.

On Thu, Dec 5, 2013 at 2:43 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

Hi, When you launch the worker, try using spark://ADRIBONA-DEV-1:7077 as the URL (uppercase instead of lowercase). Unfortunately Akka is very specific about seeing hostnames written in the same way on each node, or else it thinks the message is for another machine! Matei

On Dec 5, 2013, at 8:27 AM, Adrian Bonar adrian.bo...@microsoft.com wrote:

The master starts up now as expected but the workers are unable to connect to the master. It looks like the master is refusing the connection messages but I'm not sure why. The first two error lines below are from trying to connect a worker from a separate machine and the last two error lines are from trying to connect a worker on the same machine as the master. I verified that the workers do not show up in the master's web UI.

MASTER: D:\spark> spark-class org.apache.spark.deploy.master.Master 13/12/05 08:08:34 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started 13/12/05 08:08:34 INFO master.Master: Starting Spark master at spark://ADRIBONA-DEV-1:7077 13/12/05 08:08:34 INFO server.Server: jetty-7.x.y-SNAPSHOT 13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/master/json,null} 13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/applications/json,null} 13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null} 13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/app/json,null} 13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/app,null} 13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/json,null} 13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{*,null} 13/12/05 08:08:34 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:8088 13/12/05 08:08:34 INFO ui.MasterWebUI: Started Master web UI at http://ADRIBONA-DEV-1:8088 13/12/05 08:09:15 ERROR NettyRemoteTransport(null): dropping message RegisterWorker(worker-20131205080912-ADRIBONA-SFC-1-19280,ADRIBONA-SFC-1,19280,2,512,8081,ADRIBONA-SFC-1) for non-local recipient akka://sparkMaster@adribona-dev-1:7077/user/Master at akka://sparkMaster@ADRIBONA-DEV-1:7077 local is akka://sparkMaster@ADRIBONA-DEV-1:7077 13/12/05 08:09:15 ERROR NettyRemoteTransport(null): dropping message DaemonMsgWatch(Actor[akka://sparkWorker@ADRIBONA-SFC-1:19280/user/Worker],Actor[akka://sparkMaster@adribona-dev-1:7077/user/Master]) for non-local recipient akka://sparkMaster@adribona-dev-1:7077/remote at akka://sparkMaster@ADRIBONA-DEV-1:7077 local is akka://sparkMaster@ADRIBONA-DEV-1:7077 13/12/05 08:18:46 ERROR NettyRemoteTransport(null): dropping message RegisterWorker(worker-20131205081846-ADRIBONA-DEV-1-13521,ADRIBONA-DEV-1,13521,8,31670,8081,ADRIBONA-DEV-1) for non-local recipient akka://sparkMaster@adribona-dev-1:7077/user/Master at akka://sparkMaster@ADRIBONA-DEV-1:7077 local is akka://sparkMaster@ADRIBONA-DEV-1:7077 13/12/05 08:18:46 ERROR NettyRemoteTransport(null): dropping message DaemonMsgWatch(Actor[akka://sparkWorker@ADRIBONA-DEV-1:13521/user/Worker],Actor[akka://sparkMaster@adribona-dev-1:7077/user/Master]) for non-local recipient akka://sparkMaster@adribona-dev-1:7077/remote at akka://sparkMaster@ADRIBONA-DEV-1:7077 local is akka://sparkMaster@ADRIBONA-DEV-1:7077

WORKER: D:\spark> spark-class.cmd org.apache.spark.deploy.worker.Worker spark://adribona-dev-1:7077 13/12/05 08:18:46 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started 13/12/05 08:18:46 INFO worker.Worker: Starting Spark worker ADRIBONA-DEV-1:13521 with 8 cores, 30.9 GB RAM 13/12/05 08:18:46 INFO worker.Worker: Spark home: D:\spark 13/12/05 08:18:46 INFO server.Server: jetty-7.x.y-SNAPSHOT 13/12/05 08:18:46 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null} 13/12/05 08:18:46 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null} 13/12/05 08:18:46 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/log,null} 13/12/05 08:18:46 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/logPage,null} 13/12/05 08:18:46 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/json,null} 13/12/05 08:18:46 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{*,null} 13/12/05 08:18:46 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:8081 13/12/05 08:18:46 INFO ui.WorkerWebUI: Started Worker web UI at http://ADRIBONA-DEV-1:8081
Re: takeSample() computation
Ah, got it. Then takeSample is going to do what you want, because it needs a uniform sample. If you don’t want any result at all, you can also use RDD.foreach() with an empty function. Matei On Dec 5, 2013, at 12:54 PM, Matt Cheah mch...@palantir.com wrote: Actually, we want the opposite – we want as much data to be computed as possible. It's only for benchmarking purposes, of course. -Matt Cheah From: Matei Zaharia matei.zaha...@gmail.com Reply-To: user@spark.incubator.apache.org user@spark.incubator.apache.org Date: Thursday, December 5, 2013 10:31 AM To: user@spark.incubator.apache.org user@spark.incubator.apache.org Cc: Mingyu Kim m...@palantir.com Subject: Re: takeSample() computation Hi Matt, Try using take() instead, which will only begin computing from the start of the RDD (first partition) if the number of elements you ask for is small. Note that if you’re doing any shuffle operations, like groupBy or sort, then the stages before that do have to be computed fully. Matei On Dec 5, 2013, at 10:13 AM, Matt Cheah mch...@palantir.com wrote: Hi everyone, I have a question about RDD.takeSample(). This is an action, not a transformation – but is any optimization made to reduce the amount of computation that's done, for example only running the transformations over a smaller subset of the data since only a sample will be returned as a result? The context is, I'm trying to measure the amount of time a set of transformations takes on our dataset without persisting to disk. So I want to stack the operations on the RDD and then invoke an action that doesn't save the result to disk but can still give me a good idea of how long transforming the whole dataset takes. Thanks, -Matt Cheah
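Concretely, the benchmarking pattern then looks like this minimal sketch (transformed stands for the stacked transformations from the question above):

    // Force the whole RDD to be computed without collecting or saving
    // anything, and time it from the driver.
    val start = System.currentTimeMillis()
    transformed.foreach(_ => ())
    val elapsed = System.currentTimeMillis() - start
    println("Full computation took " + elapsed + " ms")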
RE: Pre-build Spark for Windows 8.1
Strange, but that definitely did the trick. Thanks again!

From: Matei Zaharia [mailto:matei.zaha...@gmail.com] Sent: Thursday, December 5, 2013 2:44 PM To: user@spark.incubator.apache.org Subject: Re: Pre-build Spark for Windows 8.1

Hi, When you launch the worker, try using spark://ADRIBONA-DEV-1:7077 as the URL (uppercase instead of lowercase). Unfortunately Akka is very specific about seeing hostnames written in the same way on each node, or else it thinks the message is for another machine! Matei

On Dec 5, 2013, at 8:27 AM, Adrian Bonar adrian.bo...@microsoft.com wrote:

The master starts up now as expected but the workers are unable to connect to the master. It looks like the master is refusing the connection messages but I'm not sure why. The first two error lines below are from trying to connect a worker from a separate machine and the last two error lines are from trying to connect a worker on the same machine as the master. I verified that the workers do not show up in the master's web UI.

MASTER: D:\spark> spark-class org.apache.spark.deploy.master.Master 13/12/05 08:08:34 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started 13/12/05 08:08:34 INFO master.Master: Starting Spark master at spark://ADRIBONA-DEV-1:7077 13/12/05 08:08:34 INFO server.Server: jetty-7.x.y-SNAPSHOT 13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/master/json,null} 13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/applications/json,null} 13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null} 13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/app/json,null} 13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/app,null} 13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/json,null} 13/12/05 08:08:34 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{*,null} 13/12/05 08:08:34 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:8088 13/12/05 08:08:34 INFO ui.MasterWebUI: Started Master web UI at http://ADRIBONA-DEV-1:8088 13/12/05 08:09:15 ERROR NettyRemoteTransport(null): dropping message RegisterWorker(worker-20131205080912-ADRIBONA-SFC-1-19280,ADRIBONA-SFC-1,19280,2,512,8081,ADRIBONA-SFC-1) for non-local recipient akka://sparkMaster@adribona-dev-1:7077/user/Master at akka://sparkMaster@ADRIBONA-DEV-1:7077 local is akka://sparkMaster@ADRIBONA-DEV-1:7077 13/12/05 08:09:15 ERROR NettyRemoteTransport(null): dropping message DaemonMsgWatch(Actor[akka://sparkWorker@ADRIBONA-SFC-1:19280/user/Worker],Actor[akka://sparkMaster@adribona-dev-1:7077/user/Master]) for non-local recipient akka://sparkMaster@adribona-dev-1:7077/remote at akka://sparkMaster@ADRIBONA-DEV-1:7077 local is akka://sparkMaster@ADRIBONA-DEV-1:7077 13/12/05 08:18:46 ERROR NettyRemoteTransport(null): dropping message RegisterWorker(worker-20131205081846-ADRIBONA-DEV-1-13521,ADRIBONA-DEV-1,13521,8,31670,8081,ADRIBONA-DEV-1) for non-local recipient akka://sparkMaster@adribona-dev-1:7077/user/Master at akka://sparkMaster@ADRIBONA-DEV-1:7077 local is akka://sparkMaster@ADRIBONA-DEV-1:7077 13/12/05 08:18:46 ERROR NettyRemoteTransport(null): dropping message DaemonMsgWatch(Actor[akka://sparkWorker@ADRIBONA-DEV-1:13521/user/Worker],Actor[akka://sparkMaster@adribona-dev-1:7077/user/Master]) for non-local recipient akka://sparkMaster@adribona-dev-1:7077/remote at akka://sparkMaster@ADRIBONA-DEV-1:7077 local is akka://sparkMaster@ADRIBONA-DEV-1:7077

WORKER: D:\spark> spark-class.cmd org.apache.spark.deploy.worker.Worker spark://adribona-dev-1:7077 13/12/05 08:18:46 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started 13/12/05 08:18:46 INFO worker.Worker: Starting Spark worker ADRIBONA-DEV-1:13521 with 8 cores, 30.9 GB RAM 13/12/05 08:18:46 INFO worker.Worker: Spark home: D:\spark 13/12/05 08:18:46 INFO server.Server: jetty-7.x.y-SNAPSHOT 13/12/05 08:18:46 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null} 13/12/05 08:18:46 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null} 13/12/05 08:18:46 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/log,null} 13/12/05 08:18:46 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/logPage,null} 13/12/05 08:18:46 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/json,null} 13/12/05 08:18:46 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{*,null} 13/12/05 08:18:46 INFO server.AbstractConnector: Started
Re: Spark heap issues
Try allocating some more resources to your application. You seem to be using 512 MB for your worker node (you can verify that from the master UI). Try putting the following settings into your code and see if it helps:

    System.setProperty("spark.executor.memory", "15g") // Will allocate more memory
    System.setProperty("spark.akka.frameSize", "2000")
    System.setProperty("spark.akka.threads", "16")     // Dependent upon number of cores on your worker machine

On Fri, Dec 6, 2013 at 1:06 AM, learner1014 all learner1...@gmail.com wrote:

Hi, Trying to do a join operation on an RDD. My input is pipe-delimited data in 2 files: one file is 24MB and the other is 285MB. The setup being used is the single node (server) setup, with SPARK_MEM set to 512m.

Master

    /pkg/java/jdk1.7.0_11/bin/java -cp :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Dspark.boundedMemoryCache.memoryFraction=0.4 -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip localhost --port 7077 --webui-port 8080

Worker

    /pkg/java/jdk1.7.0_11/bin/java -cp :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Dspark.boundedMemoryCache.memoryFraction=0.4 -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://localhost:7077

App

    /pkg/java/jdk1.7.0_11/bin/java -cp :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar:/spark-0.8.0-incubating-bin-cdh4/core/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/repl/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/mllib/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/bagel/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/streaming/target/scala-2.9.3/test-classes -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Dspark.boundedMemoryCache.memoryFraction=0.4 -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC -Xms512M -Xmx512M org.apache.spark.executor.StandaloneExecutorBackend akka://spark@localhost:33024/user/StandaloneScheduler 1 localhost 4

Here is the code:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.storage.StorageLevel

    object SimpleApp {
      def main(args: Array[String]) {
        System.setProperty("spark.local.dir", "/spark-0.8.0-incubating-bin-cdh4/tmp")
        System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        System.setProperty("spark.akka.timeout", "30") // in seconds

        val dataFile2 = "/tmp_data/data1.txt"
        val dataFile1 = "/tmp_data/data2.txt"
        val sc = new SparkContext("spark://localhost:7077", "Simple App", "/spark-0.8.0-incubating-bin-cdh4",
          List("target/scala-2.9.3/simple-project_2.9.3-1.0.jar"))

        val data10 = sc.textFile(dataFile1, 128)
        val data11 = data10.map(x => x.split("|"))
        val data12 = data11.map(x => (x(1).toInt -> x))

        val data20 = sc.textFile(dataFile2, 128)
        val data21 = data20.map(x => x.split("|"))
        val data22 = data21.map(x => (x(1).toInt -> x))

        val data3 = data12.join(data22, 128)
        val data4 = data3.distinct(4)

        val numAs = data10.count()
        val numBs = data20.count()
        val numCs = data3.count()
        val numDs = data4.count()
        println("Lines in 1: %s, Lines in 2: %s Lines in 3: %s Lines in 4: %s".format(numAs, numBs, numCs, numDs))
        data4.foreach(println)
      }
    }

I see the following errors:

    13/12/04 10:53:55 WARN storage.BlockManagerMaster: Error sending message to BlockManagerMaster in 1 attempts
    java.util.concurrent.TimeoutException: Futures timed out after [1] milliseconds
        at akka.dispatch.DefaultPromise.ready(Future.scala:870)
        at akka.dispatch.DefaultPromise.result(Future.scala:874)
        at akka.dispatch.Await$.result(Future.scala:74)

and

    13/12/04 10:53:55 ERROR executor.Executor: Exception in task ID 517
    java.lang.OutOfMemoryError: Java heap space
        at com.esotericsoftware.kryo.io.Input.readString(Input.java:448)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$StringArraySerializer.read(DefaultArraySerializers.java:282)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$StringArraySerializer.read(DefaultArraySerializers.java:262)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
        at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
        at
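One detail worth double-checking in the code above: String.split takes a regular expression, and an unescaped "|" is regex alternation, which matches the empty string and therefore splits between every character rather than on the pipe. A quick sketch of the difference:

    // "|" is regex alternation and matches the empty string, so the
    // record is split into individual characters rather than fields.
    val wrong = "24|foo|bar".split("|")   // roughly one element per character
    val right = "24|foo|bar".split("\\|") // Array("24", "foo", "bar")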