Yes, so far they’ve been built on that assumption. The assumption is not that 
Akka would *guarantee* delivery, in the sense that a message is known to be 
delivered as soon as the send() call returns, but that Akka would act the same 
way as a TCP socket, allowing you to send a stream of messages in order and 
hear when the connection breaks. Maybe that isn’t what they want to provide, 
but I'd find it weird, because it’s very easy to write a server with this property.
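
For illustration, here is a minimal sketch of that TCP-like contract in plain Python sockets: messages arrive in the order they were sent, and the receiver learns that the connection is gone when the stream ends. The framing (a 4-byte length prefix) and the names send_msg, recv_exact, recv_all and demo are assumptions made up for this example; nothing here is Akka or Spark code.

```python
import socket
import struct


def send_msg(sock, payload: bytes) -> None:
    # Frame each message as a 4-byte big-endian length followed by the payload.
    sock.sendall(struct.pack("!I", len(payload)) + payload)


def recv_exact(sock, n: int):
    # Read exactly n bytes; return None if the peer closed the connection first.
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            return None
        buf += chunk
    return buf


def recv_all(sock):
    """Return (messages_in_order, disconnected) once the stream ends."""
    received = []
    while True:
        header = recv_exact(sock, 4)
        if header is None:
            return received, True  # the "disconnected" event
        (length,) = struct.unpack("!I", header)
        body = recv_exact(sock, length)
        if body is None:
            return received, True  # connection broke mid-message
        received.append(body)


def demo():
    # A connected pair of stream sockets stands in for executor and driver.
    executor_side, driver_side = socket.socketpair()
    for i in range(3):
        send_msg(executor_side, f"task-finished {i}".encode())
    executor_side.close()  # simulate the executor going away
    messages, disconnected = recv_all(driver_side)
    driver_side.close()
    return messages, disconnected
```

Note that the sender still cannot tell which of its last messages were processed before the break; the contract only says that everything received is in order and that the break itself is observable.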

Matei

On Oct 31, 2013, at 9:58 PM, Sriram Ramachandrasekaran <sri.ram...@gmail.com> 
wrote:

> Sorry if my understanding is wrong. Maybe for this particular case it 
> has something to do with the load/network, but in general, are you 
> saying that we build these communication channels (block manager 
> communication, task events communication, etc.) assuming Akka would take care 
> of it? I somehow feel that's being overly optimistic. Correct me if I am 
> wrong.
> 
> 
> 
> On Fri, Nov 1, 2013 at 10:08 AM, Matei Zaharia <matei.zaha...@gmail.com> 
> wrote:
> It’s true that Akka’s delivery guarantees are in general at-most-once, but if 
> you look at the text there it says that they differ by transport. In the 
> previous version, I’m quite sure that except maybe in very rare circumstances 
> or cases where we had a bug, Akka’s remote layer always kept connections up 
> between each pair of hosts. So the guarantee was that as long as you haven’t 
> received a “disconnected” event, your messages are being delivered, though of 
> course when you do receive that event you don’t know which messages have 
> really made it through unless you acked them. But that didn’t matter for our 
> use case — from our point of view an executor was either up or down.
> 
> For this reason I still think it should be possible to configure Akka to do 
> the same on 2.2. Most likely some timeouts just got lower. With large heaps 
> you can easily get a GC pause of 60 seconds, so these timeouts should be in 
> the minutes.
> 
> If for some reason this isn’t the case, then we have a bigger problem — there 
> are *lots* of messages beyond task-finished that need to be sent reliably, 
> including things like block manager events (a block was added / removed on 
> this node) and commands to tell the block manager to drop data. It would be 
> silly to implement acks at the application level for all these. But I doubt 
> this is the case. Prashant’s observation that the standalone cluster manager 
> stayed up is a further sign that this might be due to GC.
> 
> Matei
> 
> On Oct 31, 2013, at 9:11 PM, Sriram Ramachandrasekaran <sri.ram...@gmail.com> 
> wrote:
> 
>> Hi Imran,
>> Just to add, we've noticed disassociations in a couple of projects that we 
>> built (using Akka 2.2.x, not Spark). We went into some detail to find out what 
>> was happening. As Matei suggested, Akka keeps the TCP connection open and 
>> uses that to talk to peers. We noticed that in our case, initially, we were 
>> seeing disassociations generally at the end of the keep-alive duration. So, 
>> when the keep-alive duration ends, at the TCP layer, a keep-alive probe gets 
>> sent to inform the peer on the other side that the connection is still 
>> alive/valid. For some reason, the probe didn't renew the keep-alive connection 
>> and we saw a lot of disassociations during that time. Later, we realized 
>> this was not a consistent pattern either. This thread contains the full 
>> history of our discussions with the Akka team. It's still open and unclear 
>> as to what was causing it in our case. 
>> We tried tweaking various Akka settings (heartbeats, failure detector, 
>> even plugging in our own failure detector) with no effect.
>> 
>> Imran - Just to clarify your point on message delivery - Akka's message 
>> delivery policy is at-most-once, which means there's no guarantee that a 
>> message will be delivered to a peer. The documentation clearly explains that: 
>> http://doc.akka.io/docs/akka/2.0.2/general/message-send-semantics.html. It's 
>> the responsibility of the application developer to handle cases where a 
>> message is suspected not to have been delivered. 
>> 
>> I hope this helps.
>> 
>> 
>> 
>> 
>> On Fri, Nov 1, 2013 at 8:35 AM, Imran Rashid <im...@quantifind.com> wrote:
>> 
>> unfortunately that change wasn't the silver bullet I was hoping for.  Even 
>> with
>> 1) ignoring DisassociatedEvent
>> 2) executor uses ReliableProxy to send messages back to driver
>> 3) turn up akka.remote.watch-failure-detector.threshold=12
>> 
>> 
>> there is a lot of weird behavior.  First, there are a few 
>> DisassociatedEvents, but some that are followed by AssociatedEvents, so that 
>> seems ok.  But sometimes the re-associations are immediately followed by 
>> this:
>> 
>> 13/10/31 18:51:10 INFO executor.StandaloneExecutorBackend: got 
>> lifecycleevent: AssociationError [akka.tcp://sparkExecutor@<executor>:41441] 
>> -> [akka.tcp://spark@<driver>:41321]: Error [Invalid address: 
>> akka.tcp://spark@<driver>:41321] [
>> akka.remote.InvalidAssociation: Invalid address: 
>> akka.tcp://spark@<driver>:41321
>> Caused by: akka.remote.transport.Transport$InvalidAssociationException: The 
>> remote system has quarantined this system. No further associations to the 
>> remote system are possible until this system is restarted.
>> ]
>> 
>> On the driver, there are messages like:
>> 
>> [INFO] [10/31/2013 18:51:07.838] [spark-akka.actor.default-dispatcher-3] 
>> [Remoting] Address [akka.tcp://sparkExecutor@<executor>:46123] is now 
>> quarantined, all messages to this address will be delivered to dead letters.
>> [WARN] [10/31/2013 18:51:10.845] [spark-akka.actor.default-dispatcher-20] 
>> [akka://spark/system/remote-watcher] Detected unreachable: 
>> [akka.tcp://sparkExecutor@<executor>:41441]
>> 
>> 
>> and when the driver does decide that the executor has been terminated, it 
>> removes the executor, but doesn't start another one.
>> 
>> there are a ton of messages also about messages to the block manager master 
>> ... I'm wondering if there are other parts of the system that need to use a 
>> reliable proxy (or some sort of acknowledgement).
>> 
>> I really don't think this was working properly even w/ previous versions of 
>> spark / akka.  I'm still learning about akka, but I think you always need an 
>> ack to be confident w/ remote communication.  Perhaps the old version of akka 
>> just had more robust defaults or something, but I bet it could still have 
>> the same problems.  Even before, I have seen the driver thinking there were 
>> running tasks, but nothing happening on any executor -- it was just rare 
>> enough (and hard to reproduce) that I never bothered looking into it more.
>> 
>> I will keep digging ...
>> 
>> On Thu, Oct 31, 2013 at 4:36 PM, Matei Zaharia <matei.zaha...@gmail.com> 
>> wrote:
>> BTW the problem might be the Akka failure detector settings that seem new in 
>> 2.2: http://doc.akka.io/docs/akka/2.2.3/scala/remoting.html
>> 
>> Their timeouts seem pretty aggressive by default — around 10 seconds. This 
>> can easily be too little if you have large garbage collections. We should 
>> make sure they are higher than our own node failure detection timeouts.
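
To make the GC-pause concern concrete, here is a rough Python sketch of a phi-accrual style check. It is a simplified normal-distribution model and not Akka's actual implementation: the function names, the 1 s heartbeat interval, the 0.5 s jitter, and the way the acceptable pause is added to the expected gap are all assumptions for illustration. The threshold of 12 is the value tried elsewhere in this thread.

```python
import math


def phi(elapsed_s, mean_s, stddev_s):
    """Suspicion level: -log10 of P(heartbeat gap > elapsed_s) under a normal model."""
    z = (elapsed_s - mean_s) / (stddev_s * math.sqrt(2.0))
    p_later = 0.5 * math.erfc(z)
    # Clamp so that an underflow to 0.0 does not raise in log10.
    return -math.log10(max(p_later, 1e-300))


def is_suspected(elapsed_s, acceptable_pause_s, threshold=12.0,
                 heartbeat_interval_s=1.0, stddev_s=0.5):
    # Assumption: the acceptable pause simply shifts the expected heartbeat gap.
    mean_s = heartbeat_interval_s + acceptable_pause_s
    return phi(elapsed_s, mean_s, stddev_s) > threshold


# A 60 s silence (for example a long GC pause) with a ~10 s allowance
# marks the peer as failed; with a 300 s allowance it does not.
gc_pause_trips_short_timeout = is_suspected(60.0, acceptable_pause_s=10.0)
gc_pause_trips_long_timeout = is_suspected(60.0, acceptable_pause_s=300.0)
```

Under these assumed numbers, raising the threshold alone barely helps, because the suspicion level climbs very steeply once the silence exceeds the allowed pause; the allowance itself has to be on the order of minutes.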
>> 
>> Matei
>> 
>> On Oct 31, 2013, at 1:33 PM, Imran Rashid <im...@quantifind.com> wrote:
>> 
>>> pretty sure I found the problem -- two problems actually.  And I think one 
>>> of them has been a general lurking problem w/ spark for a while.
>>> 
>>> 1)  we should ignore disassociation events, as you suggested earlier.  They 
>>> seem to just indicate a temporary problem, and can generally be ignored.  
>>> I've found that they're regularly followed by AssociatedEvents, and it 
>>> seems communication really works fine at that point.
>>> 
>>> 2) Task finished messages get lost.  When this message gets sent, we don't 
>>> know it actually gets there:
>>> 
>>> https://github.com/apache/incubator-spark/blob/scala-2.10/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala#L90
>>> 
>>> (this is so incredible, I feel I must be overlooking something -- but there 
>>> is no ack somewhere else that I'm overlooking, is there??)  So, after the 
>>> patch, spark wasn't hanging b/c of the unhandled DisassociatedEvent.  It 
>>> hangs b/c the executor has sent some taskFinished messages that never get 
>>> received by the driver.  So the driver is waiting for some tasks to finish, 
>>> but the executors think they are all done.
>>> 
>>> I'm gonna add the reliable proxy pattern for this particular interaction 
>>> and see if it fixes the problem:
>>> http://doc.akka.io/docs/akka/2.2.3/contrib/reliable-proxy.html#introducing-the-reliable-proxy
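
The general idea of acknowledging and resending can be sketched as follows. This is a toy, single-threaded Python model and not Akka's ReliableProxy or Spark's executor backend: LossyChannel, Driver, Executor and their methods are hypothetical names, and losses are scripted by send index so the behavior is deterministic.

```python
class LossyChannel:
    """Hypothetical channel that loses the sends (or their acks) at given indices."""

    def __init__(self, drop_messages=(), drop_acks=()):
        self.drop_messages = set(drop_messages)
        self.drop_acks = set(drop_acks)
        self.sends = 0

    def deliver(self, receiver, seq, payload):
        index = self.sends
        self.sends += 1
        if index in self.drop_messages:
            return None  # message lost on the way, so no ack comes back
        ack = receiver.on_message(seq, payload)
        if index in self.drop_acks:
            return None  # message arrived, but the ack was lost
        return ack


class Driver:
    def __init__(self):
        self.finished = {}    # seq -> payload, recorded once per task
        self.deliveries = 0   # raw deliveries, duplicates included

    def on_message(self, seq, payload):
        self.deliveries += 1
        self.finished.setdefault(seq, payload)  # duplicates are ignored
        return seq  # the ack carries the sequence number


class Executor:
    def __init__(self, channel, driver, max_attempts=5):
        self.channel = channel
        self.driver = driver
        self.max_attempts = max_attempts

    def send_reliably(self, seq, payload):
        """Resend until acked; return the number of attempts used."""
        for attempt in range(1, self.max_attempts + 1):
            if self.channel.deliver(self.driver, seq, payload) == seq:
                return attempt
        raise RuntimeError(f"no ack for message {seq}")


def demo():
    # First send is lost, second arrives but its ack is lost, third succeeds.
    channel = LossyChannel(drop_messages={0}, drop_acks={1})
    driver = Driver()
    attempts = Executor(channel, driver).send_reliably(7, "task 7 finished")
    return attempts, driver.finished, driver.deliveries
```

The lost ack is why the receiver has to tolerate duplicates: the sender cannot distinguish "message lost" from "ack lost", so it resends in both cases.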
>>> 
>>> imran
>> 
>> 
>> 
>> -- 
>> It's just about how deep your longing is!
