Re: executor failures w/ scala 2.10
Those log messages are new in Akka 2.2 and are usually seen when a node is disassociated from another, either by a network failure or even a clean shutdown. This suggests some network issue to me; are you running on EC2? It might be a temporary thing in that case. I'd like to have more details on the long jobs though: how long?

On Wed, Oct 30, 2013 at 1:29 AM, Imran Rashid wrote:
> We've been testing out the 2.10 branch of spark, and we're running into
> some issues where akka disconnects from the executors after a while. We ran
> some simple tests first, and all was well, so we started upgrading our
> whole codebase to 2.10. Everything seemed to be working, but then we
> noticed that when we run long jobs, things start failing.
>
> The first suspicious thing is that we get akka warnings about
> undeliverable messages sent to deadLetters:
>
> 2013-10-29 11:03:54,577 [spark-akka.actor.default-dispatcher-17] INFO
> akka.actor.LocalActorRef - Message
> [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
> Actor[akka://spark/deadLetters] to
> Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%4010.10.5.81%3A46572-3#656094700]
> was not delivered. [4] dead letters encountered. This logging can be turned
> off or adjusted with configuration settings 'akka.log-dead-letters' and
> 'akka.log-dead-letters-during-shutdown'.
>
> 2013-10-29 11:03:54,579 [spark-akka.actor.default-dispatcher-19] INFO
> akka.actor.LocalActorRef - Message
> [akka.remote.transport.AssociationHandle$Disassociated] from
> Actor[akka://spark/deadLetters] to
> Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%4010.10.5.81%3A46572-3#656094700]
> was not delivered. [5] dead letters encountered. This logging can be turned
> off or adjusted with configuration settings 'akka.log-dead-letters' and
> 'akka.log-dead-letters-during-shutdown'.
>
> Generally within a few seconds after the first such message, there are a
> bunch more, and then the executor is marked as failed, and a new one is
> started:
>
> 2013-10-29 11:03:58,775 [spark-akka.actor.default-dispatcher-3] INFO
> akka.actor.LocalActorRef - Message
> [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
> Actor[akka://spark/deadLetters] to
> Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkExecutor%40dhd2.quantifind.com%3A45794-6#-890135716]
> was not delivered. [10] dead letters encountered, no more dead letters
> will be logged. This logging can be turned off or adjusted with
> configuration settings 'akka.log-dead-letters' and
> 'akka.log-dead-letters-during-shutdown'.
>
> 2013-10-29 11:03:58,778 [spark-akka.actor.default-dispatcher-17] INFO
> org.apache.spark.deploy.client.Client$ClientActor - Executor updated:
> app-2013102911-/1 is now FAILED (Command exited with code 1)
>
> 2013-10-29 11:03:58,784 [spark-akka.actor.default-dispatcher-17] INFO
> org.apache.spark.deploy.client.Client$ClientActor - Executor added:
> app-2013102911-/2 on
> worker-20131029105824-dhd2.quantifind.com-51544 (dhd2.quantifind.com:51544)
> with 24 cores
>
> 2013-10-29 11:03:58,784 [spark-akka.actor.default-dispatcher-18] ERROR
> akka.remote.EndpointWriter - AssociationError
> [akka.tcp://sp...@ddd0.quantifind.com:43068] ->
> [akka.tcp://sparkexecu...@dhd2.quantifind.com:45794]: Error [Association
> failed with [akka.tcp://sparkexecu...@dhd2.quantifind.com:45794]] [
> akka.remote.EndpointAssociationException: Association failed with
> [akka.tcp://sparkexecu...@dhd2.quantifind.com:45794]
> Caused by:
> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
> Connection refused: dhd2.quantifind.com/10.10.5.64:45794]
>
> Looking in the logs of the failed executor, there are some similar
> messages about undeliverable messages, but I don't see any reason:
>
> 13/10/29 11:03:52 INFO executor.Executor: Finished task ID 943
>
> 13/10/29 11:03:53 INFO actor.LocalActorRef: Message [akka.actor.FSM$Timer]
> from Actor[akka://sparkExecutor/deadLetters] to
> Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%40ddd0.quantifind.com%3A43068-1#772172548]
> was not delivered. [1] dead letters encountered. This logging can be
> turned off or adjusted with configuration settings 'akka.log-dead-letters'
> and 'akka.log-dead-letters-during-shutdown'.
>
> 13/10/29 11:03:53 INFO actor.LocalActorRef: Message
> [akka.remote.transport.AssociationHandle$Disassociated] from
> Actor[akka://sparkExecutor/deadLetters] to
> Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%40ddd0.quantifind.com%3A43068-1#772172548]
> was not delivered. [2] dead letters encountered. This logging can be
> turned off or adjusted with configuration settings 'akka.log-dead-letters'
> and 'akka.log-dead-letters-during-shutdown'.
>
RE: spark-0.8.0 and hadoop-2.1.0-beta
I am also working on porting the trunk code to 2.2.0. There seem to be quite a few API changes, but many of them are just renames. YARN 2.1.0-beta also adds some client APIs for easier interaction with the YARN framework, but there are not many examples of how to use them (the API and wiki docs are both old and do not reflect the new API), so some parts of the Spark YARN code will need to be rewritten against the new client API. And I am not very familiar with the user authentication part of the code; it may take some time, since that part also seems to have changed a little: some methods are gone, and I cannot find their replacements, or perhaps they are no longer needed.

Best Regards,
Raymond Liu

From: Matei Zaharia [mailto:matei.zaha...@gmail.com]
Sent: Wednesday, October 30, 2013 2:35 AM
To: user@spark.incubator.apache.org
Subject: Re: spark-0.8.0 and hadoop-2.1.0-beta

I'm curious, Viren, do you have a patch you could post to build this against YARN 2.1 / 2.2? It would be nice to see how big the changes are.

Matei

On Sep 30, 2013, at 10:14 AM, viren kumar wrote:

I was able to get Spark 0.8.0 to compile with Hadoop/Yarn 2.1.0-beta, by following some of the changes described here: http://hortonworks.com/blog/stabilizing-yarn-apis-for-apache-hadoop-2-beta-and-beyond/

That should help you build most of it. One change not covered there is the change from ProtoUtils.convertFromProtoFormat(containerToken, cmAddress) to ConverterUtils.convertFromYarn(containerToken, cmAddress).

Not 100% sure that my changes are correct.

Hope that helps,
Viren

On Sun, Sep 29, 2013 at 8:59 AM, Matei Zaharia wrote:

Hi Terence,

YARN's API changed in an incompatible way in Hadoop 2.1.0, so I'd suggest sticking with 2.0.x for now. We may create a different branch for this version. Unfortunately, due to the API change it may not be possible to support this version while also supporting other widely-used versions like 0.23.x.
Matei On Sep 29, 2013, at 11:00 AM, Terance Dias wrote: > > Hi, I'm trying to build spark-0.8.0 with hadoop-2.1.0-beta. > I have changed the following properties in SparkBuild.scala file. > > val DEFAULT_HADOOP_VERSION = "2.1.0-beta" > val DEFAULT_YARN = true > > when i do sbt clean compile, I get an error saying > > [error] > /usr/local/spark-0.8.0-incubating/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala:42: > not found: type AMRMProtocol > [error] private var resourceManager: AMRMProtocol = null > > Thanks, > Terance. >
Re: Spark cluster memory configuration for spark-shell
You are correct. If you are just using spark-shell in local mode (i.e., without cluster), you can set the SPARK_MEM environment variable to give the driver more memory. E.g.: SPARK_MEM=24g ./spark-shell Otherwise, if you're using a real cluster, the driver shouldn't require a significant amount of memory, so SPARK_MEM should not have to be used. On Tue, Oct 29, 2013 at 12:40 PM, Soumya Simanta wrote: > I'm new to Spark. I want to try out a few simple example from the Spark > shell. However, I'm not sure how to configure it so that I can make the > max. use of memory on my workers. > > On average I've around 48 GB of RAM on each node on my cluster. I've > around 10 nodes. > > Based on the documentation I could find memory based configuration in two > places. > > *1. $SPARK_INSTALL_DIR/dist/conf/spark-env.sh * > > *SPARK_WORKER_MEMORY* Total amount of memory to allow Spark applications > to use on the machine, e.g. 1000m, 2g (default: total memory minus 1 GB); > note that each application's *individual* memory is configured using its > spark.executor.memory property. > *2. spark.executor.memory JVM flag. * > spark.executor.memory512m Amount of memory to use per executor process, > in the same format as JVM memory strings (e.g. 512m, 2g). > > http://spark.incubator.apache.org/docs/latest/configuration.html#system-properties > > In my case I want to use the max. memory possible on each node. My > understanding is that I don't have to change *SPARK_WORKER_MEMORY *and I > will have to increase spark.executor.memory to something big (e.g., 24g or > 32g). Is this correct? If yes, what is the correct way of setting this > property if I just want to use the spark-shell. > > > Thanks. > -Soumya > >
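Since the thread is about where these settings live, here is a small illustrative sketch (not from the thread): in the 0.8-era configuration model, spark.executor.memory is a plain Java system property that has to be set before the SparkContext is created. The helper object and the "24g" value below are my own illustration, not code from Spark.

```scala
// Sketch (assumption: 0.8-era system-property configuration).
// spark.executor.memory must be set *before* the SparkContext is
// constructed for it to take effect; the helper name and the value
// passed in are illustrative only.
object ExecutorMemory {
  def set(mem: String): String = {
    System.setProperty("spark.executor.memory", mem)
    // ... a SparkContext created after this point would read the property ...
    System.getProperty("spark.executor.memory")
  }
}
```

For spark-shell itself, though, SPARK_MEM=24g ./spark-shell as shown above remains the simpler route.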
Re: Getting exception org.apache.spark.SparkException: Job aborted: Task 1.0:37 failed more than 4 times
You may check 'out' files in logs directory for the failure details. On Wed, Oct 30, 2013 at 12:17 AM, Soumya Simanta wrote: > I'm using a pretty recent version of Spark (> 0.8) from Github and it's > failing with the following exception for a very simple task on the > spark-shell. > > > *scala> val file = sc.textFile("hdfs://...")* > file: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at > :12 > > *scala> val errors = file.filter(line => line.contains("sometext"))* > errors: org.apache.spark.rdd.RDD[String] = FilteredRDD[2] at filter at > :14 > > *scala> errors.count()* > org.apache.spark.SparkException: Job aborted: Task 0.0:32 failed more than > 4 times > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:819) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:817) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:817) > at > org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:432) > at org.apache.spark.scheduler.DAGScheduler.org > $apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:494) > at > org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:158) >
Spark Checkpointing Bug
I have disabled checkpointing in my Spark Streaming application with:

ssc.checkpoint(null)

This application does not need checkpointing, and there is no operation that I'm doing (only filter operations, and, importantly, NO WINDOWING) that should need checkpointing. And yet I am getting this:

2013-10-29 14:47:22 ERROR LocalScheduler - Exception in task 3
java.lang.Exception: Could not compute split, block input-0-1383076026200 not found
        at spark.rdd.BlockRDD.compute(BlockRDD.scala:32)
        at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
        at spark.RDD.iterator(RDD.scala:196)
        at spark.rdd.MappedRDD.compute(MappedRDD.scala:12)
        at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
        at spark.RDD.iterator(RDD.scala:196)
        at spark.rdd.FilteredRDD.compute(FilteredRDD.scala:15)
        at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
        at spark.RDD.iterator(RDD.scala:196)
        at spark.rdd.MappedRDD.compute(MappedRDD.scala:12)
        at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
        at spark.RDD.iterator(RDD.scala:196)
        at spark.rdd.UnionPartition.iterator(UnionRDD.scala:12)
        at spark.rdd.UnionRDD.compute(UnionRDD.scala:52)
        at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
        at spark.RDD.iterator(RDD.scala:196)
        at spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:19)
        at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
        at spark.RDD.iterator(RDD.scala:196)
        at spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:127)
        at spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:75)
        at spark.scheduler.local.LocalScheduler.runTask$1(LocalScheduler.scala:76)
        at spark.scheduler.local.LocalScheduler$$anon$1.run(LocalScheduler.scala:49)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)

Why might I be getting this exception?

Thanks,
Craig Vanderborgh
Re: met a problem while running a streaming example program
yes, it works after checkout branch-0.8. thanks. On Tue, Oct 29, 2013 at 12:51 PM, Patrick Wendell wrote: > If you just add the "extends Serializable" changes from here it should > work. > > On Tue, Oct 29, 2013 at 9:36 AM, Patrick Wendell > wrote: > > This was fixed on 0.8 branch and master: > > https://github.com/apache/incubator-spark/pull/63/files > > > > - Patrick > > > > On Tue, Oct 29, 2013 at 9:17 AM, Thunder Stumpges > > wrote: > >> I vaguely remember running into this same error. It says there > >> "java.io.NotSerializableException: > >> org.apache.spark.streaming.examples.clickstream.PageView"... can you > >> check the PageView class in the examples and make sure it has the > >> @serializable directive? I seem to remember having to add it. > >> > >> good luck, > >> Thunder > >> > >> > >> On Tue, Oct 29, 2013 at 6:54 AM, dachuan wrote: > >>> Hi, > >>> > >>> I have tried the clickstream example, it runs into an exception, > anybody met > >>> this before? > >>> > >>> Since the program mentioned "local[2]", so I run it in my local > machine. > >>> > >>> thanks in advance, > >>> dachuan. 
> >>> > >>> Log Snippet 1: > >>> > >>> 13/10/29 08:50:25 INFO scheduler.DAGScheduler: Submitting 46 missing > tasks > >>> from Stage 12 (MapPartitionsRDD[63] at combineByKey at > >>> ShuffledDStream.scala:41) > >>> 13/10/29 08:50:25 INFO local.LocalTaskSetManager: Size of task 75 is > 4230 > >>> bytes > >>> 13/10/29 08:50:25 INFO local.LocalScheduler: Running 75 > >>> 13/10/29 08:50:25 INFO spark.CacheManager: Cache key is rdd_9_0 > >>> 13/10/29 08:50:25 INFO spark.CacheManager: Computing partition > >>> org.apache.spark.rdd.BlockRDDPartition@0 > >>> 13/10/29 08:50:25 WARN storage.BlockManager: Putting block rdd_9_0 > failed > >>> 13/10/29 08:50:25 INFO local.LocalTaskSetManager: Loss was due to > >>> java.io.NotSerializableException > >>> java.io.NotSerializableException: > >>> org.apache.spark.streaming.examples.clickstream.PageView > >>> > >>> Log Snippet 2: > >>> org.apache.spark.SparkException: Job failed: Task 12.0:0 failed more > than 4 > >>> times; aborting job java.io.NotSerializableException: > >>> org.apache.spark.streaming.examples.clickstream.PageView > >>> at > >>> > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760) > >>> at > >>> > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758) > >>> at > >>> > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60) > >>> at > >>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > >>> at > >>> > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758) > >>> at > >>> > org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:379) > >>> at > >>> org.apache.spark.scheduler.DAGScheduler.org > $apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441) > >>> at > >>> > org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149) > >>> > >>> Two commands that run this app: > >>> ./run-example > >>> 
org.apache.spark.streaming.exampl.clickstream.PageViewGenerator 4 > 10 > >>> ./run-example > org.apache.spark.streaming.examples.clickstream.PageViewStream > >>> errorRatePerZipCode localhost 4 > >>> > -- Dachuan Huang Cellphone: 614-390-7234 2015 Neil Avenue Ohio State University Columbus, Ohio U.S.A. 43210
Spark v0.7.3: spark.SparkException Bug
This is a bug report on an internal Spark problem. This exception results in the death of the task in which it arises, but somehow Spark seems to recover and soldier on. Here is the exception, which is completely outside of any user-level code:

java.lang.NullPointerException
2013-10-29 14:38:20 ERROR JobManager - Running streaming job 2425 @ 138307910 ms failed
spark.SparkException: Job failed: ShuffleMapTask(3846, 1) failed: ExceptionFailure(java.lang.NullPointerException,java.lang.NullPointerException,[Ljava.lang.StackTraceElement;@4d5c3086)
        at spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:642)
        at spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:640)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:640)
        at spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:601)
        at spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:300)
        at spark.scheduler.DAGScheduler.spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:364)
        at spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:107)
2013-10-29 14:38:20 ERROR LocalScheduler - Exception in task 1

Thanks,
Craig Vanderborgh
Getting exception org.apache.spark.SparkException: Job aborted: Task 1.0:37 failed more than 4 times
I'm using a pretty recent version of Spark (> 0.8) from Github and it's failing with the following exception for a very simple task on the spark-shell.

scala> val file = sc.textFile("hdfs://...")
file: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at :12

scala> val errors = file.filter(line => line.contains("sometext"))
errors: org.apache.spark.rdd.RDD[String] = FilteredRDD[2] at filter at :14

scala> errors.count()
org.apache.spark.SparkException: Job aborted: Task 0.0:32 failed more than 4 times
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:819)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:817)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:817)
        at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:432)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:494)
        at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:158)
executor failures w/ scala 2.10
We've been testing out the 2.10 branch of spark, and we're running into some issues where akka disconnects from the executors after a while. We ran some simple tests first, and all was well, so we started upgrading our whole codebase to 2.10. Everything seemed to be working, but then we noticed that when we run long jobs, things start failing.

The first suspicious thing is that we get akka warnings about undeliverable messages sent to deadLetters:

2013-10-29 11:03:54,577 [spark-akka.actor.default-dispatcher-17] INFO akka.actor.LocalActorRef - Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://spark/deadLetters] to Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%4010.10.5.81%3A46572-3#656094700] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

2013-10-29 11:03:54,579 [spark-akka.actor.default-dispatcher-19] INFO akka.actor.LocalActorRef - Message [akka.remote.transport.AssociationHandle$Disassociated] from Actor[akka://spark/deadLetters] to Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%4010.10.5.81%3A46572-3#656094700] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
Generally within a few seconds after the first such message, there are a bunch more, and then the executor is marked as failed, and a new one is started:

2013-10-29 11:03:58,775 [spark-akka.actor.default-dispatcher-3] INFO akka.actor.LocalActorRef - Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://spark/deadLetters] to Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkExecutor%40dhd2.quantifind.com%3A45794-6#-890135716] was not delivered. [10] dead letters encountered, no more dead letters will be logged. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

2013-10-29 11:03:58,778 [spark-akka.actor.default-dispatcher-17] INFO org.apache.spark.deploy.client.Client$ClientActor - Executor updated: app-2013102911-/1 is now FAILED (Command exited with code 1)

2013-10-29 11:03:58,784 [spark-akka.actor.default-dispatcher-17] INFO org.apache.spark.deploy.client.Client$ClientActor - Executor added: app-2013102911-/2 on worker-20131029105824-dhd2.quantifind.com-51544 (dhd2.quantifind.com:51544) with 24 cores

2013-10-29 11:03:58,784 [spark-akka.actor.default-dispatcher-18] ERROR akka.remote.EndpointWriter - AssociationError [akka.tcp://sp...@ddd0.quantifind.com:43068] -> [akka.tcp://sparkexecu...@dhd2.quantifind.com:45794]: Error [Association failed with [akka.tcp://sparkexecu...@dhd2.quantifind.com:45794]] [akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkexecu...@dhd2.quantifind.com:45794]
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: dhd2.quantifind.com/10.10.5.64:45794]

Looking in the logs of the failed executor, there are some similar messages about undeliverable messages, but I don't see any reason:

13/10/29 11:03:52 INFO executor.Executor: Finished task ID 943

13/10/29 11:03:53 INFO actor.LocalActorRef: Message [akka.actor.FSM$Timer] from Actor[akka://sparkExecutor/deadLetters] to Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%40ddd0.quantifind.com%3A43068-1#772172548] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

13/10/29 11:03:53 INFO actor.LocalActorRef: Message [akka.remote.transport.AssociationHandle$Disassociated] from Actor[akka://sparkExecutor/deadLetters] to Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%40ddd0.quantifind.com%3A43068-1#772172548] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

13/10/29 11:03:53 INFO actor.LocalActorRef: Message [akka.remote.transport.AssociationHandle$Disassociated] from Actor[akka://sparkExecutor/deadLetters] to Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%40ddd0.quantifind.com%3A43068-1#772172548] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

13/10/29 11:03:53 ERROR executor.StandaloneExecutorBackend: D
Spark cluster memory configuration for spark-shell
I'm new to Spark. I want to try out a few simple examples from the Spark shell. However, I'm not sure how to configure it so that I can make the maximum use of memory on my workers.

On average I've around 48 GB of RAM on each node on my cluster. I've around 10 nodes.

Based on the documentation I could find memory-related configuration in two places:

1. $SPARK_INSTALL_DIR/dist/conf/spark-env.sh

SPARK_WORKER_MEMORY: Total amount of memory to allow Spark applications to use on the machine, e.g. 1000m, 2g (default: total memory minus 1 GB); note that each application's individual memory is configured using its spark.executor.memory property.

2. The spark.executor.memory system property (default 512m): Amount of memory to use per executor process, in the same format as JVM memory strings (e.g. 512m, 2g).

http://spark.incubator.apache.org/docs/latest/configuration.html#system-properties

In my case I want to use the maximum memory possible on each node. My understanding is that I don't have to change SPARK_WORKER_MEMORY and I will have to increase spark.executor.memory to something big (e.g., 24g or 32g). Is this correct? If yes, what is the correct way of setting this property if I just want to use the spark-shell?

Thanks.
-Soumya
Re: spark-0.8.0 and hadoop-2.1.0-beta
I’m curious, Viren, do you have a patch you could post to build this against YARN 2.1 / 2.2? It would be nice to see how big the changes are. Matei On Sep 30, 2013, at 10:14 AM, viren kumar wrote: > I was able to get Spark 0.8.0 to compile with Hadoop/Yarn 2.1.0-beta, by > following some of the changes described here: > http://hortonworks.com/blog/stabilizing-yarn-apis-for-apache-hadoop-2-beta-and-beyond/ > > That should help you build most of it. One change not covered there is the > change from ProtoUtils.convertFromProtoFormat(containerToken, cmAddress) to > ConverterUtils.convertFromYarn(containerToken, cmAddress). > > Not 100% sure that my changes are correct. > > Hope that helps, > Viren > > > On Sun, Sep 29, 2013 at 8:59 AM, Matei Zaharia > wrote: > Hi Terence, > > YARN's API changed in an incompatible way in Hadoop 2.1.0, so I'd suggest > sticking with 2.0.x for now. We may create a different branch for this > version. Unfortunately due to the API change it may not be possible to > support this version while also supporting other widely-used versions like > 0.23.x. > > Matei > > On Sep 29, 2013, at 11:00 AM, Terance Dias wrote: > > > > > Hi, I'm trying to build spark-0.8.0 with hadoop-2.1.0-beta. > > I have changed the following properties in SparkBuild.scala file. > > > > val DEFAULT_HADOOP_VERSION = "2.1.0-beta" > > val DEFAULT_YARN = true > > > > when i do sbt clean compile, I get an error saying > > > > [error] > > /usr/local/spark-0.8.0-incubating/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala:42: > > not found: type AMRMProtocol > > [error] private var resourceManager: AMRMProtocol = null > > > > Thanks, > > Terance. > > > >
Re: met a problem while running a streaming example program
If you just add the "extends Serializable" changes from here it should work. On Tue, Oct 29, 2013 at 9:36 AM, Patrick Wendell wrote: > This was fixed on 0.8 branch and master: > https://github.com/apache/incubator-spark/pull/63/files > > - Patrick > > On Tue, Oct 29, 2013 at 9:17 AM, Thunder Stumpges > wrote: >> I vaguely remember running into this same error. It says there >> "java.io.NotSerializableException: >> org.apache.spark.streaming.examples.clickstream.PageView"... can you >> check the PageView class in the examples and make sure it has the >> @serializable directive? I seem to remember having to add it. >> >> good luck, >> Thunder >> >> >> On Tue, Oct 29, 2013 at 6:54 AM, dachuan wrote: >>> Hi, >>> >>> I have tried the clickstream example, it runs into an exception, anybody met >>> this before? >>> >>> Since the program mentioned "local[2]", so I run it in my local machine. >>> >>> thanks in advance, >>> dachuan. >>> >>> Log Snippet 1: >>> >>> 13/10/29 08:50:25 INFO scheduler.DAGScheduler: Submitting 46 missing tasks >>> from Stage 12 (MapPartitionsRDD[63] at combineByKey at >>> ShuffledDStream.scala:41) >>> 13/10/29 08:50:25 INFO local.LocalTaskSetManager: Size of task 75 is 4230 >>> bytes >>> 13/10/29 08:50:25 INFO local.LocalScheduler: Running 75 >>> 13/10/29 08:50:25 INFO spark.CacheManager: Cache key is rdd_9_0 >>> 13/10/29 08:50:25 INFO spark.CacheManager: Computing partition >>> org.apache.spark.rdd.BlockRDDPartition@0 >>> 13/10/29 08:50:25 WARN storage.BlockManager: Putting block rdd_9_0 failed >>> 13/10/29 08:50:25 INFO local.LocalTaskSetManager: Loss was due to >>> java.io.NotSerializableException >>> java.io.NotSerializableException: >>> org.apache.spark.streaming.examples.clickstream.PageView >>> >>> Log Snippet 2: >>> org.apache.spark.SparkException: Job failed: Task 12.0:0 failed more than 4 >>> times; aborting job java.io.NotSerializableException: >>> org.apache.spark.streaming.examples.clickstream.PageView >>> at >>> 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760) >>> at >>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758) >>> at >>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60) >>> at >>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) >>> at >>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758) >>> at >>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:379) >>> at >>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441) >>> at >>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149) >>> >>> Two commands that run this app: >>> ./run-example >>> org.apache.spark.streaming.exampl.clickstream.PageViewGenerator 4 10 >>> ./run-example org.apache.spark.streaming.examples.clickstream.PageViewStream >>> errorRatePerZipCode localhost 4 >>>
Re: met a problem while running a streaming example program
This was fixed on 0.8 branch and master: https://github.com/apache/incubator-spark/pull/63/files - Patrick On Tue, Oct 29, 2013 at 9:17 AM, Thunder Stumpges wrote: > I vaguely remember running into this same error. It says there > "java.io.NotSerializableException: > org.apache.spark.streaming.examples.clickstream.PageView"... can you > check the PageView class in the examples and make sure it has the > @serializable directive? I seem to remember having to add it. > > good luck, > Thunder > > > On Tue, Oct 29, 2013 at 6:54 AM, dachuan wrote: >> Hi, >> >> I have tried the clickstream example, it runs into an exception, anybody met >> this before? >> >> Since the program mentioned "local[2]", so I run it in my local machine. >> >> thanks in advance, >> dachuan. >> >> Log Snippet 1: >> >> 13/10/29 08:50:25 INFO scheduler.DAGScheduler: Submitting 46 missing tasks >> from Stage 12 (MapPartitionsRDD[63] at combineByKey at >> ShuffledDStream.scala:41) >> 13/10/29 08:50:25 INFO local.LocalTaskSetManager: Size of task 75 is 4230 >> bytes >> 13/10/29 08:50:25 INFO local.LocalScheduler: Running 75 >> 13/10/29 08:50:25 INFO spark.CacheManager: Cache key is rdd_9_0 >> 13/10/29 08:50:25 INFO spark.CacheManager: Computing partition >> org.apache.spark.rdd.BlockRDDPartition@0 >> 13/10/29 08:50:25 WARN storage.BlockManager: Putting block rdd_9_0 failed >> 13/10/29 08:50:25 INFO local.LocalTaskSetManager: Loss was due to >> java.io.NotSerializableException >> java.io.NotSerializableException: >> org.apache.spark.streaming.examples.clickstream.PageView >> >> Log Snippet 2: >> org.apache.spark.SparkException: Job failed: Task 12.0:0 failed more than 4 >> times; aborting job java.io.NotSerializableException: >> org.apache.spark.streaming.examples.clickstream.PageView >> at >> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760) >> at >> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758) >> at >> 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60) >> at >> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) >> at >> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758) >> at >> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:379) >> at >> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441) >> at >> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149) >> >> Two commands that run this app: >> ./run-example >> org.apache.spark.streaming.exampl.clickstream.PageViewGenerator 4 10 >> ./run-example org.apache.spark.streaming.examples.clickstream.PageViewStream >> errorRatePerZipCode localhost 4 >>
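For reference, the fix Thunder suggests amounts to marking the class serializable. A minimal sketch — the real `PageView` lives in `org.apache.spark.streaming.examples.clickstream`, and the fields below are assumptions, not the actual example code:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Sketch only: fields are illustrative. In Scala 2.9-era code this could
// also be written as `@serializable class PageView ...`; extending
// java.io.Serializable is the equivalent that works in every Scala version.
class PageView(val url: String, val status: Int) extends Serializable

// Quick round-trip check that instances now serialize without a
// NotSerializableException:
val out = new ByteArrayOutputStream()
new ObjectOutputStream(out).writeObject(new PageView("/index.html", 200))
```

Without `Serializable` (or the annotation), shipping a `PageView` through a shuffle or closure throws exactly the `java.io.NotSerializableException` seen in the logs above.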
Re: met a problem while running a streaming example program
I vaguely remember running into this same error. It says "java.io.NotSerializableException: org.apache.spark.streaming.examples.clickstream.PageView"... Can you check the PageView class in the examples and make sure it has the @serializable directive? I seem to remember having to add it. Good luck, Thunder On Tue, Oct 29, 2013 at 6:54 AM, dachuan wrote: > Hi, > > I have tried the clickstream example, it runs into an exception, anybody met > this before? > > Since the program mentioned "local[2]", so I run it in my local machine. > > thanks in advance, > dachuan. > > Log Snippet 1: > > 13/10/29 08:50:25 INFO scheduler.DAGScheduler: Submitting 46 missing tasks > from Stage 12 (MapPartitionsRDD[63] at combineByKey at > ShuffledDStream.scala:41) > 13/10/29 08:50:25 INFO local.LocalTaskSetManager: Size of task 75 is 4230 > bytes > 13/10/29 08:50:25 INFO local.LocalScheduler: Running 75 > 13/10/29 08:50:25 INFO spark.CacheManager: Cache key is rdd_9_0 > 13/10/29 08:50:25 INFO spark.CacheManager: Computing partition > org.apache.spark.rdd.BlockRDDPartition@0 > 13/10/29 08:50:25 WARN storage.BlockManager: Putting block rdd_9_0 failed > 13/10/29 08:50:25 INFO local.LocalTaskSetManager: Loss was due to > java.io.NotSerializableException > java.io.NotSerializableException: > org.apache.spark.streaming.examples.clickstream.PageView > > Log Snippet 2: > org.apache.spark.SparkException: Job failed: Task 12.0:0 failed more than 4 > times; aborting job java.io.NotSerializableException: > org.apache.spark.streaming.examples.clickstream.PageView > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60) > at > scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758) 
> at > org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:379) > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441) > at > org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149) > > Two commands that run this app: > ./run-example > org.apache.spark.streaming.exampl.clickstream.PageViewGenerator 4 10 > ./run-example org.apache.spark.streaming.examples.clickstream.PageViewStream > errorRatePerZipCode localhost 4 >
Re: oome from blockmanager
Great! Glad to hear it worked out. Spark definitely has a pain point around choosing the right number of partitions, and I think we're going to spend a lot of time trying to reduce that issue. I'm currently working on the patch to reduce the shuffle file block overheads, but in the meantime you can set "spark.shuffle.consolidateFiles=false" to exchange OOMEs due to too many partitions for worse performance (probably an acceptable tradeoff). On Mon, Oct 28, 2013 at 2:31 PM, Stephen Haberman < stephen.haber...@gmail.com> wrote: > Hey guys, > > As a follow up, I raised our target partition size to 600mb (up from > 64mb), which split this report's 500gb of tiny S3 files into ~700 > partitions, and everything ran much smoother. > > In retrospect, this was the same issue we'd ran into before, having too > many partitions, and had previously solved by throwing some guesses at > coalesce to make it magically go away. > > But now I feel like we have a much better understanding of why the > numbers need to be what they are, which is great. > > So, thanks for all the input and helping me understand what's going on. > > It'd be great to see some of the optimizations to BlockManager happen, > but I understand in the end why it needs to track what it does. And I > was also admittedly using a small cluster anyway. > > - Stephen > >
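The repartitioning Stephen describes can be expressed with `coalesce`. A sketch under stated assumptions — the master URL and S3 path are placeholders, and the partition count just follows the arithmetic in his mail (~500 GB at a ~600 MB target partition size ≈ 700 partitions):

```scala
import org.apache.spark.SparkContext

// Hypothetical job setup; master URL and input path are placeholders.
val sc = new SparkContext("spark://master:7077", "report")
val raw = sc.textFile("s3n://bucket/report-input/*")

// ~500 GB of tiny S3 files at a ~600 MB target partition size -> ~700
// partitions. coalesce merges many small input splits into fewer
// partitions without a full shuffle.
val partitioned = raw.coalesce(700)
```

Too many partitions inflates the number of shuffle blocks the BlockManager must track (the OOM discussed earlier in this thread); too few starves parallelism, so the target partition size is the knob to tune.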
Re: compare/contrast Spark with Cascading
Hey Prashant, I assume you mean steps to reproduce the OOM. I do not have them currently; I just ran into the OOMs when porting some jobs from map-red. I never turned it into a reproducible test, and I do not exclude the possibility that it was my own poor programming that caused it. However, it happened with a bunch of jobs, and then I asked on the message boards about the OOM, and people pointed me to the assumption about reducer input having to fit in memory. At that point I felt that was too much of a limitation for the jobs I was trying to port, and I gave up. On Tue, Oct 29, 2013 at 1:12 AM, Prashant Sharma wrote: > Hey Koert, > > Can you give me steps to reproduce this ? > > > On Tue, Oct 29, 2013 at 10:06 AM, Koert Kuipers wrote: > >> Matei, >> We have some jobs where even the input for a single key in a groupBy >> would not fit in the task's memory. We rely on mapred to stream from >> disk to disk as it reduces. >> I think Spark should be able to handle that situation to truly be able to >> claim it can replace map-red (or not?). >> Best, Koert >> >> >> On Mon, Oct 28, 2013 at 8:51 PM, Matei Zaharia >> wrote: >> >>> FWIW, the only thing that Spark expects to fit in memory if you use >>> DISK_ONLY caching is the input to each reduce task. Those currently don't >>> spill to disk. The solution if datasets are large is to add more reduce >>> tasks, whereas Hadoop would run along with a small number of tasks that do >>> lots of disk IO. But this is something we will likely change soon. Other >>> than that, everything runs in a streaming fashion and there's no need for >>> the data to fit in memory. Our goal is certainly to work on any size >>> datasets, and some of our current users are explicitly using Spark to >>> replace things like Hadoop Streaming in just batch jobs (see e.g. Yahoo!'s >>> presentation from http://ampcamp.berkeley.edu/3/). If you run into >>> trouble with these, let us know, since it is an explicit goal of the >>> project to support it. 
>>> >>> Matei >>> >>> On Oct 28, 2013, at 5:32 PM, Koert Kuipers wrote: >>> >>> No problem :) I am actually not familiar with what Oscar has said on >>> this. Can you share or point me to the conversation thread? >>> >>> It is my opinion based on the little experimenting I have done, but I am >>> willing to be convinced otherwise. >>> One of the very first things I did when we started using Spark was to run jobs >>> with DISK_ONLY and see if it could do some of the jobs that map-reduce does >>> for us. However, I ran into OOMs, presumably because Spark makes assumptions >>> that some things should fit in memory. I have to admit I didn't try too >>> hard after the first OOMs. >>> >>> If Spark were able to scale from the quick in-memory query to the >>> overnight disk-only giant batch query, I would love it! Spark has a much >>> nicer API than map-red, and one could use a single set of algos for >>> everything from quick/realtime queries to giant batch jobs. As far as I am >>> concerned, map-red would be done. Our clusters of the future would be HDFS + >>> Spark. >>> >>> >>> On Mon, Oct 28, 2013 at 8:16 PM, Mark Hamstra >>> wrote: >>> And I didn't mean to skip over you, Koert. I'm just more familiar with what Oscar said on the subject than with your opinion. On Mon, Oct 28, 2013 at 5:13 PM, Mark Hamstra wrote: > Hmmm... I was unaware of this concept that Spark is for medium to >> large datasets but not for very large datasets. > > > It is the opinion of some at Twitter. That doesn't make it true or > a universally held opinion. > > > > On Mon, Oct 28, 2013 at 5:08 PM, Ashish Rangole wrote: > >> Hmmm... I was unaware of this concept that Spark is for medium to >> large datasets but not for very large datasets. What size is very large? >> >> Can someone please elaborate on why this would be the case and what >> stops Spark, as it is today, from being successfully run on very large >> datasets? >> I'll appreciate it. 
>> >> I would think that Spark should be able to pull off Hadoop-level >> throughput in the worst case with DISK_ONLY caching. >> >> Thanks >> On Oct 28, 2013 1:37 PM, "Koert Kuipers" wrote: >> >>> I would say Scalding (Cascading + a DSL for Scala) offers similar >>> functionality to Spark, and a similar syntax. >>> The main difference between Spark and Scalding is their target jobs: >>> Scalding is for long-running jobs on very large data. The data is >>> read from and written to disk between steps. Jobs run from minutes to >>> days. >>> Spark is for faster jobs on medium to large data. The data is >>> primarily held in memory. Jobs run from a few seconds to a few hours. >>> Although Spark can work with data on disk, it still makes assumptions >>> that >>> data needs to fit in memory for certain steps (although less and less >>> with >>> every release). Spark also makes iterative designs mu
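The workaround Matei describes earlier in this thread — add more reduce tasks so each reduce input shrinks enough to fit in memory — maps directly to the partition-count argument on the shuffle operations. A sketch, with placeholder paths and an illustrative task count:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // pair-RDD functions (groupByKey etc.)

val sc = new SparkContext("local[2]", "shuffle-sizing")

// Placeholder input path; key out the first tab-separated field.
val pairs = sc.textFile("hdfs://namenode/path/input")
  .map(line => (line.split("\t")(0), line))

// Each reduce task currently holds its input in memory, so spread the same
// data over more, smaller reduce tasks (2000 is illustrative, not tuned):
val grouped = pairs.groupByKey(2000)
```

This helps when the data per *task* is too large, but not when a single key's values exceed memory, which is exactly the gap Koert points out above.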
met a problem while running a streaming example program
Hi, I have tried the clickstream example and it runs into an exception; has anybody met this before? Since the program mentions "local[2]", I ran it on my local machine. Thanks in advance, dachuan. Log Snippet 1: 13/10/29 08:50:25 INFO scheduler.DAGScheduler: Submitting 46 missing tasks from Stage 12 (MapPartitionsRDD[63] at combineByKey at ShuffledDStream.scala:41) 13/10/29 08:50:25 INFO local.LocalTaskSetManager: Size of task 75 is 4230 bytes 13/10/29 08:50:25 INFO local.LocalScheduler: Running 75 13/10/29 08:50:25 INFO spark.CacheManager: Cache key is rdd_9_0 13/10/29 08:50:25 INFO spark.CacheManager: Computing partition org.apache.spark.rdd.BlockRDDPartition@0 13/10/29 08:50:25 WARN storage.BlockManager: Putting block rdd_9_0 failed 13/10/29 08:50:25 INFO local.LocalTaskSetManager: Loss was due to java.io.NotSerializableException java.io.NotSerializableException: org.apache.spark.streaming.examples.clickstream.PageView Log Snippet 2: org.apache.spark.SparkException: Job failed: Task 12.0:0 failed more than 4 times; aborting job java.io.NotSerializableException: org.apache.spark.streaming.examples.clickstream.PageView at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758) at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:379) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441) at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149) Two commands that run this app: ./run-example org.apache.spark.streaming.exampl.clickstream.PageViewGenerator 4 10 ./run-example 
org.apache.spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 4
Re: Task output before a shuffle
On 29 Oct 2013, at 02:47, Matei Zaharia wrote: > Yes, we still write out data after these tasks in Spark 0.8, and it needs to > be written out before any stage that reads it can start. The main reason is > simplicity when there are faults, as well as more flexible scheduling (you > don't have to decide where each reduce task is in advance, you can have more > reduce tasks than you have CPU cores, etc). Thank you for the answer! I have a follow-up: in which fraction (RDD or non-RDD) of the heap will the output be stored before spilling to disk? I have a job where I read over a large data set once and don't persist anything. Would it make sense to set "spark.storage.memoryFraction" to a smaller value in order to avoid spilling to disk? - Ufuk
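For jobs that persist nothing, lowering the storage fraction as Ufuk suggests would look roughly like this in 0.8-era configuration (properties were set via system properties before the SparkContext was created; the value 0.1 is illustrative, not a recommendation):

```scala
import org.apache.spark.SparkContext

// Shrink the RDD-cache region of the heap, since this job persists no RDDs,
// leaving more memory for everything else. Must be set before the
// SparkContext is constructed.
System.setProperty("spark.storage.memoryFraction", "0.1")

val sc = new SparkContext("local[2]", "scan-once-job")
```

Whether this actually avoids spilling depends on where the pre-shuffle output lands, which is exactly the question asked above.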
Reading corrupted hadoop sequence files
Hi, I am trying to read some Hadoop sequence files. Some of the lines cannot be parsed and cause exceptions; the exception propagates, the job dies, and the files cannot be loaded. Is there a way to catch the exception per record and return null if the corresponding object could not be formed? I have seen a solution for the Hadoop map-reduce framework (http://stackoverflow.com/questions/14920236/how-to-prevent-hadoop-job-to-fail-on-corrupted-input-file). How can I do this in Spark? Thanks
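One common way to get the per-record behavior asked about here is to wrap the parse inside the transformation and drop the failures, rather than returning null. A sketch; the `Record` type, `parseRecord` parser, and input path are all hypothetical stand-ins for whatever the sequence file holds (note `scala.util.Try` needs Scala 2.10; on 2.9 a try/catch returning an `Option` does the same thing):

```scala
import scala.util.Try
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // Writable converters for sequenceFile

case class Record(id: Int, payload: String)

// Hypothetical parser: throws on malformed input, like the corrupt lines
// described above.
def parseRecord(raw: String): Record = {
  val parts = raw.split(",")
  Record(parts(0).toInt, parts(1))
}

val sc = new SparkContext("local[2]", "tolerant-read")
// Placeholder path and key/value types.
val lines = sc.sequenceFile[String, String]("hdfs://namenode/data/seq")

// Parse each value inside Try: corrupt records become None and are dropped
// by flatMap, so one bad record no longer aborts the whole job.
val parsed = lines.flatMap { case (_, value) => Try(parseRecord(value)).toOption }
```

Dropping failures via `Option` is usually preferable to returning null, since nulls tend to resurface as NullPointerExceptions in later stages.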