Prashant, the problem seems to be that messages sent while we’re disassociated are lost. I think we’d have to just prevent disassociation altogether, or replace all remote actor refs with the reliable proxies (which sounds painful).
Matei

On Nov 1, 2013, at 7:53 PM, Prashant Sharma wrote:

> Hey Matei and Imran,
>
> I think maybe we can solve the problem without downgrading to 2.1.0, by capturing the disassociation and then setting a timeout: if it associates again within the timeout, we keep going; otherwise, we shut down the executor. This timeout can of course be configurable.
>
> Thoughts?
>
>
> On Sat, Nov 2, 2013 at 3:29 AM, Matei Zaharia wrote:
> Hey Imran,
>
> Good to know that Akka 2.1 handles this -- that at least will give us a start.
>
> In the old code, executors certainly did get flagged as “down” occasionally, but that was due to a timeout we controlled (we keep sending heartbeats back and forth to track them). The timeout used to be smaller, and usually the reason to exceed it was GC. However, if Akka 2.2 can sometimes drop the connections itself, this is a problem, and we either have to use the reliable proxies for everything or see if we can configure it otherwise. Anyway, we’ll definitely look into it.
>
> Matei
>
> On Nov 1, 2013, at 1:09 PM, Imran Rashid wrote:
>
>> I downgraded spark to akka 2.1.0, and everything seems to work now. I'm going to run my tests a few more times, but I'd really have expected to see a failure by now w/ the 2.2.3 version.
>>
>> I'll submit a patch shortly (need to fix some compile errors in streaming still).
>>
>> Matei -- I think I realize now that when you were talking about the expectation of a tcp connection staying alive, you were explaining why this is *not* a bug in the current release. You wouldn't end up in a situation where the executor thinks it finished the task but the driver doesn't know about it, b/c if the connection dies, the executor will get restarted. That makes sense. But it seems like if we upgrade to akka 2.2.x, a lot of things change.
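Prashant's proposal above can be sketched as a small state machine. On a disassociation, it starts a configurable timer. It keeps the executor if the peer re-associates in time and shuts it down otherwise. This is a minimal Python illustration, not Spark or Akka code. The class `DisassociationGuard`, its method names, and the explicit `now` timestamps in seconds are all hypothetical; the timestamps stand in for a real clock or scheduler so the logic is easy to check.

```python
from typing import Optional


class DisassociationGuard:
    """Grace-period policy for one remote peer (hypothetical helper)."""

    def __init__(self, grace_period_s: float) -> None:
        self.grace_period_s = grace_period_s  # configurable, per the proposal
        self.disassociated_at: Optional[float] = None  # None while associated

    def on_disassociated(self, now: float) -> None:
        # Start the timer only on the first disassociation of an outage.
        if self.disassociated_at is None:
            self.disassociated_at = now

    def on_associated(self) -> None:
        # The peer came back: cancel the pending shutdown.
        self.disassociated_at = None

    def should_shut_down(self, now: float) -> bool:
        # Shut down only if still disassociated after the full grace period.
        return (
            self.disassociated_at is not None
            and now - self.disassociated_at >= self.grace_period_s
        )


guard = DisassociationGuard(grace_period_s=120.0)
guard.on_disassociated(now=0.0)
print(guard.should_shut_down(now=30.0))   # False: still within the grace period
guard.on_associated()
guard.on_disassociated(now=300.0)
print(guard.should_shut_down(now=420.0))  # True: 120 s without re-association
```

As Matei notes at the top of the thread, a grace period like this only decides whether to keep the executor. It does not recover messages sent while the link was down.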
>> I was probably wrong about seeing that problem in previous releases -- it was just a vague recollection that fit my current theories, so I jumped to conclusions.
>>
>> thanks everyone
>>
>>
>> On Fri, Nov 1, 2013 at 9:27 AM, Imran Rashid wrote:
>> thanks everyone for all of the input.
>>
>> Matei: with your explanation of spark's expected tcp behavior, I can see why this makes sense now. But, to show my total ignorance here, I'm wondering: when the connection does break, are you sure that all of the messages you thought you sent before the break were received? I'm guessing you aren't. Which is fine, if the response to that is to have the executor just die completely and restart. That was the behavior I was initially observing with the code on the 2.10 branch, where the executor handles a DisassociatedEvent explicitly and dies.
>>
>> But -- is that the behavior we want? Do we want it to be robust to tcp connections breaking, without having to completely restart the executor? You might say that dying & restarting will lead to correct behavior, even if it's inefficient. But sometimes I've seen restarts so frequent that no progress is made.
>>
>> I don't see why this changed w/ the different versions of akka -- I don't see any relevant configuration settings that would change how "strongly" tcp tries to keep the connection alive, but I may be missing something.
>> But it does seem like the netty configuration options have changed completely between the two versions:
>> http://doc.akka.io/docs/akka/2.2.3/scala/remoting.html#Remote_Configuration
>> vs
>> http://doc.akka.io/docs/akka/2.0.5/scala/remoting.html
>>
>> btw, akka 2.1.0 has also been built for scala 2.10:
>> http://search.maven.org/#artifactdetails|com.typesafe.akka|akka-remote_2.10|2.1.0|bundle
>> and its netty configuration is closer to 2.0.5:
>> http://doc.akka.io/docs/akka/2.1.0/scala/remoting.html
>>
>> perhaps someone more knowledgeable than me about netty & tcp can look through the changes and decide what the right changes are.
>>
>> Prashant said:
>> > Before we conclude something about reliable messaging, I'd like you to also consider other possibilities, like an actual network reconnection or maybe a GC pause. Try connecting something like jconsole (or similar) and see what happens on the driver and executor.
>> >
>> > My doubt is this: since we are using standalone mode, where the master and worker are also actors, if we see weird behaviour on the executor and driver, why not on the master and worker too? They should also break away from each other. For this reason, I doubt our conclusions; maybe we should narrow down the problem first before we conclude anything.
>> > There is a regression in akka 2.2.3: it uses more memory than it used to in 2.1.x.
>> > See https://github.com/akka/akka/issues/1810
>>
>> Well, there could easily be the same problem with dropped connections between master & worker -- they just communicate so little that it doesn't really matter. The odds that a message gets dropped between them are very low, only because there are barely any messages.
>>
>> I completely agree that the problem could be because of contention, or a gc pause, etc. In fact, I'm only giving spark 24 out of 32 cores available on each box, and 90g out of 125g memory.
>> I've looked at gc a little with jstat, and I did see some gc pauses, but nothing ridiculous.
>>
>> But I think the question remains. Suppose it is gc pauses, etc. that cause the disassociation events; what do we do to fix it? How can we diagnose the problem and figure out which of the configuration variables to tune? Clearly, there *will be* long gc pauses, and the networking layer needs to be able to deal with them.
>>
>> Still, I understand your desire to see if that might be the cause of the problem in this particular case, so I will dig a little more.
>>
>> (btw, should I move this thread to the dev list now? It is getting into the nitty-gritty of implementation ...)
>>
>> On Fri, Nov 1, 2013 at 1:15 AM, Matei Zaharia wrote:
>> Yes, so far they’ve been built on that assumption. The assumption is not that Akka would *guarantee* delivery, in the sense that as soon as the send() call returns you know it’s delivered. It is that Akka would act the same way as a TCP socket, allowing you to send a stream of messages in order and hear when the connection breaks. Maybe that isn’t what they want to provide, but I'd find it weird, because it’s very easy to write a server with this property.
>>
>> Matei
>>
>> On Oct 31, 2013, at 9:58 PM, Sriram Ramachandrasekaran wrote:
>>
>>> Sorry if my understanding is wrong. Maybe in this particular case it has something to do with the load/network, but in general, are you saying that we built these communication channels (block manager communication, task events communication, etc.) assuming akka would take care of delivery? I somehow feel that's overly optimistic. Correct me if I am wrong.
>>>
>>>
>>> On Fri, Nov 1, 2013 at 10:08 AM, Matei Zaharia wrote:
>>> It’s true that Akka’s delivery guarantees are in general at-most-once, but if you look at the text there, it says that they differ by transport.
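Imran's point that long GC pauses are inevitable reduces to a timing rule. Take a deadline-based detector like the heartbeat scheme Matei describes, where Spark controls the timeout. It misfires only when the silence caused by a pause exceeds its timeout. The sketch below is a hypothetical Python simulation, not Spark's heartbeat code. It assumes one heartbeat per simulated second and a single pause starting at t = 10 s. The timeout and pause length are parameters, so the roughly 10-second default and 60-second GC pause mentioned elsewhere in the thread can be compared directly.

```python
from typing import Optional


class HeartbeatMonitor:
    """Declares a peer down when no heartbeat has arrived for longer than timeout_s."""

    def __init__(self, timeout_s: float) -> None:
        self.timeout_s = timeout_s
        self.last_heartbeat: Optional[float] = None

    def heartbeat(self, now: float) -> None:
        self.last_heartbeat = now

    def is_down(self, now: float) -> bool:
        if self.last_heartbeat is None:
            return False  # nothing heard yet; not treated as a failure here
        return now - self.last_heartbeat > self.timeout_s


def declared_down_during_pause(timeout_s: float, pause_s: float,
                               pause_start_s: float = 10.0,
                               horizon_s: float = 300.0) -> bool:
    """Send one heartbeat per second, except during a single pause (e.g. a
    stop-the-world GC on the sender), and report whether the monitor ever
    declares the sender down."""
    monitor = HeartbeatMonitor(timeout_s)
    t = 0.0
    while t <= horizon_s:
        paused = pause_start_s <= t < pause_start_s + pause_s
        if not paused:
            monitor.heartbeat(t)
        if monitor.is_down(t):
            return True
        t += 1.0
    return False


print(declared_down_during_pause(timeout_s=10.0, pause_s=60.0))   # True
print(declared_down_during_pause(timeout_s=120.0, pause_s=60.0))  # False
```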
>>> In the previous version, I’m quite sure that, except maybe in very rare circumstances or cases where we had a bug, Akka’s remote layer always kept connections up between each pair of hosts. So the guarantee was that as long as you haven’t received a “disconnected” event, your messages are being delivered. Of course, when you do receive that event, you don’t know which messages have really made it through unless you acked them. But that didn’t matter for our use case: from our point of view, an executor was either up or down.
>>>
>>> For this reason, I still think it should be possible to configure Akka to do the same on 2.2. Most likely some timeouts just got lower. With large heaps you can easily get a GC pause of 60 seconds, so these timeouts should be on the order of minutes.
>>>
>>> If for some reason this isn’t the case, then we have a bigger problem. There are *lots* of messages beyond task-finished that need to be sent reliably, including things like block manager events (a block was added / removed on this node) and commands telling the block manager to drop data. It would be silly to implement acks at the application level for all of these. But I doubt this is the case. Prashant’s observation that the standalone cluster manager stayed up is a further sign that this might be due to GC.
>>>
>>> Matei
>>>
>>> On Oct 31, 2013, at 9:11 PM, Sriram Ramachandrasekaran wrote:
>>>
>>>> Hi Imran,
>>>> Just to add, we've noticed disassociations in a couple of projects that we built (using akka 2.2.x, not spark). We went into some detail to find out what was happening. As Matei suggested, Akka keeps the TCP connection open and uses it to talk to peers. We noticed that in our case, initially, we were seeing disassociations generally at the end of the keep-alive duration.
>>>> So, when the keep-alive duration ends, a keep-alive probe gets sent at the TCP layer to tell the peer on the other side that the connection is still alive/valid. For some reason, the probe didn't renew the keep-alive connection, and we saw a lot of disassociations during that time. Later, we realized this was not a consistent pattern either. This thread contains the full history of our discussions with the Akka team. It's still open, and it's still unclear what was causing it in our case. We tried tweaking various akka settings (heartbeats, the failure detector) and even plugged in our own failure detector, with no effect.
>>>>
>>>> Imran - just to clarify your point on message delivery: akka's message delivery policy is at-most-once, so there's no guarantee that a message is delivered to a peer. The documentation explains this clearly:
>>>> http://doc.akka.io/docs/akka/2.0.2/general/message-send-semantics.html
>>>> It's the responsibility of the application developer to handle cases where a message is suspected not to have been delivered.
>>>>
>>>> I hope this helps.
>>>>
>>>>
>>>> On Fri, Nov 1, 2013 at 8:35 AM, Imran Rashid wrote:
>>>>
>>>> unfortunately that change wasn't the silver bullet I was hoping for. Even with
>>>> 1) ignoring DisassociatedEvent
>>>> 2) the executor using ReliableProxy to send messages back to the driver
>>>> 3) turning up akka.remote.watch-failure-detector.threshold=12
>>>>
>>>> there is a lot of weird behavior. First, there are a few DisassociatedEvents, but some of them are followed by AssociatedEvents, so that seems ok.
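Step 3 above raises the threshold of Akka's remote watch failure detector, which Akka 2.2 documents as a phi accrual detector. Instead of a fixed timeout, such a detector turns the time since the last heartbeat into a suspicion level phi. It declares the peer unreachable once phi exceeds the threshold. The Python sketch below computes phi under the textbook assumption that heartbeat inter-arrival times are normally distributed. The mean and standard deviation are passed in directly. Akka estimates them from recent heartbeats and may differ in its details. The `acceptable_pause_ms` parameter is a hypothetical stand-in for the idea behind Akka's acceptable-heartbeat-pause setting. Modeling it as a shift of the mean is an assumption.

```python
import math


def phi(elapsed_ms: float, mean_ms: float, std_ms: float) -> float:
    """phi = -log10(P(next heartbeat arrives later than elapsed_ms)),
    assuming normally distributed heartbeat inter-arrival times."""
    z = (elapsed_ms - mean_ms) / std_ms
    p_later = 0.5 * math.erfc(z / math.sqrt(2.0))  # normal survival function
    if p_later <= 0.0:
        return math.inf  # underflow: a late arrival is effectively impossible
    return -math.log10(p_later)


def is_suspected(elapsed_ms: float, mean_ms: float, std_ms: float,
                 threshold: float, acceptable_pause_ms: float = 0.0) -> bool:
    # Hypothetical: an acceptable pause is treated as extra expected delay,
    # i.e. it shifts the mean before phi is computed.
    return phi(elapsed_ms, mean_ms + acceptable_pause_ms, std_ms) > threshold


# Heartbeats roughly every 1000 ms with 200 ms jitter; the last one was 2300 ms ago.
print(is_suspected(2300, 1000, 200, threshold=8.0))   # True
print(is_suspected(2300, 1000, 200, threshold=12.0))  # False
```

Because phi grows with the elapsed time, a higher threshold tolerates a longer silence for the same heartbeat statistics. In the example, phi is about 10.4, so the peer is suspected at threshold 8 but not at 12. No finite threshold, however, makes the detector immune to an arbitrarily long GC pause.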
>>>> But sometimes the re-associations are immediately followed by this:
>>>>
>>>> 13/10/31 18:51:10 INFO executor.StandaloneExecutorBackend: got lifecycleevent: AssociationError [akka.tcp://sparkExecutor@<executor>:41441] -> [akka.tcp://spark@<driver>:41321]: Error [Invalid address: akka.tcp://spark@<driver>:41321] [
>>>> akka.remote.InvalidAssociation: Invalid address: akka.tcp://spark@<driver>:41321
>>>> Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.
>>>> ]
>>>>
>>>> On the driver, there are messages like:
>>>>
>>>> [INFO] [10/31/2013 18:51:07.838] [spark-akka.actor.default-dispatcher-3] [Remoting] Address [akka.tcp://sparkExecutor@<executor>:46123] is now quarantined, all messages to this address will be delivered to dead letters.
>>>> [WARN] [10/31/2013 18:51:10.845] [spark-akka.actor.default-dispatcher-20] [akka://spark/system/remote-watcher] Detected unreachable: [akka.tcp://sparkExecutor@<executor>:41441]
>>>>
>>>> and when the driver does decide that the executor has been terminated, it removes the executor but doesn't start another one.
>>>>
>>>> there are also a ton of messages about messages to the block manager master ... I'm wondering if there are other parts of the system that need to use a reliable proxy (or some sort of acknowledgement).
>>>>
>>>> I really don't think this was working properly even w/ previous versions of spark / akka. I'm still learning about akka, but I think you always need an ack to be confident w/ remote communication. Perhaps the old version of akka just had more robust defaults or something, but I bet it could still have the same problems.
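The log lines above describe what happens once a peer is quarantined. The driver routes every message for the quarantined address to dead letters. The executor's system cannot associate with the driver again until it is restarted. This suggests why ignoring DisassociatedEvent is not enough once quarantine kicks in. The toy Python model below captures just those two rules and is not Akka's implementation. The class, its fields, and the address string are hypothetical. So is the per-restart `incarnation` number, which stands in for however a restarted system is told apart from the quarantined one.

```python
from dataclasses import dataclass, field
from typing import List, Set, Tuple


@dataclass
class ToyRemoting:
    """Toy model of the quarantine rules stated in the log lines (hypothetical)."""

    quarantined: Set[Tuple[str, int]] = field(default_factory=set)
    delivered: List[str] = field(default_factory=list)
    dead_letters: List[str] = field(default_factory=list)

    def quarantine(self, address: str, incarnation: int) -> None:
        # Once quarantined, this (address, incarnation) pair is never trusted again.
        self.quarantined.add((address, incarnation))

    def can_associate(self, address: str, incarnation: int) -> bool:
        # Only a restarted system (new incarnation) may associate again.
        return (address, incarnation) not in self.quarantined

    def send(self, address: str, incarnation: int, msg: str) -> None:
        # Messages to a quarantined peer go to dead letters and are not retried.
        if (address, incarnation) in self.quarantined:
            self.dead_letters.append(msg)
        else:
            self.delivered.append(msg)


driver = ToyRemoting()
executor = "sparkExecutor@host:41441"  # hypothetical address
driver.send(executor, 1, "msg-1")
driver.quarantine(executor, 1)
driver.send(executor, 1, "msg-2")
print(driver.can_associate(executor, 1))      # False: same incarnation stays quarantined
print(driver.can_associate(executor, 2))      # True: only a restarted system gets back in
print(driver.delivered, driver.dead_letters)  # ['msg-1'] ['msg-2']
```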
>>>> Even before, I've seen the driver think there were running tasks while nothing was happening on any executor -- it was just rare enough (and hard to reproduce) that I never bothered looking into it more.
>>>>
>>>> I will keep digging ...
>>>>
>>>> On Thu, Oct 31, 2013 at 4:36 PM, Matei Zaharia wrote:
>>>> BTW the problem might be the Akka failure detector settings that seem new in 2.2: http://doc.akka.io/docs/akka/2.2.3/scala/remoting.html
>>>>
>>>> Their timeouts seem pretty aggressive by default, around 10 seconds. This can easily be too little if you have large garbage collections. We should make sure they are higher than our own node failure detection timeouts.
>>>>
>>>> Matei
>>>>
>>>> On Oct 31, 2013, at 1:33 PM, Imran Rashid wrote:
>>>>
>>>>> pretty sure I found the problem -- two problems actually. And I think one of them has been a general lurking problem w/ spark for a while.
>>>>>
>>>>> 1) we should ignore disassociation events, as you suggested earlier. They seem to just indicate a temporary problem, and can generally be ignored. I've found that they're regularly followed by AssociatedEvents, and it seems communication really works fine at that point.
>>>>>
>>>>> 2) Task finished messages get lost. When this message gets sent, we don't know whether it actually gets there:
>>>>>
>>>>> https://github.com/apache/incubator-spark/blob/scala-2.10/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala#L90
>>>>>
>>>>> (this is so incredible, I feel I must be overlooking something -- but there is no ack somewhere else that I'm overlooking, is there??) So, after the patch, spark wasn't hanging b/c of the unhandled DisassociatedEvent. It hangs b/c the executor has sent some taskFinished messages that are never received by the driver.
>>>>> So the driver is waiting for some tasks to finish, but the executors think they are all done.
>>>>>
>>>>> I'm gonna add the reliable proxy pattern for this particular interaction and see if it fixes the problem:
>>>>> http://doc.akka.io/docs/akka/2.2.3/contrib/reliable-proxy.html#introducing-the-reliable-proxy
>>>>>
>>>>> imran
>>>>
>>>>
>>>> --
>>>> It's just about how deep your longing is!
>>>
>>>
>>> --
>>> It's just about how deep your longing is!
>>
>
>
> --
> s
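Imran's fix for lost taskFinished messages and Matei's worry about needing acks for every message type rest on the same mechanism:

- The sender numbers each message and keeps it until the receiver acknowledges it.
- The sender resends whatever is still unacknowledged.
- The receiver drops duplicates and delivers in order.

The Python sketch below shows that general idea on a simulated lossy link. It is not Akka's ReliableProxy or its API. The class names are hypothetical. So are the `drop_msgs`/`drop_acks` sets, which hold the indices of transmissions whose message or ack is lost. The loop that resends everything pending each round is also hypothetical; it stands in for a resend timer.

```python
from typing import Dict, Iterable, List, Tuple


class ReliableSender:
    """Numbers outgoing messages and keeps each one until it is acknowledged."""

    def __init__(self) -> None:
        self.next_seq = 0
        self.unacked: Dict[int, str] = {}  # dicts keep insertion order (3.7+)

    def send(self, msg: str) -> None:
        self.unacked[self.next_seq] = msg
        self.next_seq += 1

    def on_ack(self, seq: int) -> None:
        self.unacked.pop(seq, None)  # duplicate acks are harmless

    def pending(self) -> List[Tuple[int, str]]:
        return list(self.unacked.items())  # snapshot, oldest first


class InOrderReceiver:
    """Acks every copy it sees, but processes each sequence number once, in order."""

    def __init__(self) -> None:
        self.expected = 0
        self.buffer: Dict[int, str] = {}
        self.processed: List[str] = []

    def on_message(self, seq: int, msg: str) -> int:
        if seq >= self.expected:
            self.buffer[seq] = msg  # hold out-of-order messages
        while self.expected in self.buffer:
            self.processed.append(self.buffer.pop(self.expected))
            self.expected += 1
        return seq  # the ack sent back to the sender


def run(messages: Iterable[str], drop_msgs=frozenset(), drop_acks=frozenset()):
    """Simulate the link; returns (processed messages, number of deliveries)."""
    sender, receiver = ReliableSender(), InOrderReceiver()
    for m in messages:
        sender.send(m)
    attempt = deliveries = 0
    while sender.pending():  # each pass stands in for one resend-timer tick
        for seq, msg in sender.pending():
            i, attempt = attempt, attempt + 1
            if i in drop_msgs:
                continue  # message lost in transit
            deliveries += 1
            ack = receiver.on_message(seq, msg)
            if i not in drop_acks:
                sender.on_ack(ack)  # ack made it back


    return receiver.processed, deliveries


print(run(["t0", "t1", "t2"], drop_msgs={0}, drop_acks={2}))  # (['t0', 't1', 't2'], 4)
```

In the example, the first copy of t0 is lost, and the ack for t2 is lost, so t2 arrives twice. The receiver still processes each message exactly once and in order. Both the unacknowledged buffer and the receiver's sequence state live in memory, so this only holds while both ends stay up. The extra bookkeeping per message type is the overhead Matei wanted to avoid adding to every block manager and task message.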
