It’s true that Akka’s delivery guarantees are in general at-most-once, but if 
you look at the text there it says that they differ by transport. In the 
previous version, I’m quite sure that except maybe in very rare circumstances 
or cases where we had a bug, Akka’s remote layer always kept connections up 
between each pair of hosts. So the guarantee was that as long as you haven’t 
received a “disconnected” event, your messages are being delivered, though of 
course when you do receive that event you don’t know which messages have really 
made it through unless you acked them. But that didn’t matter for our use case 
— from our point of view an executor was either up or down.

For this reason I still think it should be possible to configure Akka to do the 
same on 2.2. Most likely some timeouts just got lower. With large heaps you can 
easily get a GC pause of 60 seconds, so these timeouts should be in the minutes.

If for some reason this isn’t the case, then we have a bigger problem — there 
are *lots* of messages beyond task-finished that need to be sent reliably, 
including things like block manager events (a block was added / removed on this 
node) and commands to tell the block manager to drop data. It would be silly to 
implement acks at the application level for all these. But I doubt this is the 
case. Prashant’s observation that the standalone cluster manager stayed up is a 
further sign that this might be due to GC.

Matei

On Oct 31, 2013, at 9:11 PM, Sriram Ramachandrasekaran <[email protected]> 
wrote:

> Hi Imran,
> Just to add, we've noticed dis-associations in a couple projects that we 
> built(using akka 2.2.x not spark). We went to some details to find out what 
> was happening. As Matei, suggested, Akka keeps the TCP connection open and 
> uses that to talk to peers. We noticed that in our case, initially, we were 
> seeing dis-associations generally at the end of keep-alive duration. So, when 
> the keep-alive duration ends, at the TCP layer, a keep-alive probe gets sent 
> to inform the peer on the other side that the connection is still 
> alive/valid. For some reason, the probe dint renew the keep-alive connection 
> and we saw a lot of dis-associations during that time. Later, we realized 
> this was not a pattern either. This thread contains the full history of our 
> discussions with the Akka team. It's still open and unclear as to what was 
> causing it for our case. 
> We tried tweaking various settings of akka(wrt heartbeats, failure detector, 
> even plugged-in our own failure detector with no effect).
> 
> Imran - Just to clarify your point on message delivery - akka's message 
> delivery policy is at-most-once. However, there's no guarantee for a message 
> to be delivered to a peer. The documentation clearly explains that. 
> http://doc.akka.io/docs/akka/2.0.2/general/message-send-semantics.html. It's 
> the responsibility of the application developer to handle cases where message 
> is suspected to be not have been delivered. 
> 
> I hope this helps.
> 
> 
> 
> 
> On Fri, Nov 1, 2013 at 8:35 AM, Imran Rashid <[email protected]> wrote:
> 
> unfortunately that change wasn't the silver bullet I was hoping for.  Even 
> with
> 1) ignoring DisassociatedEvent
> 2) executor uses ReliableProxy to send messages back to driver
> 3) turn up akka.remote.watch-failure-detector.threshold=12
> 
> 
> there is a lot of weird behavior.  First, there are a few 
> DisassociatedEvents, but some that are followed by AssociatedEvents, so that 
> seems ok.  But sometimes the re-associations are immediately followed by this:
> 
> 13/10/31 18:51:10 INFO executor.StandaloneExecutorBackend: got 
> lifecycleevent: AssociationError [akka.tcp://sparkExecutor@<executor>:41441] 
> -> [akka.tcp://spark@<driver>:41321]: Error [Invalid address: 
> akka.tcp://spark@<driver>:41321] [
> akka.remote.InvalidAssociation: Invalid address: 
> akka.tcp://spark@<driver>:41321
> Caused by: akka.remote.transport.Transport$InvalidAssociationException: The 
> remote system has quarantined this system. No further associations to the 
> remote system are possible until this system is restarted.
> ]
> 
> On the driver, there are messages like:
> 
> [INFO] [10/31/2013 18:51:07.838] [spark-akka.actor.default-dispatcher-3] 
> [Remoting] Address [akka.tcp://sparkExecutor@<executor>:46123] is now 
> quarantined, all messages to this address will be delivered to dead letters.
> [WARN] [10/31/2013 18:51:10.845] [spark-akka.actor.default-dispatcher-20] 
> [akka://spark/system/remote-watcher] Detected unreachable: 
> [akka.tcp://sparkExecutor@<executor>:41441]
> 
> 
> and when the driver does decide that the executor has been terminated, it 
> removes the executor, but doesn't start another one.
> 
> there are a ton of messages also about messages to the block manager master 
> ... I'm wondering if there are other parts of the system that need to use a 
> reliable proxy (or some sort of acknowledgement).
> 
> I really don't think this was working properly even w/ previous versions of 
> spark / akka.  I'm still learning about akka, but I think you always need an 
> ack to be confident w/ remote communicate.  Perhaps the old version of akka 
> just had more robust defaults or something, but I bet it could still have the 
> same problems.  Even before, I have seen the driver thinking there were 
> running tasks, but nothing happening on any executor -- it was just rare 
> enough (and hard to reproduce) that I never bothered looking into it more.
> 
> I will keep digging ...
> 
> On Thu, Oct 31, 2013 at 4:36 PM, Matei Zaharia <[email protected]> 
> wrote:
> BTW the problem might be the Akka failure detector settings that seem new in 
> 2.2: http://doc.akka.io/docs/akka/2.2.3/scala/remoting.html
> 
> Their timeouts seem pretty aggressive by default — around 10 seconds. This 
> can easily be too little if you have large garbage collections. We should 
> make sure they are higher than our own node failure detection timeouts.
> 
> Matei
> 
> On Oct 31, 2013, at 1:33 PM, Imran Rashid <[email protected]> wrote:
> 
>> pretty sure I found the problem -- two problems actually.  And I think one 
>> of them has been a general lurking problem w/ spark for a while.
>> 
>> 1)  we should ignore disassociation events, as you suggested earlier.  They 
>> seem to just indicate a temporary problem, and can generally be ignored.  
>> I've found that they're regularly followed by AssociatedEvents, and it seems 
>> communication really works fine at that point.
>> 
>> 2) Task finished messages get lost.  When this message gets sent, we dont' 
>> know it actually gets there:
>> 
>> https://github.com/apache/incubator-spark/blob/scala-2.10/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala#L90
>> 
>> (this is so incredible, I feel I must be overlooking something -- but there 
>> is no ack somewhere else that I'm overlooking, is there??)  So, after the 
>> patch, spark wasn't hanging b/c of the unhandled DisassociatedEvent.  It 
>> hangs b/c the executor has sent some taskFinished messages that never get 
>> received by the driver.  So the driver is waiting for some tasks to finish, 
>> but the executors think they are all done.
>> 
>> I'm gonna add the reliable proxy pattern for this particular interaction and 
>> see if its fixes the problem
>> http://doc.akka.io/docs/akka/2.2.3/contrib/reliable-proxy.html#introducing-the-reliable-proxy
>> 
>> imran
> 
> 
> 
> -- 
> It's just about how deep your longing is!

Reply via email to