We have run into a problem where a Spark job is aborted after a worker is
killed in a 2-worker standalone cluster.  The problem is intermittent, but we
can reproduce it consistently.  It only appears to happen when we kill a
worker; it does not happen when we kill an executor directly.

Has anyone run into a similar problem?  We would appreciate it if anyone
could share related experience and/or suggestions for troubleshooting this
problem.  Thank you.

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

For those who are interested, we did look into the logs, and this is our
analysis so far.  We think the failure is caused by the following two things
combined, but we don't know how the first one could happen.
* The BlockManagerMasterEndpoint in the driver holds stale block info for
the dead executor after the worker has been killed.  The driver does handle
the RemoveExecutor message and cleans up all related block info.  But
subsequently, and intermittently, it receives Akka messages that re-register
the dead BlockManager and re-add some of its blocks.  As a result, upon
GetLocations requests from the remaining executor, the driver responds with
stale block info, instructing the remaining executor to fetch blocks from
the dead executor.  Please see the driver log excerpt below, which shows the
sequence of events described above.  In the log there are two executors:
1.2.3.4 was the one that got shut down, while 5.6.7.8 is the remaining
executor.  The driver also ran on 5.6.7.8.
* When the remaining executor's BlockManager issues a doGetRemote() call to
fetch the block, the call fails because the targeted BlockManager, which
resided in the dead executor, is gone.  The failure results in an exception
being forwarded to the caller, bypassing the mechanism in doGetRemote() that
would trigger a re-computation of the block.  We don't know whether that is
intentional.
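The first bullet above can be illustrated with a minimal model.  This is
not Spark source code; the class and method names are hypothetical, and it
only sketches, under our assumptions, how accepting a late re-registration
from an already-removed executor would leave stale entries behind:

```python
# Illustrative model (NOT Spark source): a block-location registry that
# cleans up on RemoveExecutor but blindly accepts late RegisterBlockManager
# and UpdateBlockInfo messages from the same (now dead) executor.

class BlockRegistry:
    def __init__(self):
        self.block_managers = {}   # executor_id -> address
        self.block_locations = {}  # block_id -> set of executor_ids

    def register_block_manager(self, executor_id, address):
        # A late re-registration looks like a fresh executor, so it is
        # accepted with no record that this executor is already dead.
        self.block_managers[executor_id] = address

    def update_block_info(self, executor_id, block_id):
        if executor_id in self.block_managers:
            self.block_locations.setdefault(block_id, set()).add(executor_id)

    def remove_executor(self, executor_id):
        # Cleanup on RemoveExecutor: drop the manager and all its blocks.
        self.block_managers.pop(executor_id, None)
        for locs in self.block_locations.values():
            locs.discard(executor_id)

    def get_locations(self, block_id):
        return self.block_locations.get(block_id, set())


registry = BlockRegistry()
registry.register_block_manager("0", "1.2.3.4:52615")
registry.update_block_info("0", "rdd_1158_5")

# Worker killed -> RemoveExecutor(0) handled; state is cleaned up.
registry.remove_executor("0")
assert registry.get_locations("rdd_1158_5") == set()

# But in-flight messages from the dead executor arrive afterwards:
registry.register_block_manager("0", "1.2.3.4:52615")  # late re-registration
registry.update_block_info("0", "rdd_1158_5")          # re-adds the block

# GetLocations now names the dead executor again -> stale fetch target.
print(registry.get_locations("rdd_1158_5"))  # {'0'}
```

This matches the sequence we see in the log below: RegisterBlockManager and
UpdateBlockInfo for BlockManagerId(0, 1.2.3.4, 52615) are handled shortly
after RemoveExecutor(0), and a later GetLocations(rdd_1158_5) is answered
from the repopulated state.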

Driver log excerpt showing that the driver received messages re-registering
the dead BlockManager after handling the RemoveExecutor message:

11690 15/09/02 20:35:16 [sparkDriver-akka.actor.default-dispatcher-15] DEBUG
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled message
(172.236378 ms)
AkkaMessage(RegisterExecutor(0,AkkaRpcEndpointRef(Actor[akka.tcp://[email protected]:36140/user/Executor#670388190]),1.2.3.4:36140,8,Map(stdout
->
http://1.2.3.4:8081/logPage/?appId=app-20150902203512-0000&executorId=0&logType=stdout,
stderr ->
http://1.2.3.4:8081/logPage/?appId=app-20150902203512-0000&executorId=0&logType=stderr)),true)
from Actor[akka.tcp://[email protected]:36140/temp/$f]

11717 15/09/02 20:35:16 [sparkDriver-akka.actor.default-dispatcher-15] DEBUG
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received
message AkkaMessage(RegisterBlockManager(BlockManagerId(0, 1.2.3.4,
52615),6667936727,AkkaRpcEndpointRef(Actor[akka.tcp://[email protected]:36140/user/BlockManagerEndpoint1#-216777735])),true)
from Actor[akka.tcp://[email protected]:36140/temp/$g]

11717 15/09/02 20:35:16 [sparkDriver-akka.actor.default-dispatcher-15] DEBUG
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: Received RPC message:
AkkaMessage(RegisterBlockManager(BlockManagerId(0, 1.2.3.4,
52615),6667936727,AkkaRpcEndpointRef(Actor[akka.tcp://[email protected]:36140/user/BlockManagerEndpoint1#-216777735])),true)

11718 15/09/02 20:35:16 [sparkDriver-akka.actor.default-dispatcher-15] INFO
BlockManagerMasterEndpoint: Registering block manager 1.2.3.4:52615 with 6.2
GB RAM, BlockManagerId(0, 1.2.3.4, 52615)

11719 15/09/02 20:35:16 [sparkDriver-akka.actor.default-dispatcher-15] DEBUG
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled message
(1.498313 ms) AkkaMessage(RegisterBlockManager(BlockManagerId(0, 1.2.3.4,
52615),6667936727,AkkaRpcEndpointRef(Actor[akka.tcp://[email protected]:36140/user/BlockManagerEndpoint1#-216777735])),true)
from Actor[akka.tcp://[email protected]:36140/temp/$g]

...

308892 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] ERROR
TaskSchedulerImpl: Lost executor 0 on 1.2.3.4: worker lost

...

308903 15/09/02 20:40:13 [dag-scheduler-event-loop] INFO DAGScheduler:
Executor lost: 0 (epoch 178)

308904 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] DEBUG
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received
message AkkaMessage(RemoveExecutor(0),true) from
Actor[akka://sparkDriver/temp/$Jqb]

308904 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] DEBUG
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: Received RPC message:
AkkaMessage(RemoveExecutor(0),true)

308904 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] INFO
BlockManagerMasterEndpoint: Trying to remove executor 0 from
BlockManagerMaster.

308906 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] INFO
BlockManagerMasterEndpoint: Removing block manager BlockManagerId(0,
1.2.3.4, 52615)

308907 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] DEBUG
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled message
(2.913003 ms) AkkaMessage(RemoveExecutor(0),true) from
Actor[akka://sparkDriver/temp/$Jqb]

308908 15/09/02 20:40:13 [dag-scheduler-event-loop] INFO BlockManagerMaster:
Removed 0 successfully in removeExecutor

...

308987 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] DEBUG
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received
message AkkaMessage(RegisterBlockManager(BlockManagerId(0, 1.2.3.4,
52615),6667936727,AkkaRpcEndpointRef(Actor[akka.tcp://[email protected]:36140/user/BlockManagerEndpoint1#-216777735])),true)
from Actor[akka.tcp://[email protected]:36140/temp/$rob]

308987 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] DEBUG
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: Received RPC message:
AkkaMessage(RegisterBlockManager(BlockManagerId(0, 1.2.3.4,
52615),6667936727,AkkaRpcEndpointRef(Actor[akka.tcp://[email protected]:36140/user/BlockManagerEndpoint1#-216777735])),true)

308988 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] INFO
BlockManagerMasterEndpoint: Registering block manager 1.2.3.4:52615 with 6.2
GB RAM, BlockManagerId(0, 1.2.3.4, 52615)

308988 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] DEBUG
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled message
(0.324347 ms) AkkaMessage(RegisterBlockManager(BlockManagerId(0, 1.2.3.4,
52615),6667936727,AkkaRpcEndpointRef(Actor[akka.tcp://[email protected]:36140/user/BlockManagerEndpoint1#-216777735])),true)
from Actor[akka.tcp://[email protected]:36140/temp/$rob]

...

309003 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-15]
DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received
message AkkaMessage(UpdateBlockInfo(BlockManagerId(0, 1.2.3.4,
52615),rdd_1158_5,StorageLevel(false, true, false, true,
1),686264,0,0),true) from
Actor[akka.tcp://[email protected]:36140/temp/$sob]

309003 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-15]
DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: Received RPC
message: AkkaMessage(UpdateBlockInfo(BlockManagerId(0, 1.2.3.4,
52615),rdd_1158_5,StorageLevel(false, true, false, true,
1),686264,0,0),true)

309003 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-15] INFO
BlockManagerInfo: Added rdd_1158_5 in memory on 1.2.3.4:52615 (size: 670.2
KB, free: 6.2 GB)

309003 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-15]
DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled
message (0.327351 ms) AkkaMessage(UpdateBlockInfo(BlockManagerId(0, 1.2.3.4,
52615),rdd_1158_5,StorageLevel(false, true, false, true,
1),686264,0,0),true) from
Actor[akka.tcp://[email protected]:36140/temp/$sob]

...

332822 15/09/02 20:40:37 [sparkDriver-akka.actor.default-dispatcher-4] DEBUG
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received
message AkkaMessage(GetLocations(rdd_1158_5),true) from
Actor[akka.tcp://[email protected]:54406/temp/$VCb]

332822 15/09/02 20:40:37 [sparkDriver-akka.actor.default-dispatcher-4] DEBUG
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: Received RPC message:
AkkaMessage(GetLocations(rdd_1158_5),true)

332822 15/09/02 20:40:37 [sparkDriver-akka.actor.default-dispatcher-4] DEBUG
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled message
(0.068605 ms) AkkaMessage(GetLocations(rdd_1158_5),true) from
Actor[akka.tcp://[email protected]:54406/temp/$VCb]

--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-intermittently-fails-to-recover-from-a-worker-failure-in-standalone-mode-tp24605.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
