Github user squito commented on the pull request:
https://github.com/apache/spark/pull/9214#issuecomment-150667835
@JoshRosen @rxin I'm going to see if I can do a little cleanup still, but
the functionality is there now. However, I realized there are some open
questions in the logic, particularly w.r.t. non-deterministic data.
(a) It's possible for shuffle files generated in one attempt to not be
generated in another attempt. E.g., if the data is empty, a sort-based shuffle
may not generate any output file. Similarly, the hash-based shuffle may not
generate a file if the output for a particular (map, reduce) pair is empty.
Furthermore, based on the [test case for
SPARK-4085](https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/ShuffleSuite.scala#L266),
if one shuffle output file is missing, we should consider the entire shuffle
output missing and regenerate it. You could imagine some degenerate cases, e.g.,
attempt 1 creates outputs a & b, attempt 2 creates outputs b & c, and so
attempt 2 overwrites attempt 1, even though it's not really the first attempt.
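Roughly what I mean, as a sketch (the names `commitAttempt`, `tmpFiles`, and `destinations` are just illustrative, not the actual `ShuffleOutputCoordinator` API):

```scala
import java.io.File

object ShuffleCommitSketch {
  // Hypothetical all-or-nothing commit, roughly as described in (a): the
  // previously committed output only "wins" if every expected destination
  // file exists; otherwise the current attempt's files are moved into place.
  def commitAttempt(tmpFiles: Seq[File], destinations: Seq[File]): Boolean = {
    val alreadyCommitted =
      destinations.nonEmpty && destinations.forall(_.exists())
    if (!alreadyCommitted) {
      // The degenerate case: if attempt 1 wrote {a, b} and attempt 2 writes
      // {b, c}, file a is "missing", so attempt 2 overwrites attempt 1 even
      // though it's not really the first attempt.
      tmpFiles.zip(destinations).foreach { case (tmp, dest) =>
        dest.delete()
        tmp.renameTo(dest)
      }
    }
    alreadyCommitted
  }
}
```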
(b) What should the MapStatus output of the uncommitted attempt be? With
deterministic data it doesn't *really* matter, at least for fixing SPARK-8029: the
output sizes will be approximately the same, and that is really good enough.
But with non-deterministic data, it's possible that attempt 1 gets committed, but
then attempt 2 creates a MapStatus with some zero-sized blocks where attempt 1
had non-zero-sized blocks. The MapStatus from attempt 2 is used, so the
shuffle-fetch side decides to completely skip reading those blocks.
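To see why that matters on the fetch side, here's a rough illustration (the `sizes` array stands in for the per-block sizes carried by the registered MapStatus; this isn't the real reader code):

```scala
object FetchSideSketch {
  // The reducer only requests blocks that the registered MapStatus reports
  // as non-empty. If attempt 1's files were committed but attempt 2's
  // MapStatus (with some zero-sized entries) is the one that gets
  // registered, those blocks are silently never fetched.
  def blocksToFetch(mapId: Int, sizes: Array[Long]): Seq[(Int, Int)] =
    sizes.indices.collect {
      case reduceId if sizes(reduceId) > 0 => (mapId, reduceId)
    }
}
```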
Some possible solutions are:
1) Undefined behavior on non-deterministic data.
2) Shuffle writers always create the same output files, even if they are
zero-length. This could really hurt the performance of the HashShuffle with
lots of partitions, but then again, it's already unusable with lots of
partitions, so it probably doesn't matter. I assume the effect on sort shuffle
is negligible. And then add some new marker MapStatus which gets returned when
the shuffle output is not committed.
3) Maybe the test for SPARK-4085 could be changed, and then we can change
`ShuffleOutputCoordinator` so that it considers the destination pre-existing if
*any* of the files are already there (see the sketch after this list). You can
still have some weird cases where attempt 1 creates outputs a & b, and
attempt 2 creates c & d, but it doesn't matter if you commit files c & d also.
You'd need the same marker MapStatus for the non-committed attempts.
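A sketch of what (3) would look like (again, illustrative names only; `UncommittedMapStatus` is a made-up marker, not an existing class):

```scala
import java.io.File

object RelaxedCommitSketch {
  // Relaxed check for (3): treat the map output as already committed if
  // *any* destination file exists, so a later attempt never partially
  // overwrites an earlier one.
  def alreadyCommitted(destinations: Seq[File]): Boolean =
    destinations.exists(_.exists())

  // Hypothetical marker returned by the losing attempt (not a real Spark
  // class); the driver would discard this and keep the committed attempt's
  // MapStatus.
  case class UncommittedMapStatus(executorId: String)
}
```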
I assume (1) is not an option. I'll implement (2) but wanted to see if you
have any thoughts.