We may no longer need to track disassociation and IMHO use the *improved* feature in akka 2.2.x called remote death watch. Which lets us acknowledge a remote death both in case of a natural demise and accidental deaths. This was not the case with remote death watch in previous akka releases. Please take a closer look at the patch at https://github.com/apache/incubator-spark/pull/163 and let us know.
This patch does not make disassociation disappear, they are added to akka as such but gives us sufficient knobs to tune things as to when they occur. Don't forget to tune those extra properties apart from other timeouts. On Sat, Nov 2, 2013 at 11:08 AM, Matei Zaharia <[email protected]>wrote: > Prashant, the problem seems to be that messages sent while we’re > disassociated are lost. I think we’d have to just prevent disassociation > altogether, or replace all remote actor refs with the reliable proxies > (which sounds painful). > > Matei > > On Nov 1, 2013, at 7:53 PM, Prashant Sharma <[email protected]> wrote: > > Hey Matei and Imran, > > I think may be we can solve the problem without downgrading to 2.1.0 may > by capturing dissociation and then setting a timeout if it associates again > we keep moving else we shutdown the executor. This timeout can ofcourse be > configurable. > > Thoughts ? > > > On Sat, Nov 2, 2013 at 3:29 AM, Matei Zaharia <[email protected]>wrote: > >> Hey Imran, >> >> Good to know that Akka 2.1 handles this — that at least will give us a >> start. >> >> In the old code, executors certainly did get flagged as “down” >> occasionally, but that was due to a timeout we controlled (we keep sending >> heartbeats back and forth to track them). The timeout used to be smaller >> and usually the reason to exceed it was GC. However, if Akka 2.2 can >> sometimes drop the connections itself, this is a problem and we either have >> to use the reliable proxies for everything or see if we can configure it >> otherwise. Anyway, we’ll definitely look into it. >> >> Matei >> >> On Nov 1, 2013, at 1:09 PM, Imran Rashid <[email protected]> wrote: >> >> I downgraded spark to akka 2.1.0, and everything seems to work now. I'm >> going to run my tests a few more times , but I'd really have expected to >> see a failure by now w/ the 2.2.3 version. >> >> I'll submit a patch shortly (need to fix some compile errors in streaming >> still). >> >> Matei -- I think I realize now that when you were talking about the >> expectation of a tcp connection staying alive, you were explaining why this >> is *not* a bug in the current release. You wouldn't end up in a situation >> where the executor thinks it finished the task, but the driver doesn't know >> about it, b/c if the connection dies, the executor wil get restarted. That >> makes sense. But, it seems like if we upgrade to akka 2.2.x, a lot of >> things change. I was probably wrong about seeing that problem in previous >> releases -- it was just a vague recollection, which fit my current >> theories, so I jumped to conclusions. >> >> thanks everyone >> >> >> >> On Fri, Nov 1, 2013 at 9:27 AM, Imran Rashid <[email protected]>wrote: >> >>> thanks everyone for all of the input. >>> >>> Matei: makes a lot more sense with your explanation of spark's expected >>> behavior of tcp, I can see why this makes sense now. But, to show my total >>> ignorance here, I'm wondering that when the connection does break, are you >>> sure all of your messages that you thought you sent before the break were >>> received? I'm guessing that you don't. Which is fine, if the response to >>> that is to have the executor just die completely, and restart. that was >>> the behavior I was initially observing with the code on the 2.10 branch, >>> where the executor handles a DisassociatedEvent explicitly, and dies. >>> >>> But -- is that the behavior we want? do we want it to be robust to tcp >>> connections breaking, without having to completely restart the executor? >>> you might say that dying & restarting will lead to correct behavior, even >>> if its inefficient. But sometimes, I've seen restarts so frequently that >>> no progress is made. >>> >>> I don't see why this changed w/ the different versions of akka -- I >>> don't see any relevant configuration settings that would change how >>> "strongly" tcp tries to keep the connection alive, but I may be missing >>> something. But it does seem like the netty configuration options have >>> changed completely between the two versions: >>> >>> http://doc.akka.io/docs/akka/2.2.3/scala/remoting.html#Remote_Configuration >>> vs >>> http://doc.akka.io/docs/akka/2.0.5/scala/remoting.html >>> >>> btw, akka 2.1.0 also has been built for scala 2.10: >>> >>> http://search.maven.org/#artifactdetails|com.typesafe.akka|akka-remote_2.10|2.1.0|bundle >>> and its netty configuration is closer to 2.0.5: >>> http://doc.akka.io/docs/akka/2.1.0/scala/remoting.html >>> >>> perhaps someone more knowledge then me about netty & tcp can look >>> through the changes and decide what the right changes are. >>> >>> Prashant said: >>> >Before we conclude something about reliable messaging, I want you to >>> for once consider other possibilities like >actual network reconnection and >>> may be a GC pause ? Try connecting something like jconsole (or alike ) and >>> >see what happens on the driver and executor. >>> > >>> >My doubt are since we are using standalone mode where even master and >>> worker are also actors then if we see >a weird behaviour on the executor >>> and driver then Why not on master and worker too ? They should also break >>> >away from each other. For this reason, I am doubting our conclusions and >>> may be if we narrow down the >problem first before we conclude something. >>> It is a regression in akka 2.2.3 it uses more memory than it used to >be in >>> 2.1.x. >>> >See https://github.com/akka/akka/issues/1810 >>> >>> >>> Well, there could easily be the same problem with dropped connections >>> between master & worker -- they just communicate so little, it doesn't >>> really matter. The odds that a message gets dropped between them is very >>> low, only because there are barely any messages. >>> >>> I completely agree that the problem could be because of a contention, or >>> gc pause, etc. In fact, I'm only giving spark 24 out of 32 cores available >>> on each box, and 90g out of 125g memory. I've looked at gc a little with >>> jstat, and I did see some gc pauses but nothing ridiculous. >>> >>> But, I think the question remains. Suppose it is gc pauses, etc. that >>> cause the disassociation events; what do we do to fix it? How can we >>> diagnose the problem, and figure out which of the configuration variables >>> to tune? clearly, there *will be* long gc pauses, and the networking layer >>> needs to be able to deal with them. >>> >>> still I understand your desire to see if that might be the cause of the >>> problem in this particular case, so I will dig a little more. >>> >>> >>> (btw, should I move this thread to the dev list now? it is getting into >>> the nitty-gritty of implementation ...) >>> >>> On Fri, Nov 1, 2013 at 1:15 AM, Matei Zaharia >>> <[email protected]>wrote: >>> >>>> Yes, so far they’ve been built on that assumption — not that Akka would >>>> *guarantee* delivery in that as soon as the send() call returns you know >>>> it’s delivered, but that Akka would act the same way as a TCP socket, >>>> allowing you to send a stream of messages in order and hear when the >>>> connection breaks. Maybe that isn’t what they want to provide, but I'd find >>>> it weird, because it’s very easy to write a server with this property. >>>> >>>> Matei >>>> >>>> On Oct 31, 2013, at 9:58 PM, Sriram Ramachandrasekaran < >>>> [email protected]> wrote: >>>> >>>> Sorry if I my understanding is wrong. May be, for this particular case >>>> it might be something to do with the load/network, but, in general, are you >>>> saying that, we build these communication channels(block manager >>>> communication, task events communication, etc) assuming akka would take >>>> care of it? I somehow feel that, it's being overly optimistic. Correct me >>>> if I am wrong. >>>> >>>> >>>> >>>> On Fri, Nov 1, 2013 at 10:08 AM, Matei Zaharia <[email protected] >>>> > wrote: >>>> >>>>> It’s true that Akka’s delivery guarantees are in general at-most-once, >>>>> but if you look at the text there it says that they differ by transport. >>>>> In >>>>> the previous version, I’m quite sure that except maybe in very rare >>>>> circumstances or cases where we had a bug, Akka’s remote layer always kept >>>>> connections up between each pair of hosts. So the guarantee was that as >>>>> long as you haven’t received a “disconnected” event, your messages are >>>>> being delivered, though of course when you do receive that event you don’t >>>>> know which messages have really made it through unless you acked them. But >>>>> that didn’t matter for our use case — from our point of view an executor >>>>> was either up or down. >>>>> >>>>> For this reason I still think it should be possible to configure Akka >>>>> to do the same on 2.2. Most likely some timeouts just got lower. With >>>>> large >>>>> heaps you can easily get a GC pause of 60 seconds, so these timeouts >>>>> should >>>>> be in the minutes. >>>>> >>>>> If for some reason this isn’t the case, then we have a bigger problem >>>>> — there are *lots* of messages beyond task-finished that need to be sent >>>>> reliably, including things like block manager events (a block was added / >>>>> removed on this node) and commands to tell the block manager to drop data. >>>>> It would be silly to implement acks at the application level for all >>>>> these. >>>>> But I doubt this is the case. Prashant’s observation that the standalone >>>>> cluster manager stayed up is a further sign that this might be due to GC. >>>>> >>>>> Matei >>>>> >>>>> On Oct 31, 2013, at 9:11 PM, Sriram Ramachandrasekaran < >>>>> [email protected]> wrote: >>>>> >>>>> Hi Imran, >>>>> Just to add, we've noticed dis-associations in a couple projects that >>>>> we built(using akka 2.2.x not spark). We went to some details to find out >>>>> what was happening. As Matei, suggested, Akka keeps the TCP connection >>>>> open >>>>> and uses that to talk to peers. We noticed that in our case, initially, we >>>>> were seeing dis-associations generally at the end of keep-alive duration. >>>>> So, when the keep-alive duration ends, at the TCP layer, a keep-alive >>>>> probe >>>>> gets sent to inform the peer on the other side that the connection is >>>>> still >>>>> alive/valid. For some reason, the probe dint renew the keep-alive >>>>> connection and we saw a lot of dis-associations during that time. Later, >>>>> we >>>>> realized this was not a pattern either. This >>>>> thread<https://groups.google.com/forum/#!msg/akka-user/RYxaPl_nby4/1USHDFIRgOkJ>contains >>>>> the full history of our discussions with the Akka team. It's still >>>>> open and unclear as to what was causing it for our case. >>>>> We tried tweaking various settings of akka(wrt heartbeats, failure >>>>> detector, even plugged-in our own failure detector with no effect). >>>>> >>>>> Imran - Just to clarify your point on message delivery - akka's >>>>> message delivery policy is at-most-once. However, there's no guarantee for >>>>> a message to be delivered to a peer. The documentation clearly explains >>>>> that. >>>>> http://doc.akka.io/docs/akka/2.0.2/general/message-send-semantics.html. >>>>> It's >>>>> the responsibility of the application developer to handle cases where >>>>> message is suspected to be not have been delivered. >>>>> >>>>> I hope this helps. >>>>> >>>>> >>>>> >>>>> >>>>> On Fri, Nov 1, 2013 at 8:35 AM, Imran Rashid <[email protected]>wrote: >>>>> >>>>>> >>>>>> unfortunately that change wasn't the silver bullet I was hoping for. >>>>>> Even with >>>>>> 1) ignoring DisassociatedEvent >>>>>> 2) executor uses ReliableProxy to send messages back to driver >>>>>> 3) turn up akka.remote.watch-failure-detector.threshold=12 >>>>>> >>>>>> >>>>>> there is a lot of weird behavior. First, there are a few >>>>>> DisassociatedEvents, but some that are followed by AssociatedEvents, so >>>>>> that seems ok. But sometimes the re-associations are immediately >>>>>> followed >>>>>> by this: >>>>>> >>>>>> 13/10/31 18:51:10 INFO executor.StandaloneExecutorBackend: got >>>>>> lifecycleevent: AssociationError >>>>>> [akka.tcp://sparkExecutor@<executor>:41441] >>>>>> -> [akka.tcp://spark@<driver>:41321]: Error [Invalid address: >>>>>> akka.tcp://spark@<driver>:41321] [ >>>>>> akka.remote.InvalidAssociation: Invalid address: >>>>>> akka.tcp://spark@<driver>:41321 >>>>>> Caused by: >>>>>> akka.remote.transport.Transport$InvalidAssociationException: The remote >>>>>> system has quarantined this system. No further associations to the remote >>>>>> system are possible until this system is restarted. >>>>>> ] >>>>>> >>>>>> On the driver, there are messages like: >>>>>> >>>>>> [INFO] [10/31/2013 18:51:07.838] >>>>>> [spark-akka.actor.default-dispatcher-3] [Remoting] Address [ >>>>>> akka.tcp://sparkExecutor@<executor>:46123] is now quarantined, all >>>>>> messages to this address will be delivered to dead letters. >>>>>> [WARN] [10/31/2013 18:51:10.845] >>>>>> [spark-akka.actor.default-dispatcher-20] >>>>>> [akka://spark/system/remote-watcher] >>>>>> Detected unreachable: [akka.tcp://sparkExecutor@<executor>:41441] >>>>>> >>>>>> >>>>>> and when the driver does decide that the executor has been >>>>>> terminated, it removes the executor, but doesn't start another one. >>>>>> >>>>>> there are a ton of messages also about messages to the block manager >>>>>> master ... I'm wondering if there are other parts of the system that need >>>>>> to use a reliable proxy (or some sort of acknowledgement). >>>>>> >>>>>> I really don't think this was working properly even w/ previous >>>>>> versions of spark / akka. I'm still learning about akka, but I think you >>>>>> always need an ack to be confident w/ remote communicate. Perhaps the >>>>>> old >>>>>> version of akka just had more robust defaults or something, but I bet it >>>>>> could still have the same problems. Even before, I have seen the driver >>>>>> thinking there were running tasks, but nothing happening on any executor >>>>>> -- >>>>>> it was just rare enough (and hard to reproduce) that I never bothered >>>>>> looking into it more. >>>>>> >>>>>> I will keep digging ... >>>>>> >>>>>> On Thu, Oct 31, 2013 at 4:36 PM, Matei Zaharia < >>>>>> [email protected]> wrote: >>>>>> >>>>>>> BTW the problem might be the Akka failure detector settings that >>>>>>> seem new in 2.2: >>>>>>> http://doc.akka.io/docs/akka/2.2.3/scala/remoting.html >>>>>>> >>>>>>> Their timeouts seem pretty aggressive by default — around 10 >>>>>>> seconds. This can easily be too little if you have large garbage >>>>>>> collections. We should make sure they are higher than our own node >>>>>>> failure >>>>>>> detection timeouts. >>>>>>> >>>>>>> Matei >>>>>>> >>>>>>> On Oct 31, 2013, at 1:33 PM, Imran Rashid <[email protected]> >>>>>>> wrote: >>>>>>> >>>>>>> pretty sure I found the problem -- two problems actually. And I >>>>>>> think one of them has been a general lurking problem w/ spark for a >>>>>>> while. >>>>>>> >>>>>>> 1) we should ignore disassociation events, as you suggested >>>>>>> earlier. They seem to just indicate a temporary problem, and can >>>>>>> generally >>>>>>> be ignored. I've found that they're regularly followed by >>>>>>> AssociatedEvents, and it seems communication really works fine at that >>>>>>> point. >>>>>>> >>>>>>> 2) Task finished messages get lost. When this message gets sent, we >>>>>>> dont' know it actually gets there: >>>>>>> >>>>>>> >>>>>>> https://github.com/apache/incubator-spark/blob/scala-2.10/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala#L90 >>>>>>> >>>>>>> (this is so incredible, I feel I must be overlooking something -- >>>>>>> but there is no ack somewhere else that I'm overlooking, is there??) >>>>>>> So, >>>>>>> after the patch, spark wasn't hanging b/c of the unhandled >>>>>>> DisassociatedEvent. It hangs b/c the executor has sent some >>>>>>> taskFinished >>>>>>> messages that never get received by the driver. So the driver is >>>>>>> waiting >>>>>>> for some tasks to finish, but the executors think they are all done. >>>>>>> >>>>>>> I'm gonna add the reliable proxy pattern for this particular >>>>>>> interaction and see if its fixes the problem >>>>>>> >>>>>>> http://doc.akka.io/docs/akka/2.2.3/contrib/reliable-proxy.html#introducing-the-reliable-proxy >>>>>>> >>>>>>> imran >>>>>>> >>>>>>> >>>>> >>>>> >>>>> -- >>>>> It's just about how deep your longing is! >>>>> >>>>> >>>>> >>>> >>>> >>>> -- >>>> It's just about how deep your longing is! >>>> >>>> >> >> > > > -- > s > > > -- s
