I just realized something about the failing stages -- they generally occur
in steps like this:
rdd.mapPartitions { itr =>
  val myCounters = initializeSomeDataStructure()
  itr.foreach { elem =>
    // update myCounters in here
    ...
  }
  myCounters.iterator.map { c =>
    // some other transformation here ...
  }
}
That is, as a partition is processed, nothing gets output; we just
accumulate some values. Only at the end of the partition do we output the
accumulated values.
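
As a concrete (entirely made-up) instance of the pattern, assuming an
RDD[String] -- all the names here are hypothetical, just to show the shape:

import scala.collection.mutable

rdd.mapPartitions { itr =>
  // build up per-partition state; nothing is emitted while we iterate
  val counters = mutable.Map.empty[Int, Long].withDefaultValue(0L)
  itr.foreach { s =>
    counters(s.length) += 1
  }
  // only after the whole partition is consumed do we emit anything
  counters.iterator.map { case (len, n) => s"strings of length $len: $n" }
}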
These stages don't always fail, and generally they do succeed after the
executor has died and a new one has started -- so I'm pretty confident it's
not a problem with the code. But maybe we need to add something like a
periodic heartbeat in this kind of operation?
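
Until Spark does something like that itself, the closest user-level
approximation I can think of is to wrap the iterator so some observable
side effect (a log line, say) happens every N records -- this is just a
hypothetical sketch, not an existing Spark API:

// fires a "heartbeat" callback every n records while iterating
def heartbeatEvery[T](itr: Iterator[T], n: Long)(ping: Long => Unit): Iterator[T] =
  new Iterator[T] {
    private var seen = 0L
    def hasNext: Boolean = itr.hasNext
    def next(): T = {
      seen += 1
      if (seen % n == 0) ping(seen)  // e.g. log progress so the executor looks alive
      itr.next()
    }
  }

// inside the mapPartitions above, something like:
//   heartbeatEvery(itr, 100000)(c => System.err.println(s"processed $c records"))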
On Wed, Oct 30, 2013 at 8:56 AM, Imran Rashid <[email protected]> wrote:
> I'm gonna try turning on more akka debugging msgs as described at
> http://akka.io/faq/
> and
>
> http://doc.akka.io/docs/akka/current/scala/testing.html#Tracing_Actor_Invocations
>
> Unfortunately that will require a patch to Spark, but hopefully it will
> give us more info to go on ...
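>
> Concretely, I expect the relevant settings look something like this (this
> is Akka's config syntax from those pages, untested by me so far):
>
> akka {
>   loglevel = "DEBUG"
>   actor.debug {
>     receive = on      # log user messages (for actors using LoggingReceive)
>     autoreceive = on  # log auto-received messages (Kill, PoisonPill, ...)
>     lifecycle = on    # log actor lifecycle changes
>   }
>   remote {
>     log-sent-messages = on
>     log-received-messages = on
>   }
> }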
>
>
> On Wed, Oct 30, 2013 at 8:10 AM, Prashant Sharma <[email protected]> wrote:
>
>> I have things running (from the Scala 2.10 branch) for over 3-4 hours now
>> without a problem, and my jobs write about the same amount of data as you
>> described. My cluster size is 7 nodes, not *congested* for memory. I'm
>> going to leave jobs running all night long. Meanwhile, I'd encourage you
>> to try to make the problem reproducible; that can help a ton in fixing
>> the issue.
>>
>> Thanks for testing and reporting your experience. I still feel there is
>> something else wrong! About tolerance for network connection timeouts,
>> setting those properties should work, but I am worried about the
>> Disassociation event. I will have to check whether this is indeed a
>> hard-to-reproduce bug; if it is, how do I simulate network delays?
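>>
>> (One thing I might try, assuming a Linux test box with root access: netem
>> can inject artificial latency, e.g.
>>
>>   sudo tc qdisc add dev eth0 root netem delay 250ms 50ms  # ~250ms +/- 50ms
>>   sudo tc qdisc del dev eth0 root netem                   # remove it again
>>
>> though I have not verified whether that actually reproduces a
>> Disassociation.)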
>>
>>
>> On Wed, Oct 30, 2013 at 6:05 PM, Imran Rashid <[email protected]> wrote:
>>
>>> This is a spark-standalone setup (not mesos), on our own cluster.
>>>
>>> At first I thought it must be some temporary network problem too -- but
>>> the times between receiving task completion events from an executor and
>>> declaring it failed are really small, so I didn't think that could possibly
>>> be it. Plus we tried increasing various akka timeouts, but that didn't
>>> help. Or maybe there are some other spark / akka properties we should be
>>> setting? It certainly should be resilient to such a temporary network
>>> issue, if that is the problem.
>>>
>>> btw, I think I've noticed this happens most often during
>>> ShuffleMapTasks. The tasks write out very small amounts of data (64 MB
>>> total for the entire stage).
>>>
>>> thanks
>>>
>>> On Wed, Oct 30, 2013 at 6:47 AM, Prashant Sharma <[email protected]> wrote:
>>>
>>>> Are you using Mesos? I admit I have not properly tested things on
>>>> Mesos, though.
>>>>
>>>>
>>>> On Wed, Oct 30, 2013 at 11:31 AM, Prashant Sharma <[email protected]> wrote:
>>>>
>>>>> Those log messages are new in Akka 2.2 and are usually seen when a
>>>>> node is disassociated from another, either by a network failure or even
>>>>> a clean shutdown. This suggests some network issue to me -- are you
>>>>> running on EC2? It might be a temporary thing in that case.
>>>>>
>>>>> I'd like more details on the long jobs, though -- how long?
>>>>>
>>>>>
>>>>> On Wed, Oct 30, 2013 at 1:29 AM, Imran Rashid <[email protected]> wrote:
>>>>>
>>>>>> We've been testing out the 2.10 branch of Spark, and we're running
>>>>>> into some issues where akka disconnects from the executors after a
>>>>>> while. We ran some simple tests first, and all was well, so we started
>>>>>> upgrading our whole codebase to 2.10. Everything seemed to be working,
>>>>>> but then we noticed that when we run long jobs, things start failing.
>>>>>>
>>>>>>
>>>>>> The first suspicious thing is that we get akka warnings about
>>>>>> undeliverable messages sent to deadLetters:
>>>>>>
>>>>>> 2013-10-29 11:03:54,577 [spark-akka.actor.default-dispatcher-17]
>>>>>> INFO akka.actor.LocalActorRef - Message
>>>>>> [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
>>>>>> Actor[akka://spark/deadLetters] to
>>>>>> Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%4010.10.5.81%3A46572-3#656094700]
>>>>>> was not delivered. [4] dead letters encountered. This logging can be
>>>>>> turned
>>>>>> off or adjusted with configuration settings 'akka.log-dead-letters' and
>>>>>> 'akka.log-dead-letters-during-shutdown'.
>>>>>>
>>>>>> 2013-10-29 11:03:54,579 [spark-akka.actor.default-dispatcher-19]
>>>>>> INFO akka.actor.LocalActorRef - Message
>>>>>> [akka.remote.transport.AssociationHandle$Disassociated] from
>>>>>> Actor[akka://spark/deadLetters] to
>>>>>> Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%4010.10.5.81%3A46572-3#656094700]
>>>>>> was not delivered. [5] dead letters encountered. This logging can be
>>>>>> turned
>>>>>> off or adjusted with configuration settings 'akka.log-dead-letters' and
>>>>>> 'akka.log-dead-letters-during-shutdown'.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Generally within a few seconds after the first such message, there
>>>>>> are a bunch more, and then the executor is marked as failed, and a new
>>>>>> one
>>>>>> is started:
>>>>>>
>>>>>> 2013-10-29 11:03:58,775 [spark-akka.actor.default-dispatcher-3] INFO
>>>>>> akka.actor.LocalActorRef - Message
>>>>>> [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
>>>>>> Actor[akka://spark/deadLetters] to
>>>>>> Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkExecutor%
>>>>>> 40dhd2.quantifind.com%3A45794-6#-890135716] was not delivered. [10]
>>>>>> dead letters encountered, no more dead letters will be logged. This
>>>>>> logging
>>>>>> can be turned off or adjusted with configuration settings
>>>>>> 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
>>>>>>
>>>>>> 2013-10-29 11:03:58,778 [spark-akka.actor.default-dispatcher-17]
>>>>>> INFO org.apache.spark.deploy.client.Client$ClientActor - Executor
>>>>>> updated:
>>>>>> app-20131029110000-0000/1 is now FAILED (Command exited with code 1)
>>>>>>
>>>>>> 2013-10-29 11:03:58,784 [spark-akka.actor.default-dispatcher-17]
>>>>>> INFO org.apache.spark.deploy.client.Client$ClientActor - Executor added:
>>>>>> app-20131029110000-0000/2 on
>>>>>> worker-20131029105824-dhd2.quantifind.com-51544 (
>>>>>> dhd2.quantifind.com:51544) with 24 cores
>>>>>>
>>>>>> 2013-10-29 11:03:58,784 [spark-akka.actor.default-dispatcher-18]
>>>>>> ERROR akka.remote.EndpointWriter - AssociationError [akka.tcp://
>>>>>> [email protected]:43068] -> [akka.tcp://
>>>>>> [email protected]:45794]: Error [Association failed
>>>>>> with [akka.tcp://[email protected]:45794]] [
>>>>>> akka.remote.EndpointAssociationException: Association failed with
>>>>>> [akka.tcp://[email protected]:45794]
>>>>>> Caused by:
>>>>>> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
>>>>>> Connection refused: dhd2.quantifind.com/10.10.5.64:45794]
>>>>>>
>>>>>>
>>>>>>
>>>>>> Looking in the logs of the failed executor, there are some similar
>>>>>> messages about undeliverable messages, but I don't see any obvious
>>>>>> cause:
>>>>>>
>>>>>> 13/10/29 11:03:52 INFO executor.Executor: Finished task ID 943
>>>>>>
>>>>>> 13/10/29 11:03:53 INFO actor.LocalActorRef: Message
>>>>>> [akka.actor.FSM$Timer] from Actor[akka://sparkExecutor/deadLetters] to
>>>>>> Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%
>>>>>> 40ddd0.quantifind.com%3A43068-1#772172548] was not delivered. [1]
>>>>>> dead letters encountered. This logging can be turned off or adjusted with
>>>>>> configuration settings 'akka.log-dead-letters' and
>>>>>> 'akka.log-dead-letters-during-shutdown'.
>>>>>>
>>>>>> 13/10/29 11:03:53 INFO actor.LocalActorRef: Message
>>>>>> [akka.remote.transport.AssociationHandle$Disassociated] from
>>>>>> Actor[akka://sparkExecutor/deadLetters] to
>>>>>> Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%
>>>>>> 40ddd0.quantifind.com%3A43068-1#772172548] was not delivered. [2]
>>>>>> dead letters encountered. This logging can be turned off or adjusted with
>>>>>> configuration settings 'akka.log-dead-letters' and
>>>>>> 'akka.log-dead-letters-during-shutdown'.
>>>>>>
>>>>>> 13/10/29 11:03:53 INFO actor.LocalActorRef: Message
>>>>>> [akka.remote.transport.AssociationHandle$Disassociated] from
>>>>>> Actor[akka://sparkExecutor/deadLetters] to
>>>>>> Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%
>>>>>> 40ddd0.quantifind.com%3A43068-1#772172548] was not delivered. [3]
>>>>>> dead letters encountered. This logging can be turned off or adjusted with
>>>>>> configuration settings 'akka.log-dead-letters' and
>>>>>> 'akka.log-dead-letters-during-shutdown'.
>>>>>>
>>>>>> 13/10/29 11:03:53 ERROR executor.StandaloneExecutorBackend: Driver
>>>>>> terminated or disconnected! Shutting down.
>>>>>>
>>>>>> 13/10/29 11:03:53 INFO actor.LocalActorRef: Message
>>>>>> [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
>>>>>> Actor[akka://sparkExecutor/deadLetters] to
>>>>>> Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%
>>>>>> 40ddd0.quantifind.com%3A43068-1#772172548] was not delivered. [4]
>>>>>> dead letters encountered. This logging can be turned off or adjusted with
>>>>>> configuration settings 'akka.log-dead-letters' and
>>>>>> 'akka.log-dead-letters-during-shutdown'.
>>>>>>
>>>>>>
>>>>>> After this happens, spark does launch a new executor successfully
>>>>>> and continues the job. Sometimes the job just continues happily and
>>>>>> there aren't any other problems. However, that executor may have to
>>>>>> run a bunch of steps to re-compute some cached RDDs -- and during that
>>>>>> time, another executor may crash similarly, and then we end up in a
>>>>>> never-ending loop of one executor crashing, then trying to reload
>>>>>> data, while the others sit around.
>>>>>>
>>>>>> I have no idea what is triggering this behavior -- there isn't any
>>>>>> particular point in the job where it regularly occurs. Certain steps
>>>>>> seem more prone to it, but there isn't any step which regularly causes
>>>>>> the problem. In a long pipeline of steps, though, that loop becomes
>>>>>> very likely. I don't think it's a timeout issue -- the executors that
>>>>>> fail first can be actively completing stages just seconds before the
>>>>>> failure happens. We did try adjusting some of the spark / akka
>>>>>> timeouts:
>>>>>>
>>>>>> -Dspark.storage.blockManagerHeartBeatMs=300000
>>>>>> -Dspark.akka.frameSize=150
>>>>>> -Dspark.akka.timeout=120
>>>>>> -Dspark.akka.askTimeout=30
>>>>>> -Dspark.akka.logLifecycleEvents=true
>>>>>>
>>>>>> but those settings didn't seem to help the problem at all. I figure
>>>>>> it must be some configuration with the new version of akka that we're
>>>>>> missing, but we haven't found anything. Any ideas?
>>>>>>
>>>>>> Our code works fine with the 0.8.0 release on Scala 2.9.3. The
>>>>>> failures occur on the tip of the scala-2.10 branch (5429d62d).
>>>>>>
>>>>>> thanks,
>>>>>> Imran