Re: Spark driver hangs on start of job
Ah I see, glad that simple patch works for your problem. That seems to be a different underlying problem than the one we have been experiencing. In our case, the executors are failing properly; it's just that none of the new ones ever escapes the same exact issue. So we start a death spiral of thousands of failed executors, all of which can't connect to the driver. Meanwhile, the driver just sits there in the "zombie" state doing nothing while it waits for executors to respond. In that light, my solution is geared towards resolving things gracefully on the driver side.

On Thu, Jul 2, 2015 at 4:37 AM, Sjoerd Mulder wrote:
> Hi Richard,
>
> I have actually applied the following fix to our 1.4.0 version and this
> seems to resolve the zombies :)
>
> https://github.com/apache/spark/pull/7077/files
>
> Sjoerd
Re: Spark driver hangs on start of job
Hi Richard,

I have actually applied the following fix to our 1.4.0 version and this seems to resolve the zombies :)

https://github.com/apache/spark/pull/7077/files

Sjoerd

2015-06-26 20:08 GMT+02:00 Richard Marscher:
Re: Spark driver hangs on start of job
Hi,

We are on 1.3.1 right now, so in case there are differences in the Spark files I'll walk through the logic of what we did and post a couple of gists at the end. We haven't committed to forking Spark for our own deployments yet, so right now we shadow some Spark classes in our application code with our versions of the classes. Keep in mind I am not a Spark committer, so the following is a best-effort approach that is working for us. It may be that someone more knowledgeable about the Spark codebase sees a pitfall in my solution or a better solution.

--

First, we'll start with the root issue in TaskSchedulerImpl. You will find the code that prints the "Initial job has not accepted any resources" warning inside the "submitTasks" function. Spark creates a separate thread that checks some conditions every "STARVATION_TIMEOUT" milliseconds until the submitted task set has been launched. It only posts the warn logging here and does nothing else. I will come back to this part of the code in a moment.

The code that determines when the "hasLaunchedTask" flag gets set (and thus closes out the starvation thread, meaning the task set is being worked on by the cluster) is within the "resourceOffers" function. The various Spark scheduler backend classes periodically call this function in TaskSchedulerImpl until cluster resources have been assigned to the task set.

To start signaling the zombied scenario, I created a new flag: "@volatile private var hasZombied = false". In our experience we always get the resources in resourceOffers before the starvation thread runs; otherwise we have always hit the zombie scenario when resources weren't allocated yet. So I added a conditional before the "if (tasks.size > 0) { hasLaunchedTask = true }" block. The conditional checks "if (!hasLaunchedTask && hasZombied) { dagScheduler.ourCustomFunction() }". I'll explain that DAGScheduler call in a moment.
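To make the moving parts above concrete, here is a self-contained sketch of the pattern, not the actual Spark source: the Timer, the MiniScheduler class, and the RuntimeException stand-in for the DAGScheduler call are all illustrative scaffolding, while the flag names mirror the ones discussed.

```scala
import java.util.{Timer, TimerTask}

// Illustrative sketch of the starvation-check + zombie-flag pattern.
// MiniScheduler is NOT TaskSchedulerImpl; only the shape of the change
// is meant to match the description above.
class MiniScheduler(starvationTimeoutMs: Long = 50) {
  @volatile private var hasLaunchedTask = false
  @volatile private var hasZombied = false
  private val timer = new Timer("task-starvation-check", true)

  def submitTasks(): Unit = {
    timer.scheduleAtFixedRate(new TimerTask {
      override def run(): Unit = {
        if (!hasLaunchedTask) {
          // In Spark this is where the "Initial job has not accepted any
          // resources" warning is logged. The change adds the two lines
          // below: flip the zombie flag and stop the starvation thread.
          hasZombied = true
          this.cancel()
        } else {
          this.cancel()
        }
      }
    }, starvationTimeoutMs, starvationTimeoutMs)
  }

  // Called periodically by the scheduler backend with resource offers.
  def resourceOffers(tasksAssigned: Int): Unit = {
    if (!hasLaunchedTask && hasZombied) {
      // Stand-in for dagScheduler.ourCustomFunction(), which propagates
      // an error back to the application.
      throw new RuntimeException("zombied: task set never got resources")
    }
    if (tasksAssigned > 0) hasLaunchedTask = true
  }

  def zombied: Boolean = hasZombied
}
```

If offers never assign tasks, the starvation task fires once, sets the flag, cancels itself, and the next resourceOffers call escalates instead of spinning forever.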
The last detail here is to add code inside the starvation thread block after it posts the warning log: set "hasZombied" to true and then call "this.cancel()" to stop the starvation thread from continuing to run. With this we now have all the steps needed inside TaskSchedulerImpl to start signaling the zombied condition.

Back to the custom function. DAGScheduler has a reference to the appropriate Spark listeners that can propagate errors to the task set and, more importantly, back to your application code. If you look at the DAGScheduler class, you will find a function called "cleanUpAfterSchedulerStop()". This function does everything we want, except it is hard-coded to a specific exception: "val error = new SparkException(...)". What I did was copy this into another function that raises a custom Exception I created to signal the zombie, something like SparkTaskResourceAllocationZombieError. Now you call this function within the conditional block in TaskSchedulerImpl.resourceOffers and you should see your exception propagate out to your application code so you can take appropriate actions.

In our case, we are submitting Spark applications programmatically from a Scala application service on an EC2 instance to a Spark Standalone cluster in EC2. Whenever we see this error, the application service EC2 instance is unable to get resources from the cluster even when attempting subsequent Spark applications for a long period of time (it eventually recovers hours or days later, but that is not useful for us). So in our case we need to reschedule the failed Spark application on another EC2 application instance and shut down the current EC2 instance, because it can no longer get cluster resources. Your use case may be different, but at least action can be taken at the application level.
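As a sketch of the DAGScheduler-side change: the real cleanUpAfterSchedulerStop() walks active jobs and posts events to the listener bus, so the MiniDagScheduler and JobListener below are only stand-ins for that machinery; the point is parameterizing the hard-coded exception.

```scala
// Custom error used to signal the zombie condition, as described above.
class SparkTaskResourceAllocationZombieError(msg: String)
  extends RuntimeException(msg)

// Minimal stand-in for Spark's listener machinery.
trait JobListener { def jobFailed(error: Exception): Unit }

class MiniDagScheduler(listeners: Seq[JobListener]) {
  private def failAllJobs(error: Exception): Unit =
    listeners.foreach(_.jobFailed(error))

  // Shape of the original: hard-coded to one generic exception.
  def cleanUpAfterSchedulerStop(): Unit =
    failAllJobs(new RuntimeException(
      "Job cancelled because SparkContext was shut down"))

  // Copied variant, called from TaskSchedulerImpl.resourceOffers when
  // the zombie condition is detected, so the application sees a
  // distinguishable error type.
  def cleanUpAfterZombie(): Unit =
    failAllJobs(new SparkTaskResourceAllocationZombieError(
      "Initial job never received cluster resources"))
}
```

The only design decision here is giving the zombie path its own exception class, so application code can match on it rather than string-inspecting a generic SparkException.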
Here is some source code; you should be able to locate most of my additions by searching for comments starting with "// Localytics Code".

TaskSchedulerImpl gist: https://gist.github.com/rmarsch/e5d298e582ab75957957
DAGScheduler gist: https://gist.github.com/rmarsch/ae8f5bb03b11e8d4f8f7

Regards,
Richard
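The application-level reaction described above (reschedule elsewhere, retire the instance) can be sketched as a wrapper around job submission. Everything here is hypothetical: the exception class mirrors the one the patch adds, and rescheduleElsewhere / shutdownThisInstance are placeholders for deployment-specific hooks.

```scala
// Placeholder mirror of the custom error added to the patched Spark build.
class SparkTaskResourceAllocationZombieError(msg: String)
  extends RuntimeException(msg)

// Hypothetical application-side wrapper: run the Spark job and, on the
// zombie error, hand the work to another instance and retire this one
// instead of hanging indefinitely.
def submitWithZombieHandling(runJob: () => Unit)
                            (rescheduleElsewhere: () => Unit,
                             shutdownThisInstance: () => Unit): Unit =
  try runJob() catch {
    case _: SparkTaskResourceAllocationZombieError =>
      // This host can no longer obtain cluster resources.
      rescheduleElsewhere()
      shutdownThisInstance()
  }
```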
Re: Spark driver hangs on start of job
We've seen this issue as well in production. We also aren't sure what causes it, but have just recently shaded some of the Spark code in TaskSchedulerImpl that we use to effectively bubble up an exception from Spark instead of zombieing in this situation. If you are interested I can go into more detail about that. Otherwise I'm also keen to find out more on how this might be happening.

On Fri, Jun 26, 2015 at 8:28 AM, Sjoerd Mulder wrote:
Spark driver hangs on start of job
Hi,

I have a really annoying issue that I cannot replicate consistently; still, it happens roughly every 100 submissions (it's a job that runs every 3 minutes). I already reported an issue for this:
https://issues.apache.org/jira/browse/SPARK-8592

Here are the thread dumps of the driver and the executor:
https://docs.google.com/document/d/1x7ZwUzlvRqeJQ12FoGhpLV1zqDAmVsaF2HYhzkPNBKQ

Any direction I should look into?

Spark 1.4.0
Java 1.8.0_45 (Oracle Corporation)
Scala 2.11.6

I already tried to resolve the NPE by not logging the ActorRef. This makes the NPE go away :)

But I expect the root cause lies deeper, since the driver then still hangs with the "WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources" messages. But there are enough resources available in the cluster; it has plenty of CPU and memory left.

Logs from Driver:

15/06/26 11:58:19 INFO Remoting: Starting remoting
15/06/26 11:58:19 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@172.17.0.123:51415]
15/06/26 11:58:19 INFO Utils: Successfully started service 'sparkDriver' on port 51415.
15/06/26 11:58:20 INFO SparkEnv: Registering MapOutputTracker
15/06/26 11:58:20 INFO SparkEnv: Registering BlockManagerMaster
15/06/26 11:58:20 INFO DiskBlockManager: Created local directory at /tmp/spark-ff1f5a88-4e1d-4fe0-9c54-890e4174f02a/blockmgr-92b1e974-53bb-45a3-b918-916759e14630
15/06/26 11:58:20 INFO MemoryStore: MemoryStore started with capacity 265.1 MB
15/06/26 11:58:20 INFO HttpFileServer: HTTP File server directory is /tmp/spark-ff1f5a88-4e1d-4fe0-9c54-890e4174f02a/httpd-f5894293-33aa-4eaa-9740-4a36c054b6c8
15/06/26 11:58:20 INFO HttpServer: Starting HTTP Server
15/06/26 11:58:20 INFO Utils: Successfully started service 'HTTP file server' on port 33176.
15/06/26 11:58:20 INFO SparkEnv: Registering OutputCommitCoordinator
15/06/26 11:58:20 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/06/26 11:58:20 INFO SparkUI: Started SparkUI at http://172.17.0.123:4040
15/06/26 11:58:20 INFO SparkContext: Added JAR file:/opt/jar/spark/spark-job-1.0-SNAPSHOT.jar at http://172.17.0.123:33176/jars/spark-job-1.0-SNAPSHOT.jar with timestamp 1435319900257
15/06/26 11:58:20 INFO AppClient$ClientActor: Connecting to master akka.tcp://sparkMaster@172.17.42.1:7077/user/Master...
15/06/26 11:58:20 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150626115820-0917
15/06/26 11:58:20 INFO AppClient$ClientActor: Executor added: app-20150626115820-0917/0 on worker-20150625133752-10.0.7.171-47050 (10.0.7.171:47050) with 1 cores
15/06/26 11:58:20 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150626115820-0917/0 on hostPort 10.0.7.171:47050 with 1 cores, 2.0 GB RAM
15/06/26 11:58:20 INFO TaskSchedulerImpl: Starting speculative execution thread
15/06/26 11:58:20 INFO AppClient$ClientActor: Executor updated: app-20150626115820-0917/0 is now LOADING
15/06/26 11:58:20 INFO AppClient$ClientActor: Executor updated: app-20150626115820-0917/0 is now RUNNING
15/06/26 11:58:20 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 52000.
15/06/26 11:58:20 INFO NettyBlockTransferService: Server created on 52000
15/06/26 11:58:20 INFO BlockManagerMaster: Trying to register BlockManager
15/06/26 11:58:20 INFO BlockManagerMasterEndpoint: Registering block manager 172.17.0.123:52000 with 265.1 MB RAM, BlockManagerId(driver, 172.17.0.123, 52000)
15/06/26 11:58:20 INFO BlockManagerMaster: Registered BlockManager
15/06/26 11:58:20 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
15/06/26 11:58:24 INFO Exchange: Using SparkSqlSerializer2.
15/06/26 11:58:24 INFO Exchange: Using SparkSqlSerializer2.
15/06/26 11:58:24 INFO SparkContext: Starting job: map at SparkProductEventAggregator.scala:144
15/06/26 11:58:24 INFO Version: Elasticsearch Hadoop v2.1.0.rc1 [5cc3f53084]
15/06/26 11:58:24 INFO ScalaEsRowRDD: Reading from [675d42c8-9823-4d3c-8e86-5aa611d38770/events]
15/06/26 11:58:24 INFO ScalaEsRowRDD: Discovered mapping {675d42c8-9823-4d3c-8e86-5aa611d38770=[REMOVED]} for [675d42c8-9823-4d3c-8e86-5aa611d38770/events]
15/06/26 11:58:24 INFO DAGScheduler: Registering RDD 5 (map at SparkProductEventAggregator.scala:144)
15/06/26 11:58:24 INFO DAGScheduler: Got job 0 (map at SparkProductEventAggregator.scala:144) with 200 output partitions (allowLocal=false)
15/06/26 11:58:24 INFO DAGScheduler: Final stage: ResultStage 1(map at SparkProductEventAggregator.scala:144)
15/06/26 11:58:24 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
15/06/26 11:58:24 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
15/06/26 11:58:24 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[5] at map at SparkProductEventAggregator.scala:144),