We can set the timeouts high enough, the same as the connection timeout that we
already set.
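
For concreteness, these are the kinds of knobs involved (setting names as in the
Akka 2.2 remoting docs; the values below are only illustrative, not tested
recommendations):

```
akka.remote {
  # How long a failed connection stays gated before reconnects are attempted.
  retry-gate-closed-for = 30 s

  # Detector that decides when the underlying transport is considered lost.
  transport-failure-detector {
    heartbeat-interval = 4 s
    # Tolerate long GC pauses before tearing the connection down.
    acceptable-heartbeat-pause = 120 s
  }

  # Detector backing remote death watch.
  watch-failure-detector {
    threshold = 12.0
    acceptable-heartbeat-pause = 120 s
  }
}
```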


On Wed, Nov 13, 2013 at 11:37 PM, Matei Zaharia <[email protected]> wrote:

> Hey Prashant, do messages still get lost while we’re disassociated? Or can
> you set the timeouts high enough to prevent that?
>
> Matei
>
> On Nov 13, 2013, at 12:39 AM, Prashant Sharma <[email protected]>
> wrote:
>
> We may no longer need to track disassociation; IMHO we can use the *improved*
> feature in akka 2.2.x called remote death watch, which lets us acknowledge a
> remote death both in the case of a natural demise and an accidental death.
> This was not the case with remote death watch in previous akka releases.
> Please take a closer look at the patch at
> https://github.com/apache/incubator-spark/pull/163 and let us know.
>
> This patch does not make disassociations disappear (they still surface from
> akka as before), but it gives us sufficient knobs to tune when they occur.
> Don't forget to tune those extra properties apart from the other timeouts.
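>
> For illustration, hooking up a remote death watch looks roughly like this (a
> sketch only; the actor and message names are illustrative and not taken from
> the patch):
>
> ```scala
> import akka.actor.{Actor, ActorRef, Terminated}
>
> class ExecutorWatcher(executor: ActorRef) extends Actor {
>   // With 2.2.x remoting, watching a remote ref delivers Terminated both on
>   // a graceful shutdown and on an abrupt death of the remote system.
>   context.watch(executor)
>
>   def receive = {
>     case Terminated(ref) =>
>       // Clean up state for the dead executor, e.g. reschedule its tasks.
>       println(s"executor ${ref.path} terminated")
>   }
> }
> ```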
>
>
>
>
>
> On Sat, Nov 2, 2013 at 11:08 AM, Matei Zaharia <[email protected]> wrote:
>
>> Prashant, the problem seems to be that messages sent while we’re
>> disassociated are lost. I think we’d have to just prevent disassociation
>> altogether, or replace all remote actor refs with the reliable proxies
>> (which sounds painful).
>>
>> Matei
>>
>> On Nov 1, 2013, at 7:53 PM, Prashant Sharma <[email protected]> wrote:
>>
>> Hey Matei and Imran,
>>
>> I think maybe we can solve the problem without downgrading to 2.1.0, by
>> capturing the disassociation and then setting a timeout: if it associates
>> again we keep moving, else we shut down the executor. This timeout can of
>> course be configurable.
>>
>> Thoughts?
>>
>>
>> On Sat, Nov 2, 2013 at 3:29 AM, Matei Zaharia <[email protected]> wrote:
>>
>>> Hey Imran,
>>>
>>> Good to know that Akka 2.1 handles this — that at least will give us a
>>> start.
>>>
>>> In the old code, executors certainly did get flagged as “down”
>>> occasionally, but that was due to a timeout we controlled (we keep sending
>>> heartbeats back and forth to track them). The timeout used to be smaller
>>> and usually the reason to exceed it was GC. However, if Akka 2.2 can
>>> sometimes drop the connections itself, this is a problem and we either have
>>> to use the reliable proxies for everything or see if we can configure it
>>> otherwise. Anyway, we’ll definitely look into it.
>>>
>>> Matei
>>>
>>> On Nov 1, 2013, at 1:09 PM, Imran Rashid <[email protected]> wrote:
>>>
>>> I downgraded spark to akka 2.1.0, and everything seems to work now.  I'm
>>> going to run my tests a few more times, but I'd really have expected to
>>> see a failure by now w/ the 2.2.3 version.
>>>
>>> I'll submit a patch shortly (need to fix some compile errors in
>>> streaming still).
>>>
>>> Matei -- I think I realize now that when you were talking about the
>>> expectation of a tcp connection staying alive, you were explaining why this
>>> is *not* a bug in the current release.  You wouldn't end up in a situation
>>> where the executor thinks it finished the task, but the driver doesn't know
>>> about it, b/c if the connection dies, the executor will get restarted.  That
>>> makes sense.  But, it seems like if we upgrade to akka 2.2.x, a lot of
>>> things change.  I was probably wrong about seeing that problem in previous
>>> releases -- it was just a vague recollection, which fit my current
>>> theories, so I jumped to conclusions.
>>>
>>> thanks everyone
>>>
>>>
>>>
>>> On Fri, Nov 1, 2013 at 9:27 AM, Imran Rashid <[email protected]> wrote:
>>>
>>>> thanks everyone for all of the input.
>>>>
>>>> Matei: makes a lot more sense with your explanation of spark's expected
>>>> behavior of tcp, I can see why this makes sense now.  But, to show my total
>>>> ignorance here, I'm wondering: when the connection does break, are you
>>>> sure all of the messages you thought you sent before the break were
>>>> received?  I'm guessing you aren't.  Which is fine, if the response to
>>>> that is to have the executor just die completely and restart.  That was
>>>> the behavior I was initially observing with the code on the 2.10 branch,
>>>> where the executor handles a DisassociatedEvent explicitly, and dies.
>>>>
>>>> But -- is that the behavior we want?  do we want it to be robust to tcp
>>>> connections breaking, without having to completely restart the executor?
>>>> you might say that dying & restarting will lead to correct behavior, even
>>>> if it's inefficient.  But sometimes, I've seen restarts so frequently that
>>>> no progress is made.
>>>>
>>>> I don't see why this changed w/ the different versions of akka -- I
>>>> don't see any relevant configuration settings that would change how
>>>> "strongly" tcp tries to keep the connection alive, but I may be missing
>>>> something.  But it does seem like the netty configuration options have
>>>> changed completely between the two versions:
>>>>
>>>> http://doc.akka.io/docs/akka/2.2.3/scala/remoting.html#Remote_Configuration
>>>> vs
>>>> http://doc.akka.io/docs/akka/2.0.5/scala/remoting.html
>>>>
>>>> btw, akka 2.1.0 also has been built for scala 2.10:
>>>>
>>>> http://search.maven.org/#artifactdetails|com.typesafe.akka|akka-remote_2.10|2.1.0|bundle
>>>> and its netty configuration is closer to 2.0.5:
>>>> http://doc.akka.io/docs/akka/2.1.0/scala/remoting.html
>>>>
>>>> perhaps someone more knowledgeable than me about netty & tcp can look
>>>> through the changes and decide what the right changes are.
>>>>
>>>> Prashant said:
>>>> > Before we conclude something about reliable messaging, I want you to
>>>> > for once consider other possibilities, like an actual network
>>>> > reconnection or maybe a GC pause. Try connecting something like jconsole
>>>> > (or alike) and see what happens on the driver and executor.
>>>> >
>>>> > My doubt is: since we are using standalone mode, where even the master
>>>> > and worker are also actors, if we see weird behaviour on the executor
>>>> > and driver then why not on the master and worker too? They should also
>>>> > break away from each other. For this reason, I am doubting our
>>>> > conclusions; maybe we should narrow down the problem first before we
>>>> > conclude something. There is a regression in akka 2.2.3: it uses more
>>>> > memory than it used to in 2.1.x.
>>>> > See https://github.com/akka/akka/issues/1810
>>>>
>>>>
>>>> Well, there could easily be the same problem with dropped connections
>>>> between master & worker -- they just communicate so little, it doesn't
>>>> really matter.  The odds that a message gets dropped between them is very
>>>> low, only because there are barely any messages.
>>>>
>>>> I completely agree that the problem could be because of contention,
>>>> or gc pause, etc.  In fact, I'm only giving spark 24 out of 32 cores
>>>> available on each box, and 90g out of 125g memory.  I've looked at gc a
>>>> little with jstat, and I did see some gc pauses but nothing ridiculous.
>>>>
>>>> But, I think the question remains.  Suppose it is gc pauses, etc. that
>>>> cause the disassociation events; what do we do to fix it?  How can we
>>>> diagnose the problem, and figure out which of the configuration variables
>>>> to tune?  clearly, there *will be* long gc pauses, and the networking layer
>>>> needs to be able to deal with them.
>>>>
>>>> still I understand your desire to see if that might be the cause of the
>>>> problem in this particular case, so I will dig a little more.
>>>>
>>>>
>>>> (btw, should I move this thread to the dev list now?  it is getting
>>>> into the nitty-gritty of implementation ...)
>>>>
>>>> On Fri, Nov 1, 2013 at 1:15 AM, Matei Zaharia <[email protected]> wrote:
>>>>
>>>>> Yes, so far they’ve been built on that assumption — not that Akka
>>>>> would *guarantee* delivery, in the sense that as soon as the send() call
>>>>> returns you know it’s delivered, but that Akka would act the same way as a TCP socket,
>>>>> allowing you to send a stream of messages in order and hear when the
>>>>> connection breaks. Maybe that isn’t what they want to provide, but I'd
>>>>> find it weird, because it’s very easy to write a server with this property.
>>>>>
>>>>> Matei
>>>>>
>>>>> On Oct 31, 2013, at 9:58 PM, Sriram Ramachandrasekaran <
>>>>> [email protected]> wrote:
>>>>>
>>>>> Sorry if my understanding is wrong. Maybe, for this particular case
>>>>> it might be something to do with the load/network, but, in general, are 
>>>>> you
>>>>> saying that, we build these communication channels(block manager
>>>>> communication, task events communication, etc) assuming akka would take
>>>>> care of it? I somehow feel that, it's being overly optimistic. Correct me
>>>>> if I am wrong.
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Nov 1, 2013 at 10:08 AM, Matei Zaharia <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> It’s true that Akka’s delivery guarantees are in general
>>>>>> at-most-once, but if you look at the text there it says that they differ
>>>>>> by transport. In the previous version, I’m quite sure that except maybe in
>>>>>> very rare circumstances or cases where we had a bug, Akka’s remote layer
>>>>>> always kept connections up between each pair of hosts. So the guarantee
>>>>>> was that as long as you haven’t received a “disconnected” event, your
>>>>>> messages are being delivered, though of course when you do receive that
>>>>>> event you
>>>>>> don’t know which messages have really made it through unless you acked
>>>>>> them. But that didn’t matter for our use case — from our point of view an
>>>>>> executor was either up or down.
>>>>>>
>>>>>> For this reason I still think it should be possible to configure Akka
>>>>>> to do the same on 2.2. Most likely some timeouts just got lower. With
>>>>>> large heaps you can easily get a GC pause of 60 seconds, so these
>>>>>> timeouts should be in the minutes.
>>>>>>
>>>>>> If for some reason this isn’t the case, then we have a bigger problem
>>>>>> — there are *lots* of messages beyond task-finished that need to be sent
>>>>>> reliably, including things like block manager events (a block was added /
>>>>>> removed on this node) and commands to tell the block manager to drop
>>>>>> data. It would be silly to implement acks at the application level for
>>>>>> all these. But I doubt this is the case. Prashant’s observation that the
>>>>>> standalone cluster manager stayed up is a further sign that this might
>>>>>> be due to GC.
>>>>>>
>>>>>> Matei
>>>>>>
>>>>>> On Oct 31, 2013, at 9:11 PM, Sriram Ramachandrasekaran <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>> Hi Imran,
>>>>>> Just to add, we've noticed dis-associations in a couple of projects that
>>>>>> we built (using akka 2.2.x, not spark). We went into some detail to find
>>>>>> out what was happening. As Matei suggested, Akka keeps the TCP connection open
>>>>>> open
>>>>>> and uses that to talk to peers. We noticed that in our case, initially, 
>>>>>> we
>>>>>> were seeing dis-associations generally at the end of keep-alive duration.
>>>>>> So, when the keep-alive duration ends, at the TCP layer, a keep-alive 
>>>>>> probe
>>>>>> gets sent to inform the peer on the other side that the connection is 
>>>>>> still
>>>>>> alive/valid. For some reason, the probe didn't renew the keep-alive
>>>>>> connection and we saw a lot of dis-associations during that time. Later, 
>>>>>> we
>>>>>> realized this was not a pattern either. This thread
>>>>>> <https://groups.google.com/forum/#!msg/akka-user/RYxaPl_nby4/1USHDFIRgOkJ>
>>>>>> contains the full history of our discussions with the Akka team. It's
>>>>>> still open and unclear as to what was causing it for our case.
>>>>>> We tried tweaking various settings of akka (wrt heartbeats and the
>>>>>> failure detector; we even plugged in our own failure detector, with no effect).
>>>>>>
>>>>>> Imran - Just to clarify your point on message delivery - akka's
>>>>>> message delivery policy is at-most-once, i.e. there's no guarantee that
>>>>>> a message will be delivered to a peer. The documentation explains this
>>>>>> clearly:
>>>>>> http://doc.akka.io/docs/akka/2.0.2/general/message-send-semantics.html
>>>>>> It's the responsibility of the application developer to handle cases
>>>>>> where a message is suspected to not have been delivered.
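>>>>>>
>>>>>> As a sketch of what that application-level handling boils down to (plain
>>>>>> scala, illustrative names only): resend until an ack arrives, accepting
>>>>>> that the receiver may then see duplicates and has to deduplicate:
>>>>>>
>>>>>> ```scala
>>>>>> // At-least-once on top of an at-most-once channel: resend until acked,
>>>>>> // bounded by maxAttempts. sendAndAwaitAck returns true iff the peer
>>>>>> // acknowledged this attempt within its timeout.
>>>>>> def sendReliably[A](msg: A,
>>>>>>                     sendAndAwaitAck: A => Boolean,
>>>>>>                     maxAttempts: Int): Boolean =
>>>>>>   (1 to maxAttempts).exists(_ => sendAndAwaitAck(msg))
>>>>>> ```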
>>>>>>
>>>>>> I hope this helps.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>> On Fri, Nov 1, 2013 at 8:35 AM, Imran Rashid <[email protected]> wrote:
>>>>>>
>>>>>>>
>>>>>>> unfortunately that change wasn't the silver bullet I was hoping
>>>>>>> for.  Even with
>>>>>>> 1) ignoring DisassociatedEvent
>>>>>>> 2) executor uses ReliableProxy to send messages back to driver
>>>>>>> 3) turn up akka.remote.watch-failure-detector.threshold=12
>>>>>>>
>>>>>>>
>>>>>>> there is a lot of weird behavior.  First, there are a few
>>>>>>> DisassociatedEvents, but some that are followed by AssociatedEvents, so
>>>>>>> that seems ok.  But sometimes the re-associations are immediately 
>>>>>>> followed
>>>>>>> by this:
>>>>>>>
>>>>>>> 13/10/31 18:51:10 INFO executor.StandaloneExecutorBackend: got
>>>>>>> lifecycleevent: AssociationError 
>>>>>>> [akka.tcp://sparkExecutor@<executor>:41441]
>>>>>>> -> [akka.tcp://spark@<driver>:41321]: Error [Invalid address:
>>>>>>> akka.tcp://spark@<driver>:41321] [
>>>>>>> akka.remote.InvalidAssociation: Invalid address:
>>>>>>> akka.tcp://spark@<driver>:41321
>>>>>>> Caused by:
>>>>>>> akka.remote.transport.Transport$InvalidAssociationException: The remote
>>>>>>> system has quarantined this system. No further associations to the 
>>>>>>> remote
>>>>>>> system are possible until this system is restarted.
>>>>>>> ]
>>>>>>>
>>>>>>> On the driver, there are messages like:
>>>>>>>
>>>>>>> [INFO] [10/31/2013 18:51:07.838]
>>>>>>> [spark-akka.actor.default-dispatcher-3] [Remoting] Address [
>>>>>>> akka.tcp://sparkExecutor@<executor>:46123] is now quarantined, all
>>>>>>> messages to this address will be delivered to dead letters.
>>>>>>> [WARN] [10/31/2013 18:51:10.845]
>>>>>>> [spark-akka.actor.default-dispatcher-20] 
>>>>>>> [akka://spark/system/remote-watcher]
>>>>>>> Detected unreachable: [akka.tcp://sparkExecutor@<executor>:41441]
>>>>>>>
>>>>>>>
>>>>>>> and when the driver does decide that the executor has been
>>>>>>> terminated, it removes the executor, but doesn't start another one.
>>>>>>>
>>>>>>> there are a ton of messages also about messages to the block manager
>>>>>>> master ... I'm wondering if there are other parts of the system that
>>>>>>> need to use a reliable proxy (or some sort of acknowledgement).
>>>>>>>
>>>>>>> I really don't think this was working properly even w/ previous
>>>>>>> versions of spark / akka.  I'm still learning about akka, but I think 
>>>>>>> you
>>>>>>> always need an ack to be confident w/ remote communication.  Perhaps the old
>>>>>>> old
>>>>>>> version of akka just had more robust defaults or something, but I bet it
>>>>>>> could still have the same problems.  Even before, I have seen the driver
>>>>>>> thinking there were running tasks, but nothing happening on any 
>>>>>>> executor --
>>>>>>> it was just rare enough (and hard to reproduce) that I never bothered
>>>>>>> looking into it more.
>>>>>>>
>>>>>>> I will keep digging ...
>>>>>>>
>>>>>>> On Thu, Oct 31, 2013 at 4:36 PM, Matei Zaharia <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> BTW the problem might be the Akka failure detector settings that
>>>>>>>> seem new in 2.2:
>>>>>>>> http://doc.akka.io/docs/akka/2.2.3/scala/remoting.html
>>>>>>>>
>>>>>>>> Their timeouts seem pretty aggressive by default — around 10
>>>>>>>> seconds. This can easily be too little if you have large garbage
>>>>>>>> collections. We should make sure they are higher than our own node 
>>>>>>>> failure
>>>>>>>> detection timeouts.
>>>>>>>>
>>>>>>>> Matei
>>>>>>>>
>>>>>>>> On Oct 31, 2013, at 1:33 PM, Imran Rashid <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> pretty sure I found the problem -- two problems actually.  And I
>>>>>>>> think one of them has been a general lurking problem w/ spark for a
>>>>>>>> while.
>>>>>>>>
>>>>>>>> 1)  we should ignore disassociation events, as you suggested
>>>>>>>> earlier.  They seem to just indicate a temporary problem, and can 
>>>>>>>> generally
>>>>>>>> be ignored.  I've found that they're regularly followed by
>>>>>>>> AssociatedEvents, and it seems communication really works fine at that
>>>>>>>> point.
>>>>>>>>
>>>>>>>> 2) Task finished messages get lost.  When this message gets sent,
>>>>>>>> we don't know it actually gets there:
>>>>>>>>
>>>>>>>>
>>>>>>>> https://github.com/apache/incubator-spark/blob/scala-2.10/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala#L90
>>>>>>>>
>>>>>>>> (this is so incredible, I feel I must be overlooking something --
>>>>>>>> but there is no ack somewhere else that I'm overlooking, is there??)
>>>>>>>> So, after the patch, spark wasn't hanging b/c of the unhandled
>>>>>>>> DisassociatedEvent.  It hangs b/c the executor has sent some
>>>>>>>> taskFinished messages that never get received by the driver.  So the
>>>>>>>> driver is waiting for some tasks to finish, but the executors think
>>>>>>>> they are all done.
>>>>>>>>
>>>>>>>> I'm gonna add the reliable proxy pattern for this particular
>>>>>>>> interaction and see if it fixes the problem
>>>>>>>>
>>>>>>>> http://doc.akka.io/docs/akka/2.2.3/contrib/reliable-proxy.html#introducing-the-reliable-proxy
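>>>>>>>>
>>>>>>>> Roughly, the wiring would look like the sketch below (driverRef and
>>>>>>>> statusUpdate are placeholders; this assumes akka-contrib 2.2.x on the
>>>>>>>> classpath):
>>>>>>>>
>>>>>>>> ```scala
>>>>>>>> import scala.concurrent.duration._
>>>>>>>> import akka.actor.Props
>>>>>>>> import akka.contrib.pattern.ReliableProxy
>>>>>>>>
>>>>>>>> // Route driver-bound messages through a proxy that resends each one
>>>>>>>> // until the remote end acks it (at-least-once while both sides live).
>>>>>>>> val proxy = context.actorOf(Props(new ReliableProxy(driverRef, 100.millis)))
>>>>>>>> proxy ! statusUpdate
>>>>>>>> ```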
>>>>>>>>
>>>>>>>> imran
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> It's just about how deep your longing is!
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>
>>>
>>
>>
>> --
>> s
>>
>>
>>
>
>
>
>
>


-- 
s
