[ https://issues.apache.org/jira/browse/SPARK-16702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Angus Gerry updated SPARK-16702:
--------------------------------
    Attachment: SparkThreadsBlocked.txt

> Driver hangs after executors are lost
> -------------------------------------
>
>                 Key: SPARK-16702
>                 URL: https://issues.apache.org/jira/browse/SPARK-16702
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.6.1, 1.6.2, 2.0.0
>            Reporter: Angus Gerry
>         Attachments: SparkThreadsBlocked.txt
>
>
> It's my first time; please be kind.
> I'm still trying to debug this error locally - at this stage I'm pretty 
> convinced that it's a weird deadlock/livelock problem due to the use of 
> {{scheduleAtFixedRate}} within {{ExecutorAllocationManager}}. This problem is 
> possibly tangentially related to the issues discussed in SPARK-1560 around 
> the use of blocking calls within locks.
> h4. Observed Behavior
> When executors are lost while a Spark job is running, the job occasionally 
> goes into a state where it makes no progress with tasks. Most commonly the 
> issue seems to occur when executors are preempted by YARN, but I'm not 
> confident enough to state that it's restricted to just this scenario.
> Upon inspecting a thread dump from the driver, the following stack traces 
> seem noteworthy (a full thread dump is attached):
> {noformat:title=Thread 178: spark-dynamic-executor-allocation (TIMED_WAITING)}
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1033)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
> scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> scala.concurrent.Await$.result(package.scala:190)
> org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:77)
> org.apache.spark.scheduler.cluster.YarnSchedulerBackend.doRequestTotalExecutors(YarnSchedulerBackend.scala:59)
> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:447)
> org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1423)
> org.apache.spark.ExecutorAllocationManager.addExecutors(ExecutorAllocationManager.scala:359)
> org.apache.spark.ExecutorAllocationManager.updateAndSyncNumExecutorsTarget(ExecutorAllocationManager.scala:310)
> org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:264)
> org.apache.spark.ExecutorAllocationManager$$anon$2.run(ExecutorAllocationManager.scala:223)
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> java.lang.Thread.run(Thread.java:745)
> {noformat}
> {noformat:title=Thread 22: dispatcher-event-loop-10 (BLOCKED)}
> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.disableExecutor(CoarseGrainedSchedulerBackend.scala:289)
> org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint$$anonfun$onDisconnected$1.apply(YarnSchedulerBackend.scala:121)
> org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint$$anonfun$onDisconnected$1.apply(YarnSchedulerBackend.scala:120)
> scala.Option.foreach(Option.scala:257)
> org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint.onDisconnected(YarnSchedulerBackend.scala:120)
> org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:142)
> org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
> org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
> org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> java.lang.Thread.run(Thread.java:745)
> {noformat}
> {noformat:title=Thread 640: kill-executor-thread (BLOCKED)}
> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors(CoarseGrainedSchedulerBackend.scala:488)
> org.apache.spark.SparkContext.killAndReplaceExecutor(SparkContext.scala:1499)
> org.apache.spark.HeartbeatReceiver$$anonfun$org$apache$spark$HeartbeatReceiver$$expireDeadHosts$3$$anon$3$$anonfun$run$3.apply$mcV$sp(HeartbeatReceiver.scala:206)
> org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1219)
> org.apache.spark.HeartbeatReceiver$$anonfun$org$apache$spark$HeartbeatReceiver$$expireDeadHosts$3$$anon$3.run(HeartbeatReceiver.scala:203)
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> java.util.concurrent.FutureTask.run(FutureTask.java:262)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> java.lang.Thread.run(Thread.java:745)
> {noformat}
> {noformat:title=Thread 21: dispatcher-event-loop-9 (TIMED_WAITING)}
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1033)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
> scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> scala.concurrent.Await$.result(package.scala:190)
> org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:77)
> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.removeExecutor(CoarseGrainedSchedulerBackend.scala:370)
> org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint$$anonfun$receive$1.applyOrElse(YarnSchedulerBackend.scala:176)
> org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
> org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
> org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
> org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> java.lang.Thread.run(Thread.java:745)
> {noformat}
> My theory is that the following components attempting to call and/or message 
> each other simultaneously are causing a deadlock/livelock scenario (a minimal 
> sketch of the pattern follows the list):
> * {{CoarseGrainedSchedulerBackend (RequestExecutors) --> YarnSchedulerEndpoint}}
> * {{YarnSchedulerEndpoint (RemoveExecutor) --> DriverEndpoint}}
> * {{DriverEndpoint (disableExecutor) --> CoarseGrainedSchedulerBackend}}
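> To make that concrete, here is a stripped-down, purely hypothetical sketch 
> (none of these names are real Spark classes) of the pattern I believe is at 
> play: one thread performs a blocking wait while holding a monitor, and the 
> "reply" it is waiting for can only be produced by a thread that first needs 
> that same monitor:
> {noformat:title=Sketch: blocking wait while holding the backend monitor (hypothetical)}
> import java.util.concurrent.{CountDownLatch, TimeUnit}
>
> object BlockedUnderLockSketch {
>   val backendLock = new Object            // stands in for the CoarseGrainedSchedulerBackend monitor
>   val replyReady  = new CountDownLatch(1) // stands in for the RPC reply from YarnSchedulerEndpoint
>
>   // Like spark-dynamic-executor-allocation: holds the monitor and blocks on the "ask".
>   val allocator = new Thread(new Runnable {
>     def run(): Unit = backendLock.synchronized {
>       replyReady.await(120, TimeUnit.SECONDS) // only released once the wait times out
>     }
>   })
>
>   // Like the dispatcher thread running disableExecutor: it needs the monitor to make
>   // progress, and only once it runs can the "reply" ever be produced.
>   val dispatcher = new Thread(new Runnable {
>     def run(): Unit = backendLock.synchronized {
>       replyReady.countDown()
>     }
>   })
>
>   def main(args: Array[String]): Unit = {
>     allocator.start(); Thread.sleep(100); dispatcher.start()
>     allocator.join(); dispatcher.join()   // nothing moves until the await above gives up
>   }
> }
> {noformat}
> In the real system the only thing that breaks the cycle is the RPC timeout on 
> the blocking ask, which leads to the next point.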
> This is where the use of {{scheduleAtFixedRate}} comes into play: the 
> deadlock should presumably be released when the blocking call to 
> {{YarnSchedulerEndpoint}} times out. However, as soon as lock contention 
> causes a single execution of {{ExecutorAllocationManager.schedule}} to take 
> longer than the hard-coded 100 milliseconds, the possibility exists for 
> that thread to release and then immediately reacquire the lock on 
> {{CoarseGrainedSchedulerBackend}}, so the blocked dispatcher threads may 
> never get a turn.
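> A tiny standalone demonstration of the timer behaviour I am describing (not 
> Spark code, just {{java.util.concurrent}}): with {{scheduleAtFixedRate}}, a 
> run that overruns its 100 ms period is followed immediately by the next run, 
> which is exactly the window the blocked threads on the driver never get:
> {noformat:title=Sketch: scheduleAtFixedRate leaves no gap once a run overruns its period}
> import java.util.concurrent.{Executors, TimeUnit}
>
> object FixedRateOverrunSketch {
>   def main(args: Array[String]): Unit = {
>     val pool  = Executors.newSingleThreadScheduledExecutor()
>     val start = System.currentTimeMillis()
>     pool.scheduleAtFixedRate(new Runnable {
>       def run(): Unit = {
>         println(s"run fired at ${System.currentTimeMillis() - start} ms")
>         Thread.sleep(250) // simulate lock contention pushing schedule() past the 100 ms period
>       }
>     }, 0, 100, TimeUnit.MILLISECONDS)
>     Thread.sleep(1500)
>     pool.shutdownNow()
>   }
> }
> {noformat}
> The runs print back to back, roughly 250 ms apart, with no idle gap between 
> the end of one run and the start of the next.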
> h4. Proposed Solution
> A simple solution would be to have 
> {{YarnSchedulerBackend.doRequestTotalExecutors}} not make blocking calls, 
> similar to SPARK-15606. However, I think it would also be wise to refactor 
> {{ExecutorAllocationManager}} so that it does not use {{scheduleAtFixedRate}} 
> and instead sleeps for some interval of time between runs (sketched below).
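> For the second part, this is roughly the shape of loop I have in mind 
> (simplified, not a patch; the {{schedule}} parameter stands in for 
> {{ExecutorAllocationManager.schedule}}). Because the thread sleeps between 
> iterations, there is always a window in which other threads can take the 
> backend monitor; {{scheduleWithFixedDelay}} would give similar behaviour.
> {noformat:title=Sketch: sleep-between-iterations loop instead of scheduleAtFixedRate (hypothetical)}
> import java.util.concurrent.Executors
>
> object SleepLoopSketch {
>   private val intervalMillis = 100L
>   private val executor = Executors.newSingleThreadExecutor()
>
>   def start(schedule: () => Unit): Unit = executor.submit(new Runnable {
>     def run(): Unit = {
>       while (!Thread.currentThread().isInterrupted) {
>         schedule()                   // may contend on the backend lock, possibly for > 100 ms
>         Thread.sleep(intervalMillis) // guaranteed gap before the next attempt, unlike fixed-rate
>       }
>     }
>   })
>
>   def stop(): Unit = executor.shutdownNow() // interrupting the sleep ends the loop
> }
> {noformat}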
> That's all I've got; I hope it's been helpful. I plan to start working on my 
> proposed solution, and would welcome any feedback on the direction I've 
> suggested.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
