Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2019#discussion_r16632362
  
    --- Diff: core/src/main/scala/org/apache/spark/network/Connection.scala ---
    @@ -263,14 +282,20 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
     
       val DEFAULT_INTEREST = SelectionKey.OP_READ
     
    +  var alreadyReading = false
    +
       override def registerInterest() {
         // Registering read too - does not really help in most cases, but for some
         // it does - so let us keep it for now.
    -    changeConnectionKeyInterest(SelectionKey.OP_WRITE | DEFAULT_INTEREST)
    +    changeConnectionKeyInterest(
    +      SelectionKey.OP_WRITE | (if (!alreadyReading) {
    +        alreadyReading = true
    +        DEFAULT_INTEREST
    +      } else { 0 }))
    --- End diff --
    
    Please keep the following in mind while trying to find a solution:
    
    1) All invocations of register for a write connection will have OP_READ set (so there won't be a case where OP_READ is not set).
    OP_WRITE may or may not be set, depending on whether we have outstanding data to write.
    Keeping OP_READ set ensures the TCP stack alerts us when a remote close is detected (via keep-alive, etc.).
    
    2) We ensure that only a single thread processes a given socket at any point in time, and marking a connection for re-registration happens within that thread (not the actual registration - that always happens in the selector thread).
    
    So we won't have conflicting re-registration requests - we ensure this.
    At worst, we can have:
    a) re-register with OP_READ (because we finished the write), then wake up the selector;
    b) before the selector thread wakes up, re-register with OP_WRITE | OP_READ again (because some other thread wanted to write data).
    Registration requests are processed in order, so (b) takes precedence over (a).
    
    We handle the reverse case - some thread wanting to write while a write is in progress and then completes fully (resulting in (a)) - by use of resetForceReregister.
    This code path is complicated since it handles a lot of edge cases.
    
    3) No other thread calls register on the selector - only the selector thread can (not ensuring this actually causes deadlocks), which is why we have registration request queues for new and existing sockets. A sketch of this queue-and-drain pattern follows these points.
    
    4) A close can happen because of an explicit close by Spark, socket errors on our own side, network issues, or a close by the remote side.
    There is only so much we can do to distinguish these.
    We detect a remote close via (1) (note, it is not guaranteed to be reported immediately and can sometimes take a prolonged time), and a local close is handled gracefully anyway.
    
    
    Given all this, I am not sure what the multithreading issues seen are or what causes them - this code can be quite involved at times. The one main issue I do see is that repeated invocations of close (and there can be repeated invocations, as you rightly pointed out) seem to attempt to clean up the state repeatedly.
    This is incorrect - it should happen once and only once; repeated invocations are legal, but the actual close implementation code should be executed exactly once.
    Of course, exceptions while executing it are fine and unrecoverable, and we have to live with them (as when socket/stream.close throws an exception).
    
    To alleviate this, I proposed the AtomicBoolean change; a minimal sketch of what I have in mind is below.
    I might obviously be missing other things since it has been a while since I looked at these classes, so a fresh pair of eyes is definitely welcome!
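
    A rough sketch of that AtomicBoolean guard (class and member names here are illustrative, not the actual Connection code): callers may invoke close() any number of times, from any thread, but the cleanup body runs at most once.

    ```scala
    import java.nio.channels.SocketChannel
    import java.util.concurrent.atomic.AtomicBoolean

    // Sketch only: illustrates "execute the close logic exactly once".
    class ClosableConnectionSketch(channel: SocketChannel) {
      private val closed = new AtomicBoolean(false)

      // Safe to call multiple times and from multiple threads; only the first
      // caller to flip the flag runs the actual cleanup.
      def close(): Unit = {
        if (closed.compareAndSet(false, true)) {
          try {
            channel.close()
          } catch {
            // Exceptions here are unrecoverable; log and move on.
            case e: Exception => println(s"Ignoring exception during close: $e")
          }
          // ... unregister selection keys, clear pending buffers, notify callbacks ...
        }
      }
    }
    ```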

