Github user squito commented on the pull request:

    https://github.com/apache/spark/pull/9214#issuecomment-153964949
  
    Hi @JoshRosen
    
    > What's worse, I think that this locking scheme would have to coordinate 
across processes, since we'd need to make sure that the external shuffle 
service acquires the proper read locks.
    
    > I've noticed that this current patch does not employ any locks for 
reading, but I don't think that's currently a problem:
    
    > * The only case where we would need a lock is to prevent the case where 
we read the sort-shuffle index file and then have the data file replaced by a 
concurrent write.
    > * Since sort-shuffle only creates one data file, that file will never be 
overwritten once created.
    > 
    > Does that logic sound right?
    
    Hmm, perhaps, but I'd like to be totally clear:  (a) assuming that there is no 
disk error which leads to a file going completely missing, then no files ever 
get replaced (so the data file can't get replaced after you've opened the index 
file for reading).  (b) If you do consider vanishing files, then it is 
possible for the data file to get overwritten if you lose your index file or 
your mapstatus file.  So the worst-case scenario is: (i) the reader opens the 
index file and reads the offset; (ii) the index file vanishes; (iii) a 
conflicting attempt finishes, notices the missing index file, writes an index 
file and overwrites the data file; (iv) the reader opens the data file, but 
reads the wrong location.  And on Windows, you'll have a problem if the 
conflicting attempt finishes any time the index or data file is open for 
reading, a much wider window.
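
    To make steps (i)-(iv) concrete, here is a minimal sketch in Python that 
replays the interleaving on plain local files.  It is not Spark code: the file 
names, the helper functions and the index layout (cumulative 8-byte big-endian 
offsets) are assumptions chosen only for illustration.

```python
import os
import struct
import tempfile

def write_output(dirname, partitions):
    """Write a data file plus an index file of cumulative 8-byte offsets."""
    offsets = [0]
    with open(os.path.join(dirname, "shuffle.data"), "wb") as data:
        for p in partitions:
            data.write(p)
            offsets.append(offsets[-1] + len(p))
    with open(os.path.join(dirname, "shuffle.index"), "wb") as index:
        index.write(struct.pack(f">{len(offsets)}q", *offsets))

def read_offsets(dirname, partition):
    """Step (i): read the (start, end) offsets of one partition."""
    with open(os.path.join(dirname, "shuffle.index"), "rb") as index:
        index.seek(8 * partition)
        return struct.unpack(">2q", index.read(16))

def read_partition(dirname, start, end):
    """Step (iv): read bytes [start, end) from the data file."""
    with open(os.path.join(dirname, "shuffle.data"), "rb") as data:
        data.seek(start)
        return data.read(end - start)

def simulate_race():
    with tempfile.TemporaryDirectory() as d:
        write_output(d, [b"aaaa", b"bb"])             # first attempt commits
        start, end = read_offsets(d, 1)               # (i) reader gets offsets
        os.remove(os.path.join(d, "shuffle.index"))   # (ii) index file vanishes
        write_output(d, [b"c", b"ddddd"])             # (iii) second attempt rewrites both files
        return read_partition(d, start, end)          # (iv) stale offsets, new data

print(simulate_race())
```

    The reader asked for partition 1, which is `bb` in the first attempt and 
`ddddd` in the second, but with the stale offsets it gets a slice that matches 
neither.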
    
    With non-deterministic data, there is another more subtle issue -- even if 
the second attempt commits its output in between any downstream reads, you can 
have the downstream stage see a mixture of output for one task.  E.g., stage A 
has two attempts for task 1 (A.1.1 & A.1.2).  In stage B, task 1 reads A.1.1, 
and task 2 reads A.1.2.  This might not seem like a big deal, but I doubt it is 
what a user would expect, it certainly doesn't fit the mental model I started 
with as a user, and there may be cases where it matters.  You might have some random 
data generating process, then some transformations which split the data up by 
keys -- but where you expect some relationship to hold between your keys.  
Maybe some paired sampling process or something.  If you read output from 
different attempts, that relationship could be violated.  (Though as I'm 
writing this I'm realizing that *none* of the proposed solutions would help 
with this.)
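
    A toy illustration of the paired-sampling case, as a sketch only: the map 
task, the seeds standing in for attempts A.1.1 and A.1.2, and the 
"values sum to zero" relationship are all made up for this example.

```python
import random

def run_map_attempt(seed, n=5):
    """One attempt of a non-deterministic map task that emits paired samples."""
    rng = random.Random(seed)
    xs = [rng.randint(1, 1_000_000) for _ in range(n)]
    # Partition 0 gets x and partition 1 gets -x, so matching entries sum to zero.
    return {0: xs, 1: [-x for x in xs]}

def pairs_consistent(part0, part1):
    """Check the relationship the user expects to hold across partitions."""
    return all(a + b == 0 for a, b in zip(part0, part1))

attempt_1 = run_map_attempt(seed=1)   # stands in for A.1.1
attempt_2 = run_map_attempt(seed=2)   # stands in for A.1.2

# Both downstream tasks read the same attempt: the relationship holds.
same = pairs_consistent(attempt_1[0], attempt_1[1])
# Downstream task 1 reads A.1.1 and task 2 reads A.1.2: it is violated.
mixed = pairs_consistent(attempt_1[0], attempt_2[1])
```

    Each attempt is internally consistent; only the mixture across attempts 
breaks the relationship.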
    
    > Concurrent writes can occur in a few ways:
    > * They might occur if we cancelled / killed a task and then launched a 
new attempt of the same task. This could happen because task cancellation is 
asynchronous.
    
    Actually, we do *not* currently kill running tasks.  See 
[SPARK-2666](https://issues.apache.org/jira/browse/SPARK-2666).  At one point I 
think there was some discussion about intentionally not cancelling tasks, since 
they might do useful work, but I think now there is consensus they should be 
cancelled; it just hasn't been done.  I suppose since it's just for efficiency, 
not correctness, it hasn't been focused on.  It *should* be easy, but scheduler 
changes are always more painful than they seem ...
    
    Adding task cancellation would certainly make the issue here more rare, but 
it could still occur.
    
    > * They used to be able to occur if there were multiple concurrent 
attempts for the same stage, but I believe that this cause has been ruled out 
by scheduler changes.
    
    Not exactly.  The scheduler prevents concurrent *non-zombie* attempts for 
the same stage.  But since tasks aren't cancelled on zombification, you've 
still got concurrent attempts, with one non-zombie and an unlimited number of 
zombie attempts.
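
    As a rough model of that behaviour (a hypothetical toy, not the actual 
scheduler classes): a new attempt turns all earlier attempts into zombies, but 
nothing removes their running tasks.

```python
from dataclasses import dataclass, field

@dataclass
class StageAttempt:
    attempt_id: int
    running_tasks: set = field(default_factory=set)
    zombie: bool = False

class ToyScheduler:
    """Hypothetical model: at most one non-zombie attempt, no task cancellation."""

    def __init__(self):
        self.attempts = []

    def submit_attempt(self, tasks):
        # Earlier attempts become zombies, but their running tasks are NOT killed.
        for attempt in self.attempts:
            attempt.zombie = True
        new = StageAttempt(attempt_id=len(self.attempts), running_tasks=set(tasks))
        self.attempts.append(new)
        return new

    def non_zombie_attempts(self):
        return [a.attempt_id for a in self.attempts if not a.zombie]

    def attempts_running(self, task):
        """All attempts (zombie or not) that still have `task` running."""
        return [a.attempt_id for a in self.attempts if task in a.running_tasks]

scheduler = ToyScheduler()
scheduler.submit_attempt(tasks=[0, 1, 2])   # attempt 0
scheduler.submit_attempt(tasks=[1, 2])      # attempt 1, e.g. after a fetch failure
```

    Here `non_zombie_attempts()` contains only attempt 1, yet 
`attempts_running(1)` still lists both attempts, so two writers can race on the 
output of task 1.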
    
    > In sort-based shuffle, the entire map output is stored in a single file, 
so an individual map output can't be "partially lost."
    
    Well, you've still got the index and data file, and one is useless without 
the other, so I think the output *can* be partially lost.  (And with this 
change, there would be a mapstatus file as well.)
    
    > I have an alternative proposal:
    > ...
    > * Once the master has marked an executor as dead, prevent that executor 
from re-registering: the executor should kill itself instead.
    
    This sounds like a *big* change in behavior to me.  I agree that allowing 
executors to re-register does add complexity, and it certainly has confused me 
as a user in the past.  But unless there is no way around it, it seems like the 
type of change we'd want to make configurable till we understood the full 
ramifications.  And in the meantime, we'd still want to get correctness.  E.g., 
I think another way you can lose an executor now is if it gets overwhelmed 
serving too many shuffle requests -- if one of those requests times out, you 
get a fetch failure, and the executor gets removed.  Of course, killing the 
executor shouldn't hurt correctness; I'm just trying to point out that there 
might be more cases the user needs to deal with for tuning things after that 
kind of change.  Also, removing an executor can actually have a *huge* effect on 
performance -- if you run some big 5 stage pipeline for 10 hours, and there 
is some transient failure in the 10th hour, you will lose some shuffle output 
for each of your 5 stages and it may result in a long delay.  Even if 
there aren't many tasks to execute, you'll still need to run the 5 stages 
serially.
    
    > By the way: this argument also uncovers a bug in our logic for deciding 
when to enable the OutputCommitCoordinator: due to the ability for tasks to 
keep running on "failed/dead" executors, jobs which do not use speculation are 
still vulnerable to output commit races in certain rare failure cases (I'll 
file a JIRA to fix this).
    
    Yikes, good call.  Especially since we don't actually cancel tasks in 
zombie stage attempts, this might not be that rare.
    
    > would the proposal that I outlined above be a sufficient fix for 
SPARK-8029?
    
    Yes, I do think it would be sufficient.  I think you'd want to include task 
cancellation, SPARK-2666, if you were to kill an executor on duplicate 
attempts, otherwise it would be far too common.

