Github user squito commented on the pull request:
https://github.com/apache/spark/pull/9214#issuecomment-153964949
Hi @JoshRosen
> What's worse, I think that this locking scheme would have to coordinate
across processes, since we'd need to make sure that the external shuffle
service acquires the proper read locks.
> I've noticed that this current patch does not employ any locks for
reading, but I don't think that's currently a problem:
> * The only case where we would need a lock is to prevent the case where
we read the sort-shuffle index file and then have the data file replaced by a
concurrent write.
> * Since sort-shuffle only creates one data file, that file will never be
overwritten once created.
>
> Does that logic sound right?
Hmm, perhaps, but I'd like to be totally clear: (a) assuming that there is no disk error which causes a file to go completely missing, then no files ever get replaced (so the data file can't be replaced after you've opened the index file for reading); (b) if you do consider vanishing files, then it is possible for the data file to get overwritten if you lose your index file or your mapstatus file. So the worst-case scenario is: (i) the reader opens the index file and reads the offset, (ii) the index file vanishes, (iii) a conflicting attempt finishes, notices the missing index file, writes a new index file, and overwrites the data file, (iv) the reader opens the data file, but reads from the wrong location. And on Windows, you'll have a problem if the conflicting attempt finishes any time the index or data file is open for reading, which is a much wider window.
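Just to make the exposure concrete, here is a minimal sketch of the read pattern the scenario above depends on (simplified, not the actual Spark code; the index-file layout of one long offset per reducer is an assumption for illustration). The point is that the index file and the data file are opened in two separate, unlocked steps:

```scala
import java.io.{DataInputStream, File, FileInputStream, RandomAccessFile}

// Simplified sketch of a sort-shuffle read: read the reducer's byte range from
// the index file, then read that range out of the data file.
def readPartition(indexFile: File, dataFile: File, reduceId: Int): Array[Byte] = {
  // (i) read this reducer's [offset, nextOffset) range from the index file
  val in = new DataInputStream(new FileInputStream(indexFile))
  val (offset, nextOffset) =
    try { in.skip(reduceId * 8L); (in.readLong(), in.readLong()) }
    finally { in.close() }

  // (ii)-(iii) nothing stops the index file from vanishing here while a
  // conflicting attempt rewrites both the index and the data file
  val data = new RandomAccessFile(dataFile, "r")
  try {
    // (iv) these offsets may now point into data written by a different attempt
    val buf = new Array[Byte]((nextOffset - offset).toInt)
    data.seek(offset)
    data.readFully(buf)
    buf
  } finally {
    data.close()
  }
}
```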
With non-deterministic data, there is another, more subtle issue -- even if the second attempt commits its output in between any downstream reads, you can have the downstream stage see a mixture of output from one task. E.g., stage A has two attempts for task 1 (A.1.1 & A.1.2). In stage B, task 1 reads A.1.1, and task 2 reads A.1.2. That might not seem like a big deal, but I doubt it is what a user would expect, it certainly doesn't fit the mental model I started with as a user, and there may be cases where it matters. You might have some random data-generating process, then some transformations which split the data up by key -- but where you expect some relationship to hold between your keys. Maybe some paired sampling process or something; see the sketch below. If you read output from different attempts, that relationship could be violated. (Though as I'm writing this, I'm realizing that *none* of the proposed solutions would help with this.)
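To be concrete about the kind of relationship I mean, a hypothetical sketch (`sc` is just an existing SparkContext; none of these names come from this patch):

```scala
// Stage A: each record draws a random value v and emits two keyed records that
// are meant to stay paired: ("sample", v) and ("control", v + 1.0).
val paired = sc.parallelize(1 to 1000).flatMap { _ =>
  val v = scala.util.Random.nextDouble()   // non-deterministic across attempts
  Seq(("sample", v), ("control", v + 1.0))
}

// Stage B: grouping by key sends "sample" and "control" to different reduce
// tasks. If one reduce task fetches map output written by attempt A.1.1 while
// another fetches the recomputed output of attempt A.1.2, the random draws
// differ between the attempts and the v / v + 1.0 pairing is silently broken.
val grouped = paired.groupByKey()
```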
> Concurrent writes can occur in a few ways:
> * They might occur if we cancelled / killed a task and then launched a
new attempt of the same task. This could happen because task cancellation is
asynchronous.
actually, we do *not* currently kill running tasks. See [SPARK-2666](https://issues.apache.org/jira/browse/SPARK-2666). At one point I think there was some discussion about intentionally not cancelling tasks, since they might do useful work, but I think now there is consensus that they should be cancelled; it just hasn't been done. I suppose since it's just for efficiency, not correctness, it hasn't been focused on. It *should* be easy, but scheduler changes are always more painful than they seem ...
Adding task cancellation would certainly make the issue here more rare, but
it could still occur.
> * They used to be able to occur if there were multiple concurrent
attempts for the same stage, but I believe that this cause has been ruled out
by scheduler changes.
not exactly. The scheduler prevents concurrent *non-zombie* attempts for
the same stage. But since tasks aren't cancelled on zombification, you've
still got concurrent attempts, with one non-zombie and an unlimited number of
zombie attempts.
> In sort-based shuffle, the entire map output is stored in a single file,
so an individual map output can't be "partially lost."
Well, you've still got the index file and the data file, and one is useless without the other, so I think the output *can* be partially lost. (And with this change, there would be a mapstatus file as well.)
> I have an alternative proposal:
> ...
> * Once the master has marked an executor as dead, prevent that executor
from re-registering: the executor should kill itself instead.
This sounds like a *big* change in behavior to me. I agree that allowing executors to re-register does add complexity, and it certainly has confused me as a user in the past. But unless there is no way around it, it seems like the type of change we'd want to make configurable until we understand the full ramifications. And in the meantime, we'd still want correctness. E.g., I think another way you can lose an executor now is if it gets overwhelmed serving too many shuffle requests -- if one of those requests times out, you get a fetch failure and the executor gets removed. Of course, killing the executor shouldn't hurt correctness; I'm just trying to point out that there might be more cases the user needs to tune for after that kind of change. Also, removing an executor can actually have a *huge* effect on performance -- if you run some big 5-stage pipeline for 10 hours, and there is some transient failure in the 10th hour, you will lose some shuffle output for each of your 5 stages, and it may result in a long delay. Even if there aren't many tasks to execute, you'll still need to run the 5 stages serially.
> By the way: this argument also uncovers a bug in our logic for deciding
when to enable the OutputCommitCoordinator: due to the ability for tasks to
keep running on "failed/dead" executors, jobs which do not use speculation are
still vulnerable to output commit races in certain rare failure cases (I'll
file a JIRA to fix this).
yikes, good call. Especially since we don't actually cancel tasks in
zombie stage attempts, this might not be that rare.
> would the proposal that I outlined above be a sufficient fix for
SPARK-8029?
yes, I do think it would be sufficient. I think you'd want to include task cancellation (SPARK-2666) if you were to kill an executor on duplicate attempts; otherwise killing executors would be far too common.