Re: Spark driver hangs on start of job

2015-07-02 Thread Sjoerd Mulder
Hi Richard,

I have actually applied the following fix to our 1.4.0 version and this
seems to resolve the zombies :)

https://github.com/apache/spark/pull/7077/files

Sjoerd

2015-06-26 20:08 GMT+02:00 Richard Marscher rmarsc...@localytics.com:

 Hi,

 We are on 1.3.1 right now, so in case there are differences in the Spark
 files I'll walk through the logic of what we did and post a couple of gists
 at the end. We haven't committed to forking Spark for our own deployments
 yet, so right now we shadow some Spark classes in our application code with
 our own versions. Keep in mind I am not a Spark committer, so the following
 is a best-effort approach that is working for us; someone more knowledgeable
 about the Spark codebase might see a pitfall in my solution or a better
 solution.

 --

 First, we'll start with the root issue in TaskSchedulerImpl. You will find
 the code that prints the "Initial job has not accepted any resources"
 warning inside the submitTasks function. Spark creates a separate thread
 that checks some conditions every STARVATION_TIMEOUT milliseconds until the
 submitted task set has been launched. It only posts the warning log here and
 does nothing else. I will come back to this part of the code in a moment.

 The code that determines when the hasLaunchedTask flag gets set (and thus
 stops the starvation thread, since the task set is now being worked on by
 the cluster) is within the resourceOffers function. The various
 SchedulerBackend classes periodically call this function on
 TaskSchedulerImpl until cluster resources have been assigned to the task
 set.

 To start signaling the zombied scenario, I created a new flag: @volatile
 private var hasZombied = false. In our experience we always get resources in
 resourceOffers before the starvation thread runs; otherwise, if resources
 weren't allocated by then, we have always hit the zombie scenario. So I
 added a conditional before the if (tasks.size > 0) { hasLaunchedTask = true }
 block. The conditional checks if (!hasLaunchedTask && hasZombied) {
 dagScheduler.ourCustomFunction() }. I'll explain that DAGScheduler call in a
 moment.

 The last detail here is to add code inside the starvation thread block,
 after it posts the warning log: set hasZombied to true and then call
 this.cancel() to stop the starvation thread from running again. With this we
 now have all the steps needed inside TaskSchedulerImpl to signal the zombied
 condition.

 Back to the custom function. DAGScheduler has a reference to the appropriate
 Spark listeners that can propagate errors to the task set and, more
 importantly, back to your application code. If you look at the DAGScheduler
 class, you will find a function called cleanUpAfterSchedulerStop(). This
 function does everything we want, except that it is hard-coded to a specific
 exception: val error = new SparkException(...). What I did was copy it into
 another function that uses a custom exception I created to signal the
 zombie, something like SparkTaskResourceAllocationZombieError. Now you call
 this function within the conditional block in
 TaskSchedulerImpl.resourceOffers and you should see your exception propagate
 out to your application code so you can take appropriate action.

 In our case, we are submitting Spark applications programmatically from a
 Scala application service on an EC2 instance to a Spark Standalone cluster
 in EC2. Whenever we see this error, the application service EC2 instance is
 unable to get resources from the cluster, even for subsequent Spark
 applications, for a long period of time (it eventually recovers hours or
 days later, but that is not useful for us). So in our case we need to
 reschedule the failed Spark application on another EC2 application instance
 and shut down the current EC2 instance, because it can no longer get
 cluster resources. Your use case may be different, but at least action can
 be taken at the application level.

 Here is some source code; you should be able to locate most of my additions
 to the code by searching for comments starting with // Localytics Code
 TaskSchedulerImpl gist:
 https://gist.github.com/rmarsch/e5d298e582ab75957957
 DAGScheduler gist: https://gist.github.com/rmarsch/ae8f5bb03b11e8d4f8f7

 Regards,
 Richard

 On Fri, Jun 26, 2015 at 12:08 PM, Sjoerd Mulder sjoerdmul...@gmail.com
 wrote:

 Hi Richard,

 I would like to see how we can get a workaround to get out of the zombie
 situation, since we're planning for production :)

 If you could share the workaround or point me in the right direction, that
 would be great!

 Sjoerd

 2015-06-26 16:53 GMT+02:00 Richard Marscher rmarsc...@localytics.com:

 We've seen this issue as well in production. We also aren't sure what causes
 it, but we have recently shaded some of the Spark code in TaskSchedulerImpl
 so that we can effectively bubble up an exception from Spark instead of
 zombieing in this situation. If you are interested I can go
 into 

Re: Spark driver hangs on start of job

2015-07-02 Thread Richard Marscher
Ah I see, glad that simple patch works for your problem. That seems to be a
different underlying problem than the one we have been experiencing. In our
case, the executors are failing properly; it's just that none of the new ones
ever escape the exact same issue. So we start a death spiral of thousands of
failed executors, none of which can connect with the driver. Meanwhile, the
driver just sits there in the zombie state, doing nothing while it waits for
executors to respond. In that light, my solution is geared towards handling
things gracefully on the driver side.


On Thu, Jul 2, 2015 at 4:37 AM, Sjoerd Mulder sjoerdmul...@gmail.com
wrote:

 Hi Richard,

 I have actually applied the following fix to our 1.4.0 version and this
 seems to resolve the zombies :)

 https://github.com/apache/spark/pull/7077/files

 Sjoerd

 2015-06-26 20:08 GMT+02:00 Richard Marscher rmarsc...@localytics.com:

 Hi,

 We are on 1.3.1 right now, so in case there are differences in the Spark
 files I'll walk through the logic of what we did and post a couple of gists
 at the end. We haven't committed to forking Spark for our own deployments
 yet, so right now we shadow some Spark classes in our application code with
 our own versions. Keep in mind I am not a Spark committer, so the following
 is a best-effort approach that is working for us; someone more knowledgeable
 about the Spark codebase might see a pitfall in my solution or a better
 solution.

 --

 First, we'll start with the root issue in TaskSchedulerImpl. You will find
 the code that prints the "Initial job has not accepted any resources"
 warning inside the submitTasks function. Spark creates a separate thread
 that checks some conditions every STARVATION_TIMEOUT milliseconds until the
 submitted task set has been launched. It only posts the warning log here and
 does nothing else. I will come back to this part of the code in a moment.

 The code that determines when the hasLaunchedTask flag gets set (and thus
 stops the starvation thread, since the task set is now being worked on by
 the cluster) is within the resourceOffers function. The various
 SchedulerBackend classes periodically call this function on
 TaskSchedulerImpl until cluster resources have been assigned to the task
 set.

 To start signaling the zombied scenario, I created a new flag: @volatile
 private var hasZombied = false. In our experience we always get resources in
 resourceOffers before the starvation thread runs; otherwise, if resources
 weren't allocated by then, we have always hit the zombie scenario. So I
 added a conditional before the if (tasks.size > 0) { hasLaunchedTask = true }
 block. The conditional checks if (!hasLaunchedTask && hasZombied) {
 dagScheduler.ourCustomFunction() }. I'll explain that DAGScheduler call in a
 moment.

 The last detail here is to add code inside the starvation thread block,
 after it posts the warning log: set hasZombied to true and then call
 this.cancel() to stop the starvation thread from running again. With this we
 now have all the steps needed inside TaskSchedulerImpl to signal the zombied
 condition.

 Back to the custom function. DAGScheduler has a reference to the appropriate
 Spark listeners that can propagate errors to the task set and, more
 importantly, back to your application code. If you look at the DAGScheduler
 class, you will find a function called cleanUpAfterSchedulerStop(). This
 function does everything we want, except that it is hard-coded to a specific
 exception: val error = new SparkException(...). What I did was copy it into
 another function that uses a custom exception I created to signal the
 zombie, something like SparkTaskResourceAllocationZombieError. Now you call
 this function within the conditional block in
 TaskSchedulerImpl.resourceOffers and you should see your exception propagate
 out to your application code so you can take appropriate action.

 In our case, we are submitting Spark applications programmatically from a
 Scala application service on an EC2 instance to a Spark Standalone cluster
 in EC2. Whenever we see this error, the application service EC2 instance is
 unable to get resources from the cluster, even for subsequent Spark
 applications, for a long period of time (it eventually recovers hours or
 days later, but that is not useful for us). So in our case we need to
 reschedule the failed Spark application on another EC2 application instance
 and shut down the current EC2 instance, because it can no longer get
 cluster resources. Your use case may be different, but at least action can
 be taken at the application level.

 Here is some source code; you should be able to locate most of my additions
 to the code by searching for comments starting with // Localytics Code
 TaskSchedulerImpl gist:
 https://gist.github.com/rmarsch/e5d298e582ab75957957
 DAGScheduler gist: https://gist.github.com/rmarsch/ae8f5bb03b11e8d4f8f7

 Regards,
 Richard

 On 

Re: Spark driver hangs on start of job

2015-06-26 Thread Richard Marscher
We've seen this issue as well in production. We also aren't sure what causes
it, but we have recently shaded some of the Spark code in TaskSchedulerImpl
so that we can effectively bubble up an exception from Spark instead of
zombieing in this situation. If you are interested I can go into more detail
about that. Otherwise I'm also keen to find out more about how this might be
happening.

On Fri, Jun 26, 2015 at 8:28 AM, Sjoerd Mulder sjoerdmul...@gmail.com
wrote:

 Hi,

 I have a really annoying issue that I cannot replicate consistently; still,
 it happens roughly every 100 submissions (it's a job that runs every 3
 minutes). I have already reported an issue for this:
 https://issues.apache.org/jira/browse/SPARK-8592

 Here are the thread dumps of the driver and the executor:
 https://docs.google.com/document/d/1x7ZwUzlvRqeJQ12FoGhpLV1zqDAmVsaF2HYhzkPNBKQ

 Any direction I should look into?

 Spark 1.4.0
 Java 1.8.0_45 (Oracle Corporation)
 Scala 2.11.6

 I already tried to resolve the NPE by not logging the ActorRef. This makes
 the NPE go away :)

 But the root cause lies deeper, I expect, since the driver then still hangs
 with the *WARN TaskSchedulerImpl: Initial job has not accepted any
 resources; check your cluster UI to ensure that workers are registered and
 have sufficient resources* messages. Yet there are enough resources
 available in the cluster; it has plenty of CPU and memory left.

 Logs from Driver:

 15/06/26 11:58:19 INFO Remoting: Starting remoting
 15/06/26 11:58:19 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://sparkDriver@172.17.0.123:51415]
 15/06/26 11:58:19 INFO Utils: Successfully started service 'sparkDriver'
 on port 51415.
 15/06/26 11:58:20 INFO SparkEnv: Registering MapOutputTracker
 15/06/26 11:58:20 INFO SparkEnv: Registering BlockManagerMaster
 15/06/26 11:58:20 INFO DiskBlockManager: Created local directory at
 /tmp/spark-ff1f5a88-4e1d-4fe0-9c54-890e4174f02a/blockmgr-92b1e974-53bb-45a3-b918-916759e14630
 15/06/26 11:58:20 INFO MemoryStore: MemoryStore started with capacity
 265.1 MB
 15/06/26 11:58:20 INFO HttpFileServer: HTTP File server directory is
 /tmp/spark-ff1f5a88-4e1d-4fe0-9c54-890e4174f02a/httpd-f5894293-33aa-4eaa-9740-4a36c054b6c8
 15/06/26 11:58:20 INFO HttpServer: Starting HTTP Server
 15/06/26 11:58:20 INFO Utils: Successfully started service 'HTTP file
 server' on port 33176.
 15/06/26 11:58:20 INFO SparkEnv: Registering OutputCommitCoordinator
 15/06/26 11:58:20 INFO Utils: Successfully started service 'SparkUI' on
 port 4040.
 15/06/26 11:58:20 INFO SparkUI: Started SparkUI at
 http://172.17.0.123:4040
 15/06/26 11:58:20 INFO SparkContext: Added JAR
 file:/opt/jar/spark/spark-job-1.0-SNAPSHOT.jar at
 http://172.17.0.123:33176/jars/spark-job-1.0-SNAPSHOT.jar with timestamp
 1435319900257
 15/06/26 11:58:20 INFO AppClient$ClientActor: Connecting to master
 akka.tcp://sparkMaster@172.17.42.1:7077/user/Master...
 15/06/26 11:58:20 INFO SparkDeploySchedulerBackend: Connected to Spark
 cluster with app ID app-20150626115820-0917
 15/06/26 11:58:20 INFO AppClient$ClientActor: Executor added:
 app-20150626115820-0917/0 on worker-20150625133752-10.0.7.171-47050 (
 10.0.7.171:47050) with 1 cores
 15/06/26 11:58:20 INFO SparkDeploySchedulerBackend: Granted executor ID
 app-20150626115820-0917/0 on hostPort 10.0.7.171:47050 with 1 cores, 2.0
 GB RAM
 15/06/26 11:58:20 INFO TaskSchedulerImpl: Starting speculative execution
 thread
 15/06/26 11:58:20 INFO AppClient$ClientActor: Executor updated:
 app-20150626115820-0917/0 is now LOADING
 15/06/26 11:58:20 INFO AppClient$ClientActor: Executor updated:
 app-20150626115820-0917/0 is now RUNNING
 15/06/26 11:58:20 INFO Utils: Successfully started service
 'org.apache.spark.network.netty.NettyBlockTransferService' on port 52000.
 15/06/26 11:58:20 INFO NettyBlockTransferService: Server created on 52000
 15/06/26 11:58:20 INFO BlockManagerMaster: Trying to register BlockManager
 15/06/26 11:58:20 INFO BlockManagerMasterEndpoint: Registering block
 manager 172.17.0.123:52000 with 265.1 MB RAM, BlockManagerId(driver,
 172.17.0.123, 52000)
 15/06/26 11:58:20 INFO BlockManagerMaster: Registered BlockManager
 15/06/26 11:58:20 INFO SparkDeploySchedulerBackend: SchedulerBackend is
 ready for scheduling beginning after reached minRegisteredResourcesRatio:
 0.0
 15/06/26 11:58:24 INFO Exchange: Using SparkSqlSerializer2.
 15/06/26 11:58:24 INFO Exchange: Using SparkSqlSerializer2.
 15/06/26 11:58:24 INFO SparkContext: Starting job: map at
 SparkProductEventAggregator.scala:144
 15/06/26 11:58:24 INFO Version: Elasticsearch Hadoop v2.1.0.rc1
 [5cc3f53084]
 15/06/26 11:58:24 INFO ScalaEsRowRDD: Reading from
 [675d42c8-9823-4d3c-8e86-5aa611d38770/events]
 15/06/26 11:58:24 INFO ScalaEsRowRDD: Discovered mapping
 {675d42c8-9823-4d3c-8e86-5aa611d38770=[REMOVED]} for
 [675d42c8-9823-4d3c-8e86-5aa611d38770/events]
 15/06/26 11:58:24 INFO DAGScheduler: Registering RDD 5 (map at
 SparkProductEventAggregator.scala:144)
 

Re: Spark driver hangs on start of job

2015-06-26 Thread Richard Marscher
Hi,

We are on 1.3.1 right now, so in case there are differences in the Spark
files I'll walk through the logic of what we did and post a couple of gists
at the end. We haven't committed to forking Spark for our own deployments
yet, so right now we shadow some Spark classes in our application code with
our own versions. Keep in mind I am not a Spark committer, so the following
is a best-effort approach that is working for us; someone more knowledgeable
about the Spark codebase might see a pitfall in my solution or a better
solution.

--

First, we'll start with the root issue in TaskSchedulerImpl. You will find
the code that prints the "Initial job has not accepted any resources"
warning inside the submitTasks function. Spark creates a separate thread
that checks some conditions every STARVATION_TIMEOUT milliseconds until the
submitted task set has been launched. It only posts the warning log here and
does nothing else. I will come back to this part of the code in a moment.

The code that determines when the hasLaunchedTask flag gets set (and thus
stops the starvation thread, since the task set is now being worked on by
the cluster) is within the resourceOffers function. The various
SchedulerBackend classes periodically call this function on
TaskSchedulerImpl until cluster resources have been assigned to the task
set.

To start signaling the zombied scenario, I created a new flag: @volatile
private var hasZombied = false. In our experience we always get resources in
resourceOffers before the starvation thread runs; otherwise, if resources
weren't allocated by then, we have always hit the zombie scenario. So I
added a conditional before the if (tasks.size > 0) { hasLaunchedTask = true }
block. The conditional checks if (!hasLaunchedTask && hasZombied) {
dagScheduler.ourCustomFunction() }. I'll explain that DAGScheduler call in a
moment.

The last detail here is to add code inside the starvation thread block,
after it posts the warning log: set hasZombied to true and then call
this.cancel() to stop the starvation thread from running again. With this we
now have all the steps needed inside TaskSchedulerImpl to signal the zombied
condition.
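
To make that concrete, here is a rough, abbreviated sketch of what the
TaskSchedulerImpl additions look like. It is simplified from the 1.3.1
sources, so details may differ in your version; see the gists linked below
for the real code, and treat ourCustomFunction() as a placeholder name for
the DAGScheduler function described next:

  @volatile private var hasLaunchedTask = false
  // Localytics Code: new flag recording that the starvation timer has fired
  @volatile private var hasZombied = false

  override def submitTasks(taskSet: TaskSet) {
    // ... existing bookkeeping for the new task set ...
    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
            // Localytics Code: remember that we starved and stop the timer
            hasZombied = true
            this.cancel()
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
    }
    hasReceivedTask = true
    backend.reviveOffers()
  }

  def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] =
    synchronized {
      // ... existing offer matching that fills in `tasks` ...

      // Localytics Code: if the starvation timer already fired and we still
      // have not launched anything, escalate through the DAGScheduler
      // instead of warning forever
      if (!hasLaunchedTask && hasZombied) {
        dagScheduler.ourCustomFunction()
      }
      if (tasks.size > 0) {
        hasLaunchedTask = true
      }
      tasks
    }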

Back to the custom function. DAGScheduler has a reference to the appropriate
Spark listeners that can propagate errors to the task set and, more
importantly, back to your application code. If you look at the DAGScheduler
class, you will find a function called cleanUpAfterSchedulerStop(). This
function does everything we want, except that it is hard-coded to a specific
exception: val error = new SparkException(...). What I did was copy it into
another function that uses a custom exception I created to signal the
zombie, something like SparkTaskResourceAllocationZombieError. Now you call
this function within the conditional block in
TaskSchedulerImpl.resourceOffers and you should see your exception propagate
out to your application code so you can take appropriate action.
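
A similarly abbreviated sketch of the DAGScheduler side. The exception and
function names are just the ones we picked, and the listener notifications
are elided because they are copied verbatim from cleanUpAfterSchedulerStop();
again, see the gist below for the real code:

  // Localytics Code: custom error used to signal the zombie condition out
  // to application code (the name is just what we chose)
  class SparkTaskResourceAllocationZombieError(message: String)
    extends SparkException(message)

  // Localytics Code, inside DAGScheduler: a copy of
  // cleanUpAfterSchedulerStop() with the hard-coded
  // `val error = new SparkException(...)` swapped for our custom error, so
  // application code can tell this failure apart from a normal shutdown
  private[scheduler] def ourCustomFunction(): Unit = {
    for (job <- activeJobs) {
      val error = new SparkTaskResourceAllocationZombieError(
        "Initial job never received cluster resources (zombied task set)")
      job.listener.jobFailed(error)
      // ... then post the same stage-completed and job-end events to the
      // listener bus that cleanUpAfterSchedulerStop() posts ...
    }
  }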

In our case, we are submitting Spark applications programmatically from a
Scala application service on an EC2 instance to a Spark Standalone cluster
in EC2. Whenever we see this error, the application service EC2 instance is
unable to get resources from the cluster, even for subsequent Spark
applications, for a long period of time (it eventually recovers hours or
days later, but that is not useful for us). So in our case we need to
reschedule the failed Spark application on another EC2 application instance
and shut down the current EC2 instance, because it can no longer get
cluster resources. Your use case may be different, but at least action can
be taken at the application level.
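
On the application side, the payoff is simply that the zombie now surfaces
as a distinct exception around job submission. A minimal sketch of the kind
of handling we do, assuming the custom error class above is visible to the
application; the two failover helpers are placeholders for your own
deployment-specific logic:

  import org.apache.spark.{SparkConf, SparkContext}

  object ZombieAwareDriver {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("zombie-aware-job"))
      try {
        // Any action will do; the custom error surfaces from the first job
        // whose task set never receives resources.
        val total = sc.parallelize(1 to 1000).map(_ * 2).reduce(_ + _)
        println(s"sum = $total")
      } catch {
        case _: SparkTaskResourceAllocationZombieError =>
          // This driver instance can no longer obtain cluster resources:
          // hand the work to another instance and retire this one.
          rescheduleOnAnotherInstance()
          shutDownThisInstance()
      } finally {
        sc.stop()
      }
    }

    // Placeholders for deployment-specific failover logic
    private def rescheduleOnAnotherInstance(): Unit = ()
    private def shutDownThisInstance(): Unit = ()
  }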

Here is some source code; you should be able to locate most of my additions
to the code by searching for comments starting with // Localytics Code
TaskSchedulerImpl gist: https://gist.github.com/rmarsch/e5d298e582ab75957957
DAGScheduler gist: https://gist.github.com/rmarsch/ae8f5bb03b11e8d4f8f7

Regards,
Richard

On Fri, Jun 26, 2015 at 12:08 PM, Sjoerd Mulder sjoerdmul...@gmail.com
wrote:

 Hi Richard,

 I would like to see how we can get a workaround to get out of the zombie
 situation, since we're planning for production :)

 If you could share the workaround or point me in the right direction, that
 would be great!

 Sjoerd

 2015-06-26 16:53 GMT+02:00 Richard Marscher rmarsc...@localytics.com:

 We've seen this issue as well in production. We also aren't sure what causes
 it, but we have recently shaded some of the Spark code in TaskSchedulerImpl
 so that we can effectively bubble up an exception from Spark instead of
 zombieing in this situation. If you are interested I can go into more detail
 about that. Otherwise I'm also keen to find out more about how this might be
 happening.

 On Fri, Jun 26, 2015 at 8:28 AM, Sjoerd Mulder sjoerdmul...@gmail.com
 wrote:

 Hi,

 I have a really annoying issue that I cannot replicate consistently; still,
 it happens roughly every 100 submissions (it's a job