ok, so I applied a few patches

https://github.com/quantifind/incubator-spark/pull/1/files

and ran it again, with these options:

-Dspark.akka.stdout-loglevel=DEBUG \
-Dspark.akkaExtra.akka.logLevel=DEBUG \
-Dspark.akkaExtra.akka.actor.debug.receive=on \
-Dspark.akkaExtra.akka.actor.debug.autoreceive=on \
-Dspark.akkaExtra.akka.actor.debug.lifecycle=on \
-Dspark.akkaExtra.akka.remote.log-sent-messages=on \
-Dspark.akkaExtra.akka.remote.log-received-messages=on \
-Dspark.akkaExtra.akka.log-config-on-start=on
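
For context on what those spark.akkaExtra.* properties are meant to do: as
best I can tell from the patch linked above, the prefix is stripped off and
the remainder is merged into the Akka config before the actor system starts.
A minimal sketch of that translation (hypothetical object name, not the
actual patch code):

```scala
// Hypothetical sketch (not the actual patch code): gather every property
// under "spark.akkaExtra." and strip the prefix, yielding raw Akka config
// keys that could then be merged into the ActorSystem's config.
object AkkaExtraProps {
  val Prefix = "spark.akkaExtra."

  def extract(props: Map[String, String]): Map[String, String] =
    props.collect {
      case (key, value) if key.startsWith(Prefix) =>
        (key.stripPrefix(Prefix), value)
    }
}
```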

On the driver, I see:

2013-10-30 08:44:31,034 [spark-akka.actor.default-dispatcher-19] INFO
akka.actor.LocalActorRef - Message
[akka.remote.transport.AssociationHandle$Disassociated] from
Actor[akka://spark/deadLetters] to
Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%4010.10.5.64%3A52400-2#-837892141]
was not delivered. [1] dead letters encountered. This logging can be turned
off or adjusted with configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.

2013-10-30 08:44:31,058 [spark-akka.actor.default-dispatcher-13] INFO
org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend - Executor 1
disconnected, so removing it, reason:remote Akka client disconnected

2013-10-30 08:44:31,059 [spark-akka.actor.default-dispatcher-13] ERROR
org.apache.spark.scheduler.cluster.ClusterScheduler - Lost executor 1 on
dhd2.quantifind.com: remote Akka client disconnected


On the worker, stderr:

13/10/30 08:44:28 INFO executor.Executor: Finished task ID 934

13/10/30 08:44:31 ERROR executor.StandaloneExecutorBackend: Driver
terminated or disconnected! Shutting down. Disassociated [akka.tcp://
[email protected]:38021] -> [akka.tcp://
[email protected]:36730]

and unfortunately, all those akka debug options give me *no* useful info in
the worker stdout:

Starting akka system "sparkExecutor" using config:

      akka.daemonic = on
      akka.loggers = ["akka.event.slf4j.Slf4jLogger"]
      akka.stdout-loglevel = "DEBUG"
      akka.actor.provider = "akka.remote.RemoteActorRefProvider"
      akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
      akka.remote.netty.tcp.hostname = "dhd2.quantifind.com"
      akka.remote.netty.tcp.port = 0
      akka.remote.netty.tcp.connection-timeout = 60 s
      akka.remote.netty.tcp.maximum-frame-size = 10MiB
      akka.remote.netty.tcp.execution-pool-size = 4
      akka.actor.default-dispatcher.throughput = 15
      akka.remote.log-remote-lifecycle-events = off
      akka.remote.log-sent-messages = on
      akka.remote.log-received-messages = on
      akka.logLevel = DEBUG
      akka.actor.debug.autoreceive = on
      akka.actor.debug.lifecycle = on
      akka.actor.debug.receive = on
      akka.log-config-on-start = on
      akka.remote.quarantine-systems-for = off
[DEBUG] [10/30/2013 08:40:30.230] [main] [EventStream] StandardOutLogger
started
[DEBUG] [10/30/2013 08:40:30.438]
[sparkExecutor-akka.actor.default-dispatcher-2] [akka://sparkExecutor/]
started (akka.actor.LocalActorRefProvider$Guardian@4bf54c5f)
[DEBUG] [10/30/2013 08:40:30.446]
[sparkExecutor-akka.actor.default-dispatcher-3] [akka://sparkExecutor/user]
started (akka.actor.LocalActorRefProvider$Guardian@72608760)
[DEBUG] [10/30/2013 08:40:30.447]
[sparkExecutor-akka.actor.default-dispatcher-4]
[akka://sparkExecutor/system] started
(akka.actor.LocalActorRefProvider$SystemGuardian@1f57ea4a)
[DEBUG] [10/30/2013 08:40:30.454]
[sparkExecutor-akka.actor.default-dispatcher-2] [akka://sparkExecutor/] now
supervising Actor[akka://sparkExecutor/user]
[DEBUG] [10/30/2013 08:40:30.454]
[sparkExecutor-akka.actor.default-dispatcher-2] [akka://sparkExecutor/] now
supervising Actor[akka://sparkExecutor/system]
[DEBUG] [10/30/2013 08:40:30.468]
[sparkExecutor-akka.actor.default-dispatcher-3] [akka://sparkExecutor/user]
now monitoring Actor[akka://sparkExecutor/system]
[DEBUG] [10/30/2013 08:40:30.468]
[sparkExecutor-akka.actor.default-dispatcher-4]
[akka://sparkExecutor/system] now monitoring Actor[akka://sparkExecutor/]
[DEBUG] [10/30/2013 08:40:30.476]
[sparkExecutor-akka.actor.default-dispatcher-3]
[akka://sparkExecutor/system/log1-Slf4jLogger] started
(akka.event.slf4j.Slf4jLogger@24988707)
[DEBUG] [10/30/2013 08:40:30.477]
[sparkExecutor-akka.actor.default-dispatcher-4]
[akka://sparkExecutor/system] now supervising
Actor[akka://sparkExecutor/system/log1-Slf4jLogger#719056881]

(followed by similar messages for the "spark" system)

I don't know if this means much more to you, but it seems that for some
reason the executor decides to disconnect from the master -- unfortunately,
the logs don't say why.  Either my logging configuration is not getting
applied correctly, or "log-sent-messages" & "log-received-messages" don't do
what I think they do ... some conflicting setting must be turning that
logging off.  There are a zillion different remoting settings:
http://doc.akka.io/docs/akka/snapshot/scala/remoting.html
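
For what it's worth, from the remoting docs I *think* the settings that
control this look like the below (a guess, not verified on our cluster).
Two things I notice in the config dump above: the key Akka actually reads
is lowercase "akka.loglevel", while the dump shows "akka.logLevel", and
"akka.remote.log-remote-lifecycle-events" is off, which may be suppressing
exactly the disassociation reasons we're after:

```hocon
# Guessed relevant settings (per the Akka 2.2 remoting docs) -- unverified.
akka {
  loglevel = "DEBUG"                    # note: lowercase "loglevel", not "logLevel"
  log-dead-letters = on
  log-dead-letters-during-shutdown = on
  remote {
    # the config dump above shows this as "off", which hides
    # association/disassociation lifecycle events
    log-remote-lifecycle-events = on
  }
}
```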

I feel like I really need to see the messages explaining why it disconnected
before I know which settings to play with.  Any ideas for config changes
that would surface those messages?

thanks




On Wed, Oct 30, 2013 at 10:09 AM, Prashant Sharma <[email protected]> wrote:

> Can you apply this patch too and check the logs of Driver and worker.
>
> diff --git
> a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
> b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
> index b6f0ec9..ad0ebf7 100644
> ---
> a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
> +++
> b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
> @@ -132,7 +132,7 @@ class StandaloneSchedulerBackend(scheduler:
> ClusterScheduler, actorSystem: Actor
>      // Remove a disconnected slave from the cluster
>      def removeExecutor(executorId: String, reason: String) {
>        if (executorActor.contains(executorId)) {
> -        logInfo("Executor " + executorId + " disconnected, so removing
> it")
> +        logInfo("Executor " + executorId + " disconnected, so removing
> it, reason:" + reason)
>          val numCores = freeCores(executorId)
>          actorToExecutorId -= executorActor(executorId)
>          addressToExecutorId -= executorAddress(executorId)
>
>
>
>
> On Wed, Oct 30, 2013 at 8:18 PM, Imran Rashid <[email protected]> wrote:
>
>> I just realized something about the failing stages -- they generally
>> occur in steps like this:
>>
>> rdd.mapPartitions{itr =>
>>   val myCounters = initializeSomeDataStructure()
>>   itr.foreach{
>>     // update myCounters in here
>>     ...
>>   }
>>
>>   myCounters.iterator.map{
>>     //some other transformation here ...
>>   }
>> }
>>
>> that is, as a partition is processed, nothing gets output; we just
>> accumulate some values.  Only at the end of the partition do we output
>> some accumulated values.
>>
>> These stages don't always fail, and generally they do succeed after the
>> executor has died and a new one has started -- so I'm pretty confident it's
>> not a problem w/ the code.  But maybe we need to add something like a
>> periodic heartbeat in this kind of operation?
>>
>>
>>
>> On Wed, Oct 30, 2013 at 8:56 AM, Imran Rashid <[email protected]> wrote:
>>
>>> I'm gonna try turning on more akka debugging msgs as described at
>>> http://akka.io/faq/
>>> and
>>>
>>> http://doc.akka.io/docs/akka/current/scala/testing.html#Tracing_Actor_Invocations
>>>
>>> unfortunately that will require a patch to spark, but hopefully that
>>> will give us more info to go on ...
>>>
>>>
>>> On Wed, Oct 30, 2013 at 8:10 AM, Prashant Sharma 
>>> <[email protected]> wrote:
>>>
>>>> I have had things running (from the scala 2.10 branch) for over 3-4 hours
>>>> now without a problem, and my jobs write about the same amount of data as
>>>> you described. My cluster size is 7 nodes and it is not *congested* for
>>>> memory. I am going to leave jobs running all night long. Meanwhile, I
>>>> would encourage you to try to spot the problem in a way that makes it
>>>> reproducible -- that can help a ton in fixing the issue.
>>>>
>>>> Thanks for testing and reporting your experience. I still feel there is
>>>> something else wrong! As for tolerance of network connection timeouts,
>>>> setting those properties should work, but I am worried about the
>>>> Disassociated event. I will have to check whether this is indeed a
>>>> hard-to-reproduce bug -- and if it is, how do I simulate network delays?
>>>>
>>>>
>>>> On Wed, Oct 30, 2013 at 6:05 PM, Imran Rashid <[email protected]> wrote:
>>>>
>>>>> This is a spark-standalone setup (not mesos), on our own cluster.
>>>>>
>>>>> At first I thought it must be some temporary network problem too --
>>>>> but the times between receiving task completion events from an executor 
>>>>> and
>>>>> declaring it failed are really small, so I didn't think that could 
>>>>> possibly
>>>>> be it.  Plus we tried increasing various akka timeouts, but that didn't
>>>>> help.  Or maybe there are some other spark / akka properties we should be
>>>>> setting?  It certainly should be resilient to such a temporary network
>>>>> issue, if that is the problem.
>>>>>
>>>>> btw, I think I've noticed this happens most often during
>>>>> ShuffleMapTasks.  The tasks write out very small amounts of data (64 MB
>>>>> total for the entire stage).
>>>>>
>>>>> thanks
>>>>>
>>>>> On Wed, Oct 30, 2013 at 6:47 AM, Prashant Sharma <[email protected]
>>>>> > wrote:
>>>>>
>>>>>> Are you using mesos ? I admit to have not properly tested things on
>>>>>> mesos though.
>>>>>>
>>>>>>
>>>>>> On Wed, Oct 30, 2013 at 11:31 AM, Prashant Sharma <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Those log messages are new in Akka 2.2 and are usually seen when one
>>>>>>> node is disassociated from another, by either a network failure or
>>>>>>> even a clean shutdown. This suggests some network issue to me -- are
>>>>>>> you running on EC2? It might be a temporary thing in that case.
>>>>>>>
>>>>>>> I would like more details on the long jobs though -- how long?
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Oct 30, 2013 at 1:29 AM, Imran Rashid 
>>>>>>> <[email protected]> wrote:
>>>>>>>
>>>>>>>> We've been testing out the 2.10 branch of spark, and we're running
>>>>>>>> into some issues where akka disconnects from the executors after a
>>>>>>>> while.  We ran some simple tests first, and all was well, so we
>>>>>>>> started upgrading our whole codebase to 2.10.  Everything seemed to
>>>>>>>> be working, but then we noticed that when we run long jobs, things
>>>>>>>> start failing.
>>>>>>>>
>>>>>>>>
>>>>>>>> The first suspicious thing is that we get akka warnings about
>>>>>>>> undeliverable messages sent to deadLetters:
>>>>>>>>
>>>>>>>> 2013-10-29 11:03:54,577 [spark-akka.actor.default-dispatcher-17]
>>>>>>>> INFO  akka.actor.LocalActorRef - Message
>>>>>>>> [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] 
>>>>>>>> from
>>>>>>>> Actor[akka://spark/deadLetters] to
>>>>>>>> Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%4010.10.5.81%3A46572-3#656094700]
>>>>>>>> was not delivered. [4] dead letters encountered. This logging can be 
>>>>>>>> turned
>>>>>>>> off or adjusted with configuration settings 'akka.log-dead-letters' and
>>>>>>>> 'akka.log-dead-letters-during-shutdown'.
>>>>>>>>
>>>>>>>> 2013-10-29 11:03:54,579 [spark-akka.actor.default-dispatcher-19]
>>>>>>>> INFO  akka.actor.LocalActorRef - Message
>>>>>>>> [akka.remote.transport.AssociationHandle$Disassociated] from
>>>>>>>> Actor[akka://spark/deadLetters] to
>>>>>>>> Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%4010.10.5.81%3A46572-3#656094700]
>>>>>>>> was not delivered. [5] dead letters encountered. This logging can be 
>>>>>>>> turned
>>>>>>>> off or adjusted with configuration settings 'akka.log-dead-letters' and
>>>>>>>> 'akka.log-dead-letters-during-shutdown'.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Generally within a few seconds after the first such message, there
>>>>>>>> are a bunch more, and then the executor is marked as failed, and a new 
>>>>>>>> one
>>>>>>>> is started:
>>>>>>>>
>>>>>>>> 2013-10-29 11:03:58,775 [spark-akka.actor.default-dispatcher-3]
>>>>>>>> INFO  akka.actor.LocalActorRef - Message
>>>>>>>> [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] 
>>>>>>>> from
>>>>>>>> Actor[akka://spark/deadLetters] to
>>>>>>>> Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkExecutor%
>>>>>>>> 40dhd2.quantifind.com%3A45794-6#-890135716] was not delivered.
>>>>>>>> [10] dead letters encountered, no more dead letters will be logged. 
>>>>>>>> This
>>>>>>>> logging can be turned off or adjusted with configuration settings
>>>>>>>> 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
>>>>>>>>
>>>>>>>> 2013-10-29 11:03:58,778 [spark-akka.actor.default-dispatcher-17]
>>>>>>>> INFO  org.apache.spark.deploy.client.Client$ClientActor - Executor 
>>>>>>>> updated:
>>>>>>>> app-20131029110000-0000/1 is now FAILED (Command exited with code 1)
>>>>>>>>
>>>>>>>> 2013-10-29 11:03:58,784 [spark-akka.actor.default-dispatcher-17]
>>>>>>>> INFO  org.apache.spark.deploy.client.Client$ClientActor - Executor 
>>>>>>>> added:
>>>>>>>> app-20131029110000-0000/2 on
>>>>>>>> worker-20131029105824-dhd2.quantifind.com-51544 (
>>>>>>>> dhd2.quantifind.com:51544) with 24 cores
>>>>>>>>
>>>>>>>> 2013-10-29 11:03:58,784 [spark-akka.actor.default-dispatcher-18]
>>>>>>>> ERROR akka.remote.EndpointWriter - AssociationError [akka.tcp://
>>>>>>>> [email protected]:43068] -> [akka.tcp://
>>>>>>>> [email protected]:45794]: Error [Association
>>>>>>>> failed with [akka.tcp://[email protected]:45794]] [
>>>>>>>> akka.remote.EndpointAssociationException: Association failed with
>>>>>>>> [akka.tcp://[email protected]:45794]
>>>>>>>> Caused by:
>>>>>>>> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
>>>>>>>> Connection refused: dhd2.quantifind.com/10.10.5.64:45794]
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Looking in the logs of the failed executor, there are some similar
>>>>>>>> messages about undeliverable messages, but I don't see any reason:
>>>>>>>>
>>>>>>>> 13/10/29 11:03:52 INFO executor.Executor: Finished task ID 943
>>>>>>>>
>>>>>>>> 13/10/29 11:03:53 INFO actor.LocalActorRef: Message
>>>>>>>> [akka.actor.FSM$Timer] from Actor[akka://sparkExecutor/deadLetters] to
>>>>>>>> Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%
>>>>>>>> 40ddd0.quantifind.com%3A43068-1#772172548] was not delivered. [1]
>>>>>>>> dead letters encountered. This logging can be turned off or adjusted 
>>>>>>>> with
>>>>>>>> configuration settings 'akka.log-dead-letters' and
>>>>>>>> 'akka.log-dead-letters-during-shutdown'.
>>>>>>>>
>>>>>>>> 13/10/29 11:03:53 INFO actor.LocalActorRef: Message
>>>>>>>> [akka.remote.transport.AssociationHandle$Disassociated] from
>>>>>>>> Actor[akka://sparkExecutor/deadLetters] to
>>>>>>>> Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%
>>>>>>>> 40ddd0.quantifind.com%3A43068-1#772172548] was not delivered. [2]
>>>>>>>> dead letters encountered. This logging can be turned off or adjusted 
>>>>>>>> with
>>>>>>>> configuration settings 'akka.log-dead-letters' and
>>>>>>>> 'akka.log-dead-letters-during-shutdown'.
>>>>>>>>
>>>>>>>> 13/10/29 11:03:53 INFO actor.LocalActorRef: Message
>>>>>>>> [akka.remote.transport.AssociationHandle$Disassociated] from
>>>>>>>> Actor[akka://sparkExecutor/deadLetters] to
>>>>>>>> Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%
>>>>>>>> 40ddd0.quantifind.com%3A43068-1#772172548] was not delivered. [3]
>>>>>>>> dead letters encountered. This logging can be turned off or adjusted 
>>>>>>>> with
>>>>>>>> configuration settings 'akka.log-dead-letters' and
>>>>>>>> 'akka.log-dead-letters-during-shutdown'.
>>>>>>>>
>>>>>>>> 13/10/29 11:03:53 ERROR executor.StandaloneExecutorBackend: Driver
>>>>>>>> terminated or disconnected! Shutting down.
>>>>>>>>
>>>>>>>> 13/10/29 11:03:53 INFO actor.LocalActorRef: Message
>>>>>>>> [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] 
>>>>>>>> from
>>>>>>>> Actor[akka://sparkExecutor/deadLetters] to
>>>>>>>> Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%
>>>>>>>> 40ddd0.quantifind.com%3A43068-1#772172548] was not delivered. [4]
>>>>>>>> dead letters encountered. This logging can be turned off or adjusted 
>>>>>>>> with
>>>>>>>> configuration settings 'akka.log-dead-letters' and
>>>>>>>> 'akka.log-dead-letters-during-shutdown'.
>>>>>>>>
>>>>>>>>
>>>>>>>> After this happens, spark does launch a new executor successfully,
>>>>>>>> and continues the job.  Sometimes, the job just continues happily
>>>>>>>> and there aren't any other problems.  However, that executor may
>>>>>>>> have to run a bunch of steps to re-compute some cached RDDs -- and
>>>>>>>> during that time, another executor may crash similarly, and then we
>>>>>>>> end up in a never-ending loop of one executor crashing, then trying
>>>>>>>> to reload data, while the others sit around.
>>>>>>>>
>>>>>>>> I have no idea what is triggering this behavior -- there isn't any
>>>>>>>> particular point in the job at which it regularly occurs.  Certain
>>>>>>>> steps seem more prone to this, but there isn't any step which
>>>>>>>> regularly causes the problem.  In a long pipeline of steps, though,
>>>>>>>> that loop becomes very likely.  I don't think it's a timeout issue --
>>>>>>>> the initial failing executors can be actively completing stages just
>>>>>>>> seconds before this failure happens.  We did try adjusting some of
>>>>>>>> the spark / akka timeouts:
>>>>>>>>
>>>>>>>>     -Dspark.storage.blockManagerHeartBeatMs=300000
>>>>>>>>     -Dspark.akka.frameSize=150
>>>>>>>>     -Dspark.akka.timeout=120
>>>>>>>>     -Dspark.akka.askTimeout=30
>>>>>>>>     -Dspark.akka.logLifecycleEvents=true
>>>>>>>>
>>>>>>>> but those settings didn't seem to help the problem at all.  I figure
>>>>>>>> it must be some configuration option in the new version of akka that
>>>>>>>> we're missing, but we haven't found anything.  Any ideas?
>>>>>>>>
>>>>>>>> Our code works fine w/ the 0.8.0 release on scala 2.9.3.  The
>>>>>>>> failures occur on the tip of the scala-2.10 branch (5429d62d).
>>>>>>>>
>>>>>>>> thanks,
>>>>>>>> Imran
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> s
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> s
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> s
>>>>
>>>
>>>
>>
>
>
> --
> s
>
