Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/9214#issuecomment-153930717
  
    I've thought about this some more and there are a few things that I still 
find unclear. To recap my understanding:
    
    - We need to guard against both concurrent writes to the same shuffle file 
and concurrent reads and writes to the shuffle file.
    - Concurrent writes can occur in a few ways:
      - They can't / shouldn't occur via speculation, since the DAGScheduler 
will not schedule speculative tasks on the same executor as original tasks.
      - They might occur if we cancelled / killed a task and then launched a 
new attempt of the same task. This could happen because task cancellation is 
asynchronous.
      - They used to be able to occur if there were multiple concurrent 
attempts for the same stage, but I believe that this cause has been ruled out 
by scheduler changes.
    - A shuffle file read that is concurrent with a write to the same file can 
only occur if map output was lost and recomputations were triggered.
      - In sort-based shuffle, the entire map output is stored in a single 
file, so an individual map output can't be "partially lost."
      -  In hash-based shuffle, it would be possible for a portion of a map's 
output to be lost in response to a disk failure.
    
    In the discussion above, it seems like executors that re-register after 
being marked as dead are cited as one possible source of read/write concurrency 
problems, too.
    
    I have an alternative proposal:
    
    - Given that concurrent writer are not expected to happen, we should be 
able to add some strong assertions / checks inside of Executor to be sure of 
this. We can maintain a set of `(mapId, shuffleId)` pairs to track which map 
outputs are currently being computed and can kill the entire executor if we see 
a duplicate attempt being launched.
    - Since the concurrent read and write problem can only occur if map output 
is _partially_ lost, handle the loss of the shuffle files themselves by 
immediately killing the executor. The argument here is that a disk failure or 
filesystem problem means that we should just hard-stop the executor.
    - Once the master has marked an executor as dead, prevent that executor 
from re-registering: the executor should kill itself instead. There is some 
driver-side state associated with each executor that is cleared when an 
executor is marked as dead and having to reason about the re-creation of this 
state when re-registering an executor (which has its own internal state) adds 
complexity and introduces the potential for bugs.
    
      The only opposing argument that I can think of is performance concerns; 
my rebuttal:
    
        - If executor disconnect and re-registration is a super-common 
occurrence then you should increase the heartbeat timeout.
        - If increasing the heartbeat timeout is undesirable because it means 
that tasks are not rescheduled soon enough _and_ it's also the case that dead 
executors frequently return, then users should enable speculation. By the way: 
this argument also uncovers a bug in our logic for deciding when to enable the 
`OutputCommitCoordinator`: due to the ability for tasks to keep running on 
"failed/dead" executors, jobs which do not use speculation are still vulnerable 
to output commit races in certain rare failure cases (I'll file a JIRA to fix 
this).
    
    Summary: in response to rare failure modes, such as disk loss or concurrent 
writes which are never supposed to happen, we should consider hard-killing 
executors. It should always be safe to kill an executor; killing executors 
should only cause performance issues, not correctness issues, and thus we 
should be able to rely on the ability to kill an executor as a catch-all safety 
net for handling rare error scenarios or bugs.
    
    Ignoring the question of implementation complexity, would the proposal that 
I outlined above be a sufficient fix for SPARK-8029?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to