Hey Prashant, do messages still get lost while we’re disassociated? Or can you 
set the timeouts high enough to prevent that?

Matei

On Nov 13, 2013, at 12:39 AM, Prashant Sharma <[email protected]> wrote:

> We may no longer need to track disassociation; IMHO we can instead use the 
> *improved* remote death watch feature in akka 2.2.x, which lets us detect a 
> remote death whether it is a natural demise or an accidental one. This was 
> not the case with remote death watch in previous akka releases. Please take a 
> closer look at the patch at 
> https://github.com/apache/incubator-spark/pull/163 and let us know.
> 
> This patch does not make disassociations disappear (they are built into akka 
> as such), but it gives us sufficient knobs to tune when they occur. 
> Don't forget to tune those extra properties in addition to the other timeouts. 
> 
> 
> 
> 
> 
> On Sat, Nov 2, 2013 at 11:08 AM, Matei Zaharia <[email protected]> 
> wrote:
> Prashant, the problem seems to be that messages sent while we’re 
> disassociated are lost. I think we’d have to just prevent disassociation 
> altogether, or replace all remote actor refs with the reliable proxies (which 
> sounds painful).
> 
> Matei
> 
> On Nov 1, 2013, at 7:53 PM, Prashant Sharma <[email protected]> wrote:
> 
>> Hey Matei and Imran,
>> 
>> I think maybe we can solve the problem without downgrading to 2.1.0, by 
>> capturing disassociation and then setting a timeout: if it associates again 
>> we keep moving, else we shut down the executor. This timeout can of course 
>> be configurable. 
>> 
>> Thoughts?
>> 
>> 
>> On Sat, Nov 2, 2013 at 3:29 AM, Matei Zaharia <[email protected]> 
>> wrote:
>> Hey Imran,
>> 
>> Good to know that Akka 2.1 handles this — that at least will give us a start.
>> 
>> In the old code, executors certainly did get flagged as “down” occasionally, 
>> but that was due to a timeout we controlled (we keep sending heartbeats back 
>> and forth to track them). The timeout used to be smaller and usually the 
>> reason to exceed it was GC. However, if Akka 2.2 can sometimes drop the 
>> connections itself, this is a problem and we either have to use the reliable 
>> proxies for everything or see if we can configure it otherwise. Anyway, 
>> we’ll definitely look into it.
>> 
>> Matei
>> 
>> On Nov 1, 2013, at 1:09 PM, Imran Rashid <[email protected]> wrote:
>> 
>>> I downgraded spark to akka 2.1.0, and everything seems to work now.  I'm 
>>> going to run my tests a few more times, but I'd really have expected to 
>>> see a failure by now w/ the 2.2.3 version.
>>> 
>>> I'll submit a patch shortly (need to fix some compile errors in streaming 
>>> still).
>>> 
>>> Matei -- I think I realize now that when you were talking about the 
>>> expectation of a tcp connection staying alive, you were explaining why this 
>>> is *not* a bug in the current release.  You wouldn't end up in a situation 
>>> where the executor thinks it finished the task, but the driver doesn't know 
>>> about it, b/c if the connection dies, the executor will get restarted.  That 
>>> makes sense.  But, it seems like if we upgrade to akka 2.2.x, a lot of 
>>> things change.  I was probably wrong about seeing that problem in previous 
>>> releases -- it was just a vague recollection, which fit my current 
>>> theories, so I jumped to conclusions.
>>> 
>>> thanks everyone
>>> 
>>> 
>>> 
>>> On Fri, Nov 1, 2013 at 9:27 AM, Imran Rashid <[email protected]> wrote:
>>> thanks everyone for all of the input.
>>> 
>>> Matei: your explanation of how spark expects tcp to behave makes this a lot 
>>> clearer.  But, to show my total ignorance here, I'm wondering: when the 
>>> connection does break, are you sure all of the messages that you thought 
>>> you sent before the break were received?  I'm guessing that you aren't.  
>>> Which is fine, if the response to that is to have the executor just die 
>>> completely, and restart.  that was 
>>> the behavior I was initially observing with the code on the 2.10 branch, 
>>> where the executor handles a DisassociatedEvent explicitly, and dies.
>>> 
>>> But -- is that the behavior we want?  do we want it to be robust to tcp 
>>> connections breaking, without having to completely restart the executor?  
>>> you might say that dying & restarting will lead to correct behavior, even 
>>> if it's inefficient.  But sometimes, I've seen restarts so frequently that 
>>> no progress is made.
>>> 
>>> I don't see why this changed w/ the different versions of akka -- I don't 
>>> see any relevant configuration settings that would change how "strongly" 
>>> tcp tries to keep the connection alive, but I may be missing something.  
>>> But it does seem like the netty configuration options have changed 
>>> completely between the two versions:
>>> http://doc.akka.io/docs/akka/2.2.3/scala/remoting.html#Remote_Configuration
>>> vs
>>> http://doc.akka.io/docs/akka/2.0.5/scala/remoting.html
>>> 
>>> btw, akka 2.1.0 also has been built for scala 2.10:
>>> http://search.maven.org/#artifactdetails|com.typesafe.akka|akka-remote_2.10|2.1.0|bundle
>>> and its netty configuration is closer to 2.0.5:
>>> http://doc.akka.io/docs/akka/2.1.0/scala/remoting.html
>>> 
>>> perhaps someone more knowledgeable than me about netty & tcp can look through 
>>> the changes and decide what the right changes are.
>>> 
>>> Prashant said:
>>> >Before we conclude something about reliable messaging, I want you to for 
>>> >once consider other possibilities like actual network reconnection and 
>>> >maybe a GC pause? Try connecting something like jconsole (or alike) and 
>>> >see what happens on the driver and executor.
>>> >
>>> >My doubt is: since we are using standalone mode, where even master and 
>>> >worker are also actors, if we see weird behaviour on the executor 
>>> >and driver, then why not on master and worker too? They should also break 
>>> >away from each other. For this reason, I am doubting our conclusions, and 
>>> >maybe we should narrow down the problem first before we conclude something. 
>>> >There is a regression in akka 2.2.3: it uses more memory than it did 
>>> >in 2.1.x.  
>>> >See https://github.com/akka/akka/issues/1810 
>>> 
>>> 
>>> Well, there could easily be the same problem with dropped connections 
>>> between master & worker -- they just communicate so little, it doesn't 
>>> really matter.  The odds that a message gets dropped between them are very 
>>> low, only because there are barely any messages.
>>> 
>>> I completely agree that the problem could be because of contention, or a gc 
>>> pause, etc.  In fact, I'm only giving spark 24 out of 32 cores available on 
>>> each box, and 90g out of 125g memory.  I've looked at gc a little with 
>>> jstat, and I did see some gc pauses but nothing ridiculous.
>>> 
>>> But, I think the question remains.  Suppose it is gc pauses, etc. that 
>>> cause the disassociation events; what do we do to fix it?  How can we 
>>> diagnose the problem, and figure out which of the configuration variables 
>>> to tune?  clearly, there *will be* long gc pauses, and the networking layer 
>>> needs to be able to deal with them.
>>> 
>>> still I understand your desire to see if that might be the cause of the 
>>> problem in this particular case, so I will dig a little more.
>>> 
>>> 
>>> (btw, should I move this thread to the dev list now?  it is getting into 
>>> the nitty-gritty of implementation ...)
>>> 
>>> On Fri, Nov 1, 2013 at 1:15 AM, Matei Zaharia <[email protected]> 
>>> wrote:
>>> Yes, so far they’ve been built on that assumption — not that Akka would 
>>> *guarantee* delivery in that as soon as the send() call returns you know 
>>> it’s delivered, but that Akka would act the same way as a TCP socket, 
>>> allowing you to send a stream of messages in order and hear when the 
>>> connection breaks. Maybe that isn’t what they want to provide, but I'd find 
>>> it weird, because it’s very easy to write a server with this property.
>>> 
>>> Matei
>>> 
>>> On Oct 31, 2013, at 9:58 PM, Sriram Ramachandrasekaran 
>>> <[email protected]> wrote:
>>> 
>>>> Sorry if my understanding is wrong. Maybe, for this particular case, it 
>>>> has something to do with the load/network, but, in general, are you 
>>>> saying that we build these communication channels (block manager 
>>>> communication, task events communication, etc.) assuming akka would take 
>>>> care of it? I somehow feel that's overly optimistic. Correct me 
>>>> if I am wrong.
>>>> 
>>>> 
>>>> 
>>>> On Fri, Nov 1, 2013 at 10:08 AM, Matei Zaharia <[email protected]> 
>>>> wrote:
>>>> It’s true that Akka’s delivery guarantees are in general at-most-once, but 
>>>> if you look at the text there it says that they differ by transport. In 
>>>> the previous version, I’m quite sure that except maybe in very rare 
>>>> circumstances or cases where we had a bug, Akka’s remote layer always kept 
>>>> connections up between each pair of hosts. So the guarantee was that as 
>>>> long as you haven’t received a “disconnected” event, your messages are 
>>>> being delivered, though of course when you do receive that event you don’t 
>>>> know which messages have really made it through unless you acked them. But 
>>>> that didn’t matter for our use case — from our point of view an executor 
>>>> was either up or down.
>>>> 
>>>> For this reason I still think it should be possible to configure Akka to 
>>>> do the same on 2.2. Most likely some timeouts just got lower. With large 
>>>> heaps you can easily get a GC pause of 60 seconds, so these timeouts 
>>>> should be in the minutes.
>>>> 
>>>> If for some reason this isn’t the case, then we have a bigger problem — 
>>>> there are *lots* of messages beyond task-finished that need to be sent 
>>>> reliably, including things like block manager events (a block was added / 
>>>> removed on this node) and commands to tell the block manager to drop data. 
>>>> It would be silly to implement acks at the application level for all 
>>>> these. But I doubt this is the case. Prashant’s observation that the 
>>>> standalone cluster manager stayed up is a further sign that this might be 
>>>> due to GC.
>>>> 
>>>> Matei
>>>> 
>>>> On Oct 31, 2013, at 9:11 PM, Sriram Ramachandrasekaran 
>>>> <[email protected]> wrote:
>>>> 
>>>>> Hi Imran,
>>>>> Just to add, we've noticed disassociations in a couple of projects that 
>>>>> we built (using akka 2.2.x, not spark). We went into some detail to find 
>>>>> out what was happening. As Matei suggested, Akka keeps the TCP connection 
>>>>> open and uses that to talk to peers. We noticed that in our case, 
>>>>> initially, we were seeing disassociations generally at the end of the 
>>>>> keep-alive duration. So, when the keep-alive duration ends, at the TCP 
>>>>> layer, a keep-alive probe gets sent to inform the peer on the other side 
>>>>> that the connection is still alive/valid. For some reason, the probe didn't 
>>>>> renew the keep-alive connection and we saw a lot of disassociations 
>>>>> during that time. Later, we realized this was not a consistent pattern 
>>>>> either. This thread contains the full history of our discussions with the 
>>>>> Akka team. It's still open, and it's unclear what was causing it in our 
>>>>> case. We tried tweaking various akka settings (heartbeats, failure 
>>>>> detector, even plugging in our own failure detector), with no effect.
>>>>> 
>>>>> Imran - Just to clarify your point on message delivery - akka's message 
>>>>> delivery policy is at-most-once, which means there's no guarantee that a 
>>>>> message is delivered to a peer. The documentation clearly explains 
>>>>> that: 
>>>>> http://doc.akka.io/docs/akka/2.0.2/general/message-send-semantics.html 
>>>>> It's the responsibility of the application developer to handle cases 
>>>>> where a message is suspected not to have been delivered. 
>>>>> 
>>>>> I hope this helps.
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> On Fri, Nov 1, 2013 at 8:35 AM, Imran Rashid <[email protected]> wrote:
>>>>> 
>>>>> unfortunately that change wasn't the silver bullet I was hoping for.  
>>>>> Even with
>>>>> 1) ignoring DisassociatedEvent
>>>>> 2) executor uses ReliableProxy to send messages back to driver
>>>>> 3) turn up akka.remote.watch-failure-detector.threshold=12
>>>>> 
>>>>> 
>>>>> there is a lot of weird behavior.  First, there are a few 
>>>>> DisassociatedEvents, but some that are followed by AssociatedEvents, so 
>>>>> that seems ok.  But sometimes the re-associations are immediately 
>>>>> followed by this:
>>>>> 
>>>>> 13/10/31 18:51:10 INFO executor.StandaloneExecutorBackend: got 
>>>>> lifecycleevent: AssociationError 
>>>>> [akka.tcp://sparkExecutor@<executor>:41441] -> 
>>>>> [akka.tcp://spark@<driver>:41321]: Error [Invalid address: 
>>>>> akka.tcp://spark@<driver>:41321] [
>>>>> akka.remote.InvalidAssociation: Invalid address: 
>>>>> akka.tcp://spark@<driver>:41321
>>>>> Caused by: akka.remote.transport.Transport$InvalidAssociationException: 
>>>>> The remote system has quarantined this system. No further associations to 
>>>>> the remote system are possible until this system is restarted.
>>>>> ]
>>>>> 
>>>>> On the driver, there are messages like:
>>>>> 
>>>>> [INFO] [10/31/2013 18:51:07.838] [spark-akka.actor.default-dispatcher-3] 
>>>>> [Remoting] Address [akka.tcp://sparkExecutor@<executor>:46123] is now 
>>>>> quarantined, all messages to this address will be delivered to dead 
>>>>> letters.
>>>>> [WARN] [10/31/2013 18:51:10.845] [spark-akka.actor.default-dispatcher-20] 
>>>>> [akka://spark/system/remote-watcher] Detected unreachable: 
>>>>> [akka.tcp://sparkExecutor@<executor>:41441]
>>>>> 
>>>>> 
>>>>> and when the driver does decide that the executor has been terminated, it 
>>>>> removes the executor, but doesn't start another one.
>>>>> 
>>>>> there are a ton of messages also about messages to the block manager 
>>>>> master ... I'm wondering if there are other parts of the system that need 
>>>>> to use a reliable proxy (or some sort of acknowledgement).
>>>>> 
>>>>> I really don't think this was working properly even w/ previous versions 
>>>>> of spark / akka.  I'm still learning about akka, but I think you always 
>>>>> need an ack to be confident w/ remote communication.  Perhaps the old 
>>>>> version of akka just had more robust defaults or something, but I bet it 
>>>>> could still have the same problems.  Even before, I have seen the driver 
>>>>> thinking there were running tasks, but nothing happening on any executor 
>>>>> -- it was just rare enough (and hard to reproduce) that I never bothered 
>>>>> looking into it more.
>>>>> 
>>>>> I will keep digging ...
>>>>> 
>>>>> On Thu, Oct 31, 2013 at 4:36 PM, Matei Zaharia <[email protected]> 
>>>>> wrote:
>>>>> BTW the problem might be the Akka failure detector settings that seem new 
>>>>> in 2.2: http://doc.akka.io/docs/akka/2.2.3/scala/remoting.html
>>>>> 
>>>>> Their timeouts seem pretty aggressive by default — around 10 seconds. 
>>>>> This can easily be too little if you have large garbage collections. We 
>>>>> should make sure they are higher than our own node failure detection 
>>>>> timeouts.
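>>>>> 
>>>>> A rough sketch of what raising them could look like in the remoting 
>>>>> config. The key names are assumed from the 2.2.3 remoting docs linked 
>>>>> above, and the values are illustrative only (picked to sit well above a 
>>>>> long GC pause), so both should be checked against the reference.conf of 
>>>>> the Akka version actually in use:
>>>>> 
>>>>> ```hocon
>>>>> # Assumed key names for Akka 2.2.x remoting; values are untested placeholders.
>>>>> akka.remote {
>>>>>   # Failure detector guarding the transport-level association.
>>>>>   transport-failure-detector {
>>>>>     # Tolerate missed heartbeats for minutes rather than seconds.
>>>>>     acceptable-heartbeat-pause = 600 s
>>>>>   }
>>>>>   # Failure detector used for watching remote actors.
>>>>>   watch-failure-detector {
>>>>>     acceptable-heartbeat-pause = 600 s
>>>>>   }
>>>>> }
>>>>> ```
>>>>> 
>>>>> Whatever values are chosen, the point is only that they end up higher 
>>>>> than our own heartbeat timeout, so that our detection fires first.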
>>>>> 
>>>>> Matei
>>>>> 
>>>>> On Oct 31, 2013, at 1:33 PM, Imran Rashid <[email protected]> wrote:
>>>>> 
>>>>>> pretty sure I found the problem -- two problems actually.  And I think 
>>>>>> one of them has been a general lurking problem w/ spark for a while.
>>>>>> 
>>>>>> 1)  we should ignore disassociation events, as you suggested earlier.  
>>>>>> They seem to just indicate a temporary problem, and can generally be 
>>>>>> ignored.  I've found that they're regularly followed by 
>>>>>> AssociatedEvents, and it seems communication really works fine at that 
>>>>>> point.
>>>>>> 
>>>>>> 2) Task finished messages get lost.  When this message gets sent, we 
>>>>>> don't know it actually gets there:
>>>>>> 
>>>>>> https://github.com/apache/incubator-spark/blob/scala-2.10/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala#L90
>>>>>> 
>>>>>> (this is so incredible, I feel I must be overlooking something -- but 
>>>>>> there is no ack somewhere else that I'm overlooking, is there??)  So, 
>>>>>> after the patch, spark wasn't hanging b/c of the unhandled 
>>>>>> DisassociatedEvent.  It hangs b/c the executor has sent some 
>>>>>> taskFinished messages that never get received by the driver.  So the 
>>>>>> driver is waiting for some tasks to finish, but the executors think they 
>>>>>> are all done.
>>>>>> 
>>>>>> I'm gonna add the reliable proxy pattern for this particular interaction 
>>>>>> and see if it fixes the problem
>>>>>> http://doc.akka.io/docs/akka/2.2.3/contrib/reliable-proxy.html#introducing-the-reliable-proxy
>>>>>> 
>>>>>> imran
>>>>> 
>>>>> 
>>>>> 
>>>>> -- 
>>>>> It's just about how deep your longing is!
>>>> 
>>>> 
>>>> 
>>>> 
>>>> -- 
>>>> It's just about how deep your longing is!
>>> 
>> 
>> 
>> 
>> 
>> -- 
>> s
> 
> 
> 
> 
> -- 
> s
