We can set the timeouts high enough: the same as the connection timeout that we already set.
On Wed, Nov 13, 2013 at 11:37 PM, Matei Zaharia wrote:
> Hey Prashant, do messages still get lost while we’re disassociated? Or can you set the timeouts high enough to prevent that?
>
> Matei
>
> On Nov 13, 2013, at 12:39 AM, Prashant Sharma wrote:
>
> We may no longer need to track disassociation; IMHO we should instead use the *improved* remote death watch feature in Akka 2.2.x, which lets us detect a remote death both in the case of a natural demise and in the case of an accidental one. This was not the case with remote death watch in previous Akka releases. Please take a closer look at the patch at https://github.com/apache/incubator-spark/pull/163 and let us know.
>
> This patch does not make disassociations disappear (they are part of Akka as such), but it gives us sufficient knobs to tune when they occur. Don't forget to tune those extra properties in addition to the other timeouts.
>
> On Sat, Nov 2, 2013 at 11:08 AM, Matei Zaharia wrote:
>
>> Prashant, the problem seems to be that messages sent while we’re disassociated are lost. I think we’d have to just prevent disassociation altogether, or replace all remote actor refs with the reliable proxies (which sounds painful).
>>
>> Matei
>>
>> On Nov 1, 2013, at 7:53 PM, Prashant Sharma wrote:
>>
>> Hey Matei and Imran,
>>
>> I think maybe we can solve the problem without downgrading to 2.1.0 by capturing disassociation and then starting a timeout: if the executor reassociates within that time, we keep going; otherwise we shut it down. This timeout can of course be configurable.
>>
>> Thoughts?
>>
>> On Sat, Nov 2, 2013 at 3:29 AM, Matei Zaharia wrote:
>>
>>> Hey Imran,
>>>
>>> Good to know that Akka 2.1 handles this; that at least will give us a start.
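Prashant's proposal works like this: on a disassociation, wait a configurable grace period for re-association, and shut the executor down only if the link does not come back. That decision logic fits in a small state tracker. This is a minimal sketch in plain Python, not Akka code. The class and method names are hypothetical, and timestamps are passed in explicitly so the behavior is deterministic.

```python
from typing import Optional


class ReassociationGuard:
    """Grace period after a disassociation (hypothetical helper, not an Akka API).

    The executor keeps running while the link is down; if no re-association
    arrives within `grace_seconds`, the caller should shut the executor down.
    Times are passed in explicitly so the logic is easy to test.
    """

    def __init__(self, grace_seconds: float) -> None:
        self.grace_seconds = grace_seconds
        self.disassociated_at: Optional[float] = None

    def on_disassociated(self, now: float) -> None:
        # Only the first event of an outage starts the clock.
        if self.disassociated_at is None:
            self.disassociated_at = now

    def on_associated(self) -> None:
        # The link came back in time: cancel the pending deadline.
        self.disassociated_at = None

    def should_shut_down(self, now: float) -> bool:
        # True once the current outage has outlasted the grace period.
        if self.disassociated_at is None:
            return False
        return now - self.disassociated_at > self.grace_seconds
```

As Matei points out in his reply, a guard like this only decides when to give up. It does nothing for messages that were sent while the link was down.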
>>>
>>> In the old code, executors certainly did get flagged as “down” occasionally, but that was due to a timeout we controlled (we keep sending heartbeats back and forth to track them). The timeout used to be smaller, and usually the reason to exceed it was GC. However, if Akka 2.2 can sometimes drop the connections itself, this is a problem, and we either have to use the reliable proxies for everything or see if we can configure it otherwise. Anyway, we’ll definitely look into it.
>>>
>>> Matei
>>>
>>> On Nov 1, 2013, at 1:09 PM, Imran Rashid wrote:
>>>
>>> I downgraded Spark to Akka 2.1.0, and everything seems to work now. I'm going to run my tests a few more times, but I'd really have expected to see a failure by now with the 2.2.3 version.
>>>
>>> I'll submit a patch shortly (I still need to fix some compile errors in streaming).
>>>
>>> Matei -- I think I realize now that when you were talking about the expectation of a TCP connection staying alive, you were explaining why this is *not* a bug in the current release. You wouldn't end up in a situation where the executor thinks it finished the task but the driver doesn't know about it, because if the connection dies, the executor will get restarted. That makes sense. But it seems like if we upgrade to Akka 2.2.x, a lot of things change. I was probably wrong about seeing that problem in previous releases; it was just a vague recollection that fit my current theories, so I jumped to conclusions.
>>>
>>> thanks everyone
>>>
>>> On Fri, Nov 1, 2013 at 9:27 AM, Imran Rashid wrote:
>>>
>>>> thanks everyone for all of the input.
>>>>
>>>> Matei: with your explanation of Spark's expected TCP behavior, this makes a lot more sense; I can see why now.
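Matei describes the old mechanism: heartbeats in both directions, with an executor flagged as down only after a timeout that Spark controls. That boils down to a last-seen table. Here is a minimal sketch with hypothetical names (not Spark's actual classes). It shows why the timeout has to exceed the longest expected GC pause.

```python
from typing import Dict, List


class HeartbeatMonitor:
    """Toy driver-side liveness tracker (hypothetical, for illustration).

    An executor counts as down once no heartbeat has arrived for longer
    than `timeout_seconds`.
    """

    def __init__(self, timeout_seconds: float) -> None:
        self.timeout_seconds = timeout_seconds
        self.last_seen: Dict[str, float] = {}

    def heartbeat(self, executor_id: str, now: float) -> None:
        # Record the most recent heartbeat time for this executor.
        self.last_seen[executor_id] = now

    def expired(self, now: float) -> List[str]:
        # Executors silent for longer than the timeout, sorted for determinism.
        return sorted(
            eid for eid, seen in self.last_seen.items()
            if now - seen > self.timeout_seconds
        )
```

Suppose `exec-1` heartbeats at t=0 and then stalls in a 60 s GC pause. A 45 s timeout flags it at t=60, while a 120 s timeout lets it survive.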
>>>> But, to show my total ignorance here: when the connection does break, are you sure that all of the messages you thought you sent before the break were received? I'm guessing you aren't. Which is fine, if the response is to have the executor just die completely and restart. That was the behavior I was initially observing with the code on the 2.10 branch, where the executor handles a DisassociatedEvent explicitly and dies.
>>>>
>>>> But is that the behavior we want? Do we want it to be robust to TCP connections breaking, without having to completely restart the executor? You might say that dying and restarting will lead to correct behavior, even if it's inefficient. But sometimes I've seen restarts so frequent that no progress is made.
>>>>
>>>> I don't see why this changed with the different versions of Akka. I don't see any relevant configuration settings that would change how "strongly" TCP tries to keep the connection alive, but I may be missing something. It does seem, though, that the Netty configuration options have changed completely between the two versions:
>>>>
>>>> http://doc.akka.io/docs/akka/2.2.3/scala/remoting.html#Remote_Configuration
>>>> vs
>>>> http://doc.akka.io/docs/akka/2.0.5/scala/remoting.html
>>>>
>>>> BTW, Akka 2.1.0 has also been built for Scala 2.10:
>>>>
>>>> http://search.maven.org/#artifactdetails|com.typesafe.akka|akka-remote_2.10|2.1.0|bundle
>>>>
>>>> and its Netty configuration is closer to 2.0.5:
>>>>
>>>> http://doc.akka.io/docs/akka/2.1.0/scala/remoting.html
>>>>
>>>> Perhaps someone more knowledgeable than me about Netty and TCP can look through the changes and decide what the right changes are.
>>>>
>>>> Prashant said:
>>>> > Before we conclude something about reliable messaging, I want you to for once consider other possibilities, like an actual network reconnection and maybe a GC pause.
>>>> > Try connecting something like jconsole (or similar) and see what happens on the driver and executor.
>>>> >
>>>> > My doubt is this: since we are using standalone mode, where even the master and worker are actors, if we see weird behavior on the executor and driver, why not on the master and worker too? They should also break away from each other. For this reason, I doubt our conclusions; maybe we should narrow down the problem first before we conclude something. There is a regression in Akka 2.2.3: it uses more memory than it did in 2.1.x.
>>>> > See https://github.com/akka/akka/issues/1810
>>>>
>>>> Well, there could easily be the same problem with dropped connections between master and worker; they just communicate so little that it doesn't really matter. The odds that a message gets dropped between them are very low, only because there are barely any messages.
>>>>
>>>> I completely agree that the problem could be due to contention, a GC pause, etc. In fact, I'm only giving Spark 24 of the 32 cores available on each box, and 90g of the 125g of memory. I've looked at GC a little with jstat, and I did see some GC pauses, but nothing ridiculous.
>>>>
>>>> But I think the question remains. Suppose it is GC pauses, etc. that cause the disassociation events; what do we do to fix it? How can we diagnose the problem and figure out which of the configuration variables to tune? Clearly, there *will be* long GC pauses, and the networking layer needs to be able to deal with them.
>>>>
>>>> Still, I understand your desire to see whether that might be the cause of the problem in this particular case, so I will dig a little more.
>>>>
>>>> (BTW, should I move this thread to the dev list now? It is getting into the nitty-gritty of implementation ...)
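Imran asks which settings matter when the disassociations come from GC pauses. That is easier to reason about with the phi-accrual model, which, as far as I know, Akka 2.2's remote failure detectors are based on. The sketch below is the textbook formula. It assumes normally distributed heartbeat inter-arrival times and computes the exact tail with `math.erfc`. It is not Akka's implementation, which estimates the mean and deviation from a sliding window of recent heartbeats, and the parameter names are illustrative. Shifting the expected arrival time by an acceptable pause is roughly the role of Akka's `acceptable-heartbeat-pause` setting, and comparing phi against a cutoff is the role of `threshold`.

```python
import math


def phi(elapsed: float, mean_interval: float, std_dev: float,
        acceptable_pause: float = 0.0) -> float:
    """Suspicion level after `elapsed` seconds without a heartbeat.

    phi = -log10(P(the next heartbeat arrives even later than `elapsed`)),
    assuming normally distributed inter-arrival times. The acceptable pause
    shifts the expected arrival time, which is what lets a detector ride
    out a long GC pause without declaring the peer dead.
    """
    mean = mean_interval + acceptable_pause
    z = (elapsed - mean) / (std_dev * math.sqrt(2.0))
    p_later = 0.5 * math.erfc(z)  # upper tail of the normal distribution
    if p_later <= 0.0:
        return math.inf  # underflow: the heartbeat is hopelessly overdue
    return -math.log10(p_later)


if __name__ == "__main__":
    threshold = 10.0  # suspect the peer once phi exceeds this
    # Heartbeats every 1 s with 0.2 s jitter, then a 5 s stall (e.g. GC):
    print(phi(5.0, 1.0, 0.2) > threshold)
    print(phi(5.0, 1.0, 0.2, acceptable_pause=60.0) > threshold)
```

With no acceptable pause, a 5 s stall pushes phi far past any sensible threshold. With a 60 s acceptable pause, the same stall barely registers. The cost is that a genuinely dead peer is noticed about a minute later.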
>>>>
>>>> On Fri, Nov 1, 2013 at 1:15 AM, Matei Zaharia wrote:
>>>>
>>>>> Yes, so far they’ve been built on that assumption: not that Akka would *guarantee* delivery, in the sense that as soon as the send() call returns you know the message was delivered, but that Akka would act the same way as a TCP socket, allowing you to send a stream of messages in order and hear when the connection breaks. Maybe that isn’t what they want to provide, but I'd find it weird, because it’s very easy to write a server with this property.
>>>>>
>>>>> Matei
>>>>>
>>>>> On Oct 31, 2013, at 9:58 PM, Sriram Ramachandrasekaran wrote:
>>>>>
>>>>> Sorry if my understanding is wrong. Maybe for this particular case it has something to do with the load or the network, but in general, are you saying that we built these communication channels (block manager communication, task event communication, etc.) assuming Akka would take care of delivery? I somehow feel that's overly optimistic. Correct me if I am wrong.
>>>>>
>>>>> On Fri, Nov 1, 2013 at 10:08 AM, Matei Zaharia wrote:
>>>>>
>>>>>> It’s true that Akka’s delivery guarantees are in general at-most-once, but if you look at the text there, it says that they differ by transport. In the previous version, I’m quite sure that, except maybe in very rare circumstances or cases where we had a bug, Akka’s remote layer always kept connections up between each pair of hosts. So the guarantee was that as long as you haven’t received a “disconnected” event, your messages are being delivered, though of course when you do receive that event you don’t know which messages have really made it through unless you acked them. But that didn’t matter for our use case: from our point of view, an executor was either up or down.
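Matei notes that after a disconnect "you don’t know which messages have really made it through unless you acked them." A toy sender makes this concrete: it numbers each message and keeps it buffered until a cumulative ack arrives. The names are hypothetical. The sketch models the TCP-like assumption Matei describes, not Akka's remoting internals.

```python
from typing import Any, Dict, List, Tuple


class OrderedSender:
    """Toy sender on an ordered, connection-oriented link (hypothetical).

    Each message gets the next sequence number and stays buffered until the
    peer acknowledges it. Acks are cumulative: ack n confirms every seq <= n,
    which is valid because the link delivers in order.
    """

    def __init__(self) -> None:
        self.next_seq = 0
        self.unacked: Dict[int, Any] = {}

    def send(self, payload: Any) -> int:
        # Assign the next sequence number and buffer the payload.
        seq = self.next_seq
        self.next_seq += 1
        self.unacked[seq] = payload
        return seq

    def on_ack(self, up_to_seq: int) -> None:
        # Forget everything the peer has confirmed.
        self.unacked = {s: p for s, p in self.unacked.items() if s > up_to_seq}

    def in_doubt(self) -> List[Tuple[int, Any]]:
        # After a "disconnected" event, these may or may not have arrived.
        return sorted(self.unacked.items())
```

If four messages are sent and only `on_ack(1)` comes back before the connection drops, `in_doubt()` reports messages 2 and 3. Those are exactly the ones the application cannot reason about without acks.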
>>>>>>
>>>>>> For this reason, I still think it should be possible to configure Akka to do the same on 2.2. Most likely some timeouts just got lower. With large heaps you can easily get a GC pause of 60 seconds, so these timeouts should be in the minutes.
>>>>>>
>>>>>> If for some reason this isn’t the case, then we have a bigger problem: there are *lots* of messages beyond task-finished that need to be sent reliably, including things like block manager events (a block was added or removed on this node) and commands telling the block manager to drop data. It would be silly to implement acks at the application level for all of these. But I doubt this is the case. Prashant’s observation that the standalone cluster manager stayed up is a further sign that this might be due to GC.
>>>>>>
>>>>>> Matei
>>>>>>
>>>>>> On Oct 31, 2013, at 9:11 PM, Sriram Ramachandrasekaran wrote:
>>>>>>
>>>>>> Hi Imran,
>>>>>> Just to add: we've noticed disassociations in a couple of projects that we built (using Akka 2.2.x, not Spark), and we went into some detail to find out what was happening. As Matei suggested, Akka keeps the TCP connection open and uses it to talk to peers. In our case, we were initially seeing disassociations generally at the end of the keep-alive duration. When the keep-alive duration ends, a keep-alive probe gets sent at the TCP layer to tell the peer on the other side that the connection is still alive and valid. For some reason, the probe didn't renew the keep-alive connection, and we saw a lot of disassociations during that time. Later, we realized this was not a consistent pattern either. This thread <https://groups.google.com/forum/#!msg/akka-user/RYxaPl_nby4/1USHDFIRgOkJ> contains the full history of our discussions with the Akka team.
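The keep-alive probes Sriram mentions are a kernel TCP feature, configured per socket. Below is a minimal Python sketch of the relevant socket options. The values are arbitrary examples, and the `TCP_KEEP*` timing options are platform-specific, hence the `hasattr` checks. This does not claim to reflect what Akka's Netty transport actually configures. Keep-alive only helps detect a dead peer; it does not make message delivery any more reliable.

```python
import socket


def enable_keepalive(sock: socket.socket, idle_s: int = 60,
                     interval_s: int = 10, probes: int = 5) -> None:
    """Turn on TCP keep-alive probes for a socket.

    SO_KEEPALIVE is portable; the timing knobs below exist only on some
    platforms (e.g. Linux), so each one is set only if it is available.
    """
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    if hasattr(socket, "TCP_KEEPIDLE"):
        # Seconds of idleness before the first probe is sent.
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, idle_s)
    if hasattr(socket, "TCP_KEEPINTVL"):
        # Seconds between unanswered probes.
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval_s)
    if hasattr(socket, "TCP_KEEPCNT"):
        # Unanswered probes before the kernel declares the peer dead.
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, probes)
```

With these example values, a dead peer on an idle connection is detected after roughly 60 + 5 × 10 = 110 seconds.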
>>>>>> It's still open, and it's unclear what was causing it in our case. We tried tweaking various Akka settings (heartbeats, failure detector), and even plugged in our own failure detector, with no effect.
>>>>>>
>>>>>> Imran, just to clarify your point on message delivery: Akka's message delivery policy is at-most-once, which means there's no guarantee that a message will be delivered to a peer. The documentation explains this clearly: http://doc.akka.io/docs/akka/2.0.2/general/message-send-semantics.html. It's the responsibility of the application developer to handle cases where a message is suspected not to have been delivered.
>>>>>>
>>>>>> I hope this helps.
>>>>>>
>>>>>> On Fri, Nov 1, 2013 at 8:35 AM, Imran Rashid wrote:
>>>>>>
>>>>>>> Unfortunately that change wasn't the silver bullet I was hoping for. Even with
>>>>>>> 1) ignoring DisassociatedEvent
>>>>>>> 2) the executor using ReliableProxy to send messages back to the driver
>>>>>>> 3) turning up akka.remote.watch-failure-detector.threshold=12
>>>>>>>
>>>>>>> there is a lot of weird behavior. First, there are a few DisassociatedEvents, but some are followed by AssociatedEvents, so that seems OK. But sometimes the re-associations are immediately followed by this:
>>>>>>>
>>>>>>> 13/10/31 18:51:10 INFO executor.StandaloneExecutorBackend: got lifecycleevent: AssociationError [akka.tcp://sparkExecutor@<executor>:41441] -> [akka.tcp://spark@<driver>:41321]: Error [Invalid address: akka.tcp://spark@<driver>:41321] [
>>>>>>> akka.remote.InvalidAssociation: Invalid address: akka.tcp://spark@<driver>:41321
>>>>>>> Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system has quarantined this system.
>>>>>>> No further associations to the remote system are possible until this system is restarted.
>>>>>>> ]
>>>>>>>
>>>>>>> On the driver, there are messages like:
>>>>>>>
>>>>>>> [INFO] [10/31/2013 18:51:07.838] [spark-akka.actor.default-dispatcher-3] [Remoting] Address [akka.tcp://sparkExecutor@<executor>:46123] is now quarantined, all messages to this address will be delivered to dead letters.
>>>>>>> [WARN] [10/31/2013 18:51:10.845] [spark-akka.actor.default-dispatcher-20] [akka://spark/system/remote-watcher] Detected unreachable: [akka.tcp://sparkExecutor@<executor>:41441]
>>>>>>>
>>>>>>> And when the driver does decide that the executor has been terminated, it removes the executor but doesn't start another one.
>>>>>>>
>>>>>>> There are also a ton of messages about messages to the block manager master ... I'm wondering if there are other parts of the system that need to use a reliable proxy (or some sort of acknowledgement).
>>>>>>>
>>>>>>> I really don't think this was working properly even with previous versions of Spark/Akka. I'm still learning about Akka, but I think you always need an ack to be confident in remote communication. Perhaps the old version of Akka just had more robust defaults or something, but I bet it could still have the same problems. Even before, I have seen the driver think there were running tasks while nothing was happening on any executor; it was just rare enough (and hard to reproduce) that I never bothered looking into it more.
>>>>>>>
>>>>>>> I will keep digging ...
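The log lines above suggest why ignoring DisassociatedEvent is not enough. Once an address is quarantined, everything sent to it goes to dead letters, and the quarantined side reports that it cannot reassociate until it is restarted. The toy model below reproduces only that observable behavior. The integer "incarnation" that distinguishes a restarted system is a hypothetical stand-in, an assumption for illustration rather than Akka's actual mechanism.

```python
from typing import List, Set, Tuple


class QuarantineTable:
    """Toy model of the quarantine behavior seen in the logs (hypothetical).

    Once (address, incarnation) is quarantined, messages to it go to dead
    letters; only a restarted system (a new incarnation) is reachable again.
    """

    def __init__(self) -> None:
        self.quarantined: Set[Tuple[str, int]] = set()
        self.delivered: List[str] = []
        self.dead_letters: List[str] = []

    def quarantine(self, address: str, incarnation: int) -> None:
        # Permanently block this incarnation of the remote system.
        self.quarantined.add((address, incarnation))

    def send(self, address: str, incarnation: int, msg: str) -> bool:
        # Returns True if delivered, False if routed to dead letters.
        if (address, incarnation) in self.quarantined:
            self.dead_letters.append(msg)
            return False
        self.delivered.append(msg)
        return True
```

Under this model, retrying or reassociating against the same incarnation never helps; only a restart does. That is consistent with the executor being removed rather than recovering.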
>>>>>>>
>>>>>>> On Thu, Oct 31, 2013 at 4:36 PM, Matei Zaharia wrote:
>>>>>>>
>>>>>>>> BTW, the problem might be the Akka failure detector settings that seem new in 2.2: http://doc.akka.io/docs/akka/2.2.3/scala/remoting.html
>>>>>>>>
>>>>>>>> Their timeouts seem pretty aggressive by default (around 10 seconds). This can easily be too little if you have large garbage collections. We should make sure they are higher than our own node failure detection timeouts.
>>>>>>>>
>>>>>>>> Matei
>>>>>>>>
>>>>>>>> On Oct 31, 2013, at 1:33 PM, Imran Rashid wrote:
>>>>>>>>
>>>>>>>> Pretty sure I found the problem -- two problems, actually. And I think one of them has been a general lurking problem with Spark for a while.
>>>>>>>>
>>>>>>>> 1) We should ignore disassociation events, as you suggested earlier. They seem to just indicate a temporary problem and can generally be ignored. I've found that they're regularly followed by AssociatedEvents, and communication seems to work fine at that point.
>>>>>>>>
>>>>>>>> 2) Task-finished messages get lost. When this message gets sent, we don't know that it actually gets there:
>>>>>>>>
>>>>>>>> https://github.com/apache/incubator-spark/blob/scala-2.10/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala#L90
>>>>>>>>
>>>>>>>> (This is so incredible that I feel I must be overlooking something, but there is no ack somewhere else that I'm overlooking, is there?) So, after the patch, Spark wasn't hanging because of the unhandled DisassociatedEvent. It hangs because the executor has sent some taskFinished messages that never get received by the driver. So the driver is waiting for some tasks to finish, but the executors think they are all done.
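Imran describes a hang where the driver keeps waiting on tasks that the executors consider finished. A toy model of the driver's bookkeeping shows it clearly. The tracker below is hypothetical, not Spark's scheduler. It only flags tasks that have had no status update for a given time. A timeout alone cannot tell a slow task from a lost message, so a real fix would still need an ack or a status query.

```python
from typing import Dict, List


class PendingTasks:
    """Toy driver-side view of launched tasks (hypothetical).

    If a task-finished message is lost, the task stays pending forever;
    listing tasks with no update for `timeout_s` at least makes the hang
    visible, so the driver could ask the executor again or resubmit.
    """

    def __init__(self) -> None:
        self.launched_at: Dict[int, float] = {}

    def launch(self, task_id: int, now: float) -> None:
        # Remember when each task was handed to an executor.
        self.launched_at[task_id] = now

    def finished(self, task_id: int) -> None:
        # Idempotent, so a re-sent finished message is harmless.
        self.launched_at.pop(task_id, None)

    def stale(self, now: float, timeout_s: float) -> List[int]:
        # Tasks with no finished message for longer than the timeout.
        return sorted(t for t, start in self.launched_at.items()
                      if now - start > timeout_s)
```

If the finished message for task 2 of four is dropped, the job never completes. `stale(now=90.0, timeout_s=60.0)` returns `[2]`, which points at the task whose status should be re-queried.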
>>>>>>>>
>>>>>>>> I'm going to add the reliable proxy pattern for this particular interaction and see if it fixes the problem:
>>>>>>>>
>>>>>>>> http://doc.akka.io/docs/akka/2.2.3/contrib/reliable-proxy.html#introducing-the-reliable-proxy
>>>>>>>>
>>>>>>>> imran
>>>>>>
>>>>>> --
>>>>>> It's just about how deep your longing is!
>>>>>
>>>>> --
>>>>> It's just about how deep your longing is!
>>
>> --
>> s
>
> --
> s

--
s
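The reliable proxy pattern Imran links to in the oldest message rests on a general technique. The sender numbers each message and resends it until an ack arrives. The receiver drops duplicates by sequence number. A lost message or a lost ack then costs a retransmission instead of a hang. Below is a synchronous, in-process sketch with hypothetical names and a simulated lossy transport. Akka's ReliableProxy works with actors and timers, and its internals may differ.

```python
from typing import Callable, List, Set


class DedupReceiver:
    """Processes each sequence number once, but acknowledges every copy."""

    def __init__(self) -> None:
        self.seen: Set[int] = set()
        self.processed: List[str] = []

    def receive(self, seq: int, payload: str) -> int:
        if seq not in self.seen:
            self.seen.add(seq)
            self.processed.append(payload)
        # Re-acking a duplicate is what finally stops the sender's retries.
        return seq


def send_reliably(payloads: List[str], receiver: DedupReceiver,
                  drop_message: Callable[[int], bool],
                  drop_ack: Callable[[int], bool],
                  max_attempts: int = 10) -> int:
    """Resend each payload until its ack arrives (at-least-once delivery).

    `drop_message(n)` / `drop_ack(n)` decide whether transmission number n
    loses the message or its ack; they stand in for a lossy transport.
    Returns the total number of transmissions.
    """
    transmissions = 0
    for seq, payload in enumerate(payloads):
        for _ in range(max_attempts):
            transmissions += 1
            if drop_message(transmissions):
                continue  # lost on the way: no ack, so retry
            ack = receiver.receive(seq, payload)
            if drop_ack(transmissions):
                continue  # delivered, but the ack was lost: retry anyway
            if ack == seq:
                break
        else:
            raise RuntimeError(f"message {seq} was never acknowledged")
    return transmissions
```

Consider a run where the second transmission loses its message and the fourth loses its ack. The sender makes five transmissions, and the receiver still processes each payload exactly once. Without the receiver-side deduplication, the lost ack would have caused a duplicate.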
