[jira] [Updated] (SPARK-10486) Spark intermittently fails to recover from a worker failure (in standalone mode)

2015-09-08 Thread Cheuk Lam (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-10486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cheuk Lam updated SPARK-10486:
--
Description: 
We have run into a problem where a Spark job is aborted after one worker is 
killed in a 2-worker standalone cluster.  The problem is intermittent, but we 
can reproduce it consistently.  It only appears to happen when we kill a 
worker; it doesn't seem to happen when we kill an executor directly.

The program we use to reproduce the problem is an iterative program based on 
GraphX, although the nature of the issue doesn't seem to be GraphX-related.  
This is how we reproduce the problem:
* Set up a standalone cluster of 2 workers;
* Run a Spark application that does some iterative computation (ours is based 
on GraphX; a minimal sketch of such a job is shown after this list);
* Kill a worker process (and thus the associated executor);
* Intermittently some job will be aborted.
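
For reference, the sketch below shows the general shape of the kind of job we 
run.  It is a hand-written stand-in, not our actual program (which is 
GraphX-based); the object name, data sizes and iteration count are made up for 
illustration.  The only property that matters is that each iteration caches 
blocks on both executors and runs jobs that look up block locations from the 
driver.

    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative stand-in for our GraphX-based program: an iterative job
    // that caches each iteration's result and drops the previous one.
    object IterativeRepro {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("worker-failure-repro"))

        var current = sc.parallelize(1 to 1000000, 16)
          .map(x => (x % 100000, x.toLong))
          .cache()
        current.count()

        for (i <- 1 to 50) {
          // Each iteration builds on the previous cached result.
          val next = current.reduceByKey(_ + _).mapValues(_ + i).cache()
          next.count()                         // materialize this iteration's cached blocks
          current.unpersist(blocking = false)  // drop the previous iteration's cache
          current = next
          // Kill one of the two worker processes while this loop is running;
          // intermittently one of the later jobs is aborted.
        }
        sc.stop()
      }
    }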

The driver and executor logs are available, as well as the application history 
(event log file), but they are quite large and can't be attached here.

~

After looking into the log files, we think the failure is caused by the 
following two things combined:
* The BlockManagerMasterEndpoint in the driver holds stale block info for the 
dead executor after the worker has been killed.  The driver does appear to 
handle the "RemoveExecutor" message and clean up all related block info.  But 
subsequently, and intermittently, it receives Akka messages that re-register 
the dead BlockManager and re-add some of its blocks.  As a result, upon 
GetLocations requests from the remaining executor, the driver responds with 
stale block info, instructing the remaining executor to fetch blocks from the 
dead executor.  Please see the driver log excerpt below, which shows the 
sequence of events described above.  In the log there are two executors: 
1.2.3.4 is the one that was shut down, while 5.6.7.8 is the remaining executor.  
The driver also ran on 5.6.7.8.
* When the remaining executor's BlockManager issues a doGetRemote() call to 
fetch the block of data, the call fails because the targeted BlockManager, 
which resided in the dead executor, is gone.  This failure results in an 
exception being forwarded to the caller, bypassing the mechanism in the 
doGetRemote() function that would otherwise trigger a re-computation of the 
block.  I don't know whether that is intentional or not.  (A simplified sketch 
of this read path follows this list.)
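
To make the second point concrete, here is a minimal, self-contained sketch of 
how we understand the remote-read path.  The types, block id and fetch function 
below are simplified stand-ins, not the actual Spark 1.4 internals; the point 
is only that when the master hands out a stale location and the fetch from the 
dead executor throws, the exception reaches the caller before the "block not 
found anywhere, recompute it" fallback is ever taken.

    // Simplified sketch; names are illustrative, not Spark's real classes.
    object RemoteFetchSketch {

      case class BlockManagerId(executorId: String, host: String, port: Int)

      // Stand-in for the driver-side BlockManagerMasterEndpoint: after
      // RemoveExecutor it should hold no locations for executor 0, but a late
      // re-registration can put the dead executor back in, so getLocations
      // returns a stale entry.
      class Master(var locations: Map[String, Seq[BlockManagerId]]) {
        def getLocations(blockId: String): Seq[BlockManagerId] =
          locations.getOrElse(blockId, Seq.empty)
      }

      // Stand-in for the executor-side doGetRemote(): try each reported
      // location.  A fetch from a dead executor throws; if that exception
      // escapes, it reaches the caller before the "return None so the block
      // gets recomputed" fallback is taken.
      def doGetRemote(blockId: String, master: Master)
                     (fetch: BlockManagerId => Array[Byte]): Option[Array[Byte]] = {
        for (loc <- master.getLocations(blockId)) {
          val data = fetch(loc) // throws if loc points at the dead executor
          if (data != null) return Some(data)
        }
        None // "not found at any location" => caller recomputes the block
      }

      def main(args: Array[String]): Unit = {
        val deadLoc = BlockManagerId("0", "1.2.3.4", 52615)       // matches the log below
        val master = new Master(Map("rdd_42_7" -> Seq(deadLoc)))  // stale entry, made-up block id

        val result =
          try {
            doGetRemote("rdd_42_7", master) { loc =>
              throw new java.io.IOException(s"Failed to connect to ${loc.host}:${loc.port}")
            }
          } catch {
            case e: java.io.IOException =>
              // This is what we observe: the failure surfaces as an exception
              // in the task instead of leading to a recomputation of the block.
              println(s"fetch failed and block was not recomputed: ${e.getMessage}")
              None
          }
        println(s"result = $result")
      }
    }

The real code path is more involved than this, of course, but the failures we 
see in the executor log have this shape: the connect failure against the stale 
location propagates up rather than falling through to a recomputation.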

Driver log excerpt showing that the driver received messages to re-register 
the dead executor after handling the RemoveExecutor message:

11690 15/09/02 20:35:16 [sparkDriver-akka.actor.default-dispatcher-15] DEBUG 
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled message 
(172.236378 ms) 
AkkaMessage(RegisterExecutor(0,AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/user/Executor#670388190]),1.2.3.4:36140,8,Map(stdout
 -> 
http://1.2.3.4:8081/logPage/?appId=app-20150902203512-&executorId=0&logType=stdout,
 stderr -> 
http://1.2.3.4:8081/logPage/?appId=app-20150902203512-&executorId=0&logType=stderr)),true)
 from Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/temp/$f]

11717 15/09/02 20:35:16 [sparkDriver-akka.actor.default-dispatcher-15] DEBUG 
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received message 
AkkaMessage(RegisterBlockManager(BlockManagerId(0, 1.2.3.4, 
52615),6667936727,AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/user/BlockManagerEndpoint1#-21635])),true)
 from Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/temp/$g]

11717 15/09/02 20:35:16 [sparkDriver-akka.actor.default-dispatcher-15] DEBUG 
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: Received RPC message: 
AkkaMessage(RegisterBlockManager(BlockManagerId(0, 1.2.3.4, 
52615),6667936727,AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/user/BlockManagerEndpoint1#-21635])),true)

11718 15/09/02 20:35:16 [sparkDriver-akka.actor.default-dispatcher-15] INFO 
BlockManagerMasterEndpoint: Registering block manager 1.2.3.4:52615 with 6.2 GB 
RAM, BlockManagerId(0, 1.2.3.4, 52615)

11719 15/09/02 20:35:16 [sparkDriver-akka.actor.default-dispatcher-15] DEBUG 
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled message 
(1.498313 ms) AkkaMessage(RegisterBlockManager(BlockManagerId(0, 1.2.3.4, 
52615),6667936727,AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/user/BlockManagerEndpoint1#-21635])),true)
 from Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/temp/$g]

...

308892 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] ERROR 
TaskSchedulerImpl: Lost executor 0 on 1.2.3.4: worker lost

...

308903 15/09/02 20:40:13 [dag-scheduler-event-loop] INFO DAGScheduler: Executor 
lost: 0 (epoch 178)

308904 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] DEBUG 
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received message 
AkkaMessage(RemoveExecut

[jira] [Created] (SPARK-10486) Spark intermittently fails to recover from a worker failure (in standalone mode)

2015-09-08 Thread Cheuk Lam (JIRA)
Cheuk Lam created SPARK-10486:
-

 Summary: Spark intermittently fails to recover from a worker 
failure (in standalone mode)
 Key: SPARK-10486
 URL: https://issues.apache.org/jira/browse/SPARK-10486
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.4.1
Reporter: Cheuk Lam
Priority: Critical


Please see the attached driver log for the sequence of error messages that led 
to a job being aborted.  The executor logs are also available, as well as the 
application history (event log file), though it is quite large (700MB).
