Sorry if I my understanding is wrong. May be, for this particular case it might be something to do with the load/network, but, in general, are you saying that, we build these communication channels(block manager communication, task events communication, etc) assuming akka would take care of it? I somehow feel that, it's being overly optimistic. Correct me if I am wrong.
On Fri, Nov 1, 2013 at 10:08 AM, Matei Zaharia <[email protected]>wrote: > It’s true that Akka’s delivery guarantees are in general at-most-once, but > if you look at the text there it says that they differ by transport. In the > previous version, I’m quite sure that except maybe in very rare > circumstances or cases where we had a bug, Akka’s remote layer always kept > connections up between each pair of hosts. So the guarantee was that as > long as you haven’t received a “disconnected” event, your messages are > being delivered, though of course when you do receive that event you don’t > know which messages have really made it through unless you acked them. But > that didn’t matter for our use case — from our point of view an executor > was either up or down. > > For this reason I still think it should be possible to configure Akka to > do the same on 2.2. Most likely some timeouts just got lower. With large > heaps you can easily get a GC pause of 60 seconds, so these timeouts should > be in the minutes. > > If for some reason this isn’t the case, then we have a bigger problem — > there are *lots* of messages beyond task-finished that need to be sent > reliably, including things like block manager events (a block was added / > removed on this node) and commands to tell the block manager to drop data. > It would be silly to implement acks at the application level for all these. > But I doubt this is the case. Prashant’s observation that the standalone > cluster manager stayed up is a further sign that this might be due to GC. > > Matei > > On Oct 31, 2013, at 9:11 PM, Sriram Ramachandrasekaran < > [email protected]> wrote: > > Hi Imran, > Just to add, we've noticed dis-associations in a couple projects that we > built(using akka 2.2.x not spark). We went to some details to find out what > was happening. As Matei, suggested, Akka keeps the TCP connection open and > uses that to talk to peers. We noticed that in our case, initially, we were > seeing dis-associations generally at the end of keep-alive duration. So, > when the keep-alive duration ends, at the TCP layer, a keep-alive probe > gets sent to inform the peer on the other side that the connection is still > alive/valid. For some reason, the probe dint renew the keep-alive > connection and we saw a lot of dis-associations during that time. Later, we > realized this was not a pattern either. This > thread<https://groups.google.com/forum/#!msg/akka-user/RYxaPl_nby4/1USHDFIRgOkJ>contains > the full history of our discussions with the Akka team. It's still > open and unclear as to what was causing it for our case. > We tried tweaking various settings of akka(wrt heartbeats, failure > detector, even plugged-in our own failure detector with no effect). > > Imran - Just to clarify your point on message delivery - akka's message > delivery policy is at-most-once. However, there's no guarantee for a > message to be delivered to a peer. The documentation clearly explains that. > http://doc.akka.io/docs/akka/2.0.2/general/message-send-semantics.html. It's > the responsibility of the application developer to handle cases where > message is suspected to be not have been delivered. > > I hope this helps. > > > > > On Fri, Nov 1, 2013 at 8:35 AM, Imran Rashid <[email protected]> wrote: > >> >> unfortunately that change wasn't the silver bullet I was hoping for. >> Even with >> 1) ignoring DisassociatedEvent >> 2) executor uses ReliableProxy to send messages back to driver >> 3) turn up akka.remote.watch-failure-detector.threshold=12 >> >> >> there is a lot of weird behavior. First, there are a few >> DisassociatedEvents, but some that are followed by AssociatedEvents, so >> that seems ok. But sometimes the re-associations are immediately followed >> by this: >> >> 13/10/31 18:51:10 INFO executor.StandaloneExecutorBackend: got >> lifecycleevent: AssociationError [akka.tcp://sparkExecutor@<executor>:41441] >> -> [akka.tcp://spark@<driver>:41321]: Error [Invalid address: >> akka.tcp://spark@<driver>:41321] [ >> akka.remote.InvalidAssociation: Invalid address: >> akka.tcp://spark@<driver>:41321 >> Caused by: akka.remote.transport.Transport$InvalidAssociationException: >> The remote system has quarantined this system. No further associations to >> the remote system are possible until this system is restarted. >> ] >> >> On the driver, there are messages like: >> >> [INFO] [10/31/2013 18:51:07.838] [spark-akka.actor.default-dispatcher-3] >> [Remoting] Address [akka.tcp://sparkExecutor@<executor>:46123] is now >> quarantined, all messages to this address will be delivered to dead letters. >> [WARN] [10/31/2013 18:51:10.845] [spark-akka.actor.default-dispatcher-20] >> [akka://spark/system/remote-watcher] Detected unreachable: >> [akka.tcp://sparkExecutor@<executor>:41441] >> >> >> and when the driver does decide that the executor has been terminated, it >> removes the executor, but doesn't start another one. >> >> there are a ton of messages also about messages to the block manager >> master ... I'm wondering if there are other parts of the system that need >> to use a reliable proxy (or some sort of acknowledgement). >> >> I really don't think this was working properly even w/ previous versions >> of spark / akka. I'm still learning about akka, but I think you always >> need an ack to be confident w/ remote communicate. Perhaps the old version >> of akka just had more robust defaults or something, but I bet it could >> still have the same problems. Even before, I have seen the driver thinking >> there were running tasks, but nothing happening on any executor -- it was >> just rare enough (and hard to reproduce) that I never bothered looking into >> it more. >> >> I will keep digging ... >> >> On Thu, Oct 31, 2013 at 4:36 PM, Matei Zaharia >> <[email protected]>wrote: >> >>> BTW the problem might be the Akka failure detector settings that seem >>> new in 2.2: http://doc.akka.io/docs/akka/2.2.3/scala/remoting.html >>> >>> Their timeouts seem pretty aggressive by default — around 10 seconds. >>> This can easily be too little if you have large garbage collections. We >>> should make sure they are higher than our own node failure detection >>> timeouts. >>> >>> Matei >>> >>> On Oct 31, 2013, at 1:33 PM, Imran Rashid <[email protected]> wrote: >>> >>> pretty sure I found the problem -- two problems actually. And I think >>> one of them has been a general lurking problem w/ spark for a while. >>> >>> 1) we should ignore disassociation events, as you suggested earlier. >>> They seem to just indicate a temporary problem, and can generally be >>> ignored. I've found that they're regularly followed by AssociatedEvents, >>> and it seems communication really works fine at that point. >>> >>> 2) Task finished messages get lost. When this message gets sent, we >>> dont' know it actually gets there: >>> >>> >>> https://github.com/apache/incubator-spark/blob/scala-2.10/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala#L90 >>> >>> (this is so incredible, I feel I must be overlooking something -- but >>> there is no ack somewhere else that I'm overlooking, is there??) So, after >>> the patch, spark wasn't hanging b/c of the unhandled DisassociatedEvent. >>> It hangs b/c the executor has sent some taskFinished messages that never >>> get received by the driver. So the driver is waiting for some tasks to >>> finish, but the executors think they are all done. >>> >>> I'm gonna add the reliable proxy pattern for this particular interaction >>> and see if its fixes the problem >>> >>> http://doc.akka.io/docs/akka/2.2.3/contrib/reliable-proxy.html#introducing-the-reliable-proxy >>> >>> imran >>> >>> > > > -- > It's just about how deep your longing is! > > > -- It's just about how deep your longing is!
