Github user squito commented on the pull request:

    https://github.com/apache/spark/pull/9214#issuecomment-150667835
  
    @JoshRosen @rxin I'm going to see if I can do a little more cleanup, but the functionality is there now.  However, I did realize there are some open questions in the logic, particularly with respect to non-deterministic data.
    
    (a) it's possible for shuffle files generated in one attempt to not be generated in another attempt.  E.g., if the data is empty, a sort-based shuffle may not generate any output file.  Similarly, the hash-based shuffle may not generate a file if the output for a particular (map, reduce) pair is empty.  Furthermore, based on the [test case for SPARK-4085](https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/ShuffleSuite.scala#L266), if one shuffle output file is missing, we should consider the entire shuffle output missing and regenerate it.  You could imagine some degenerate cases, e.g., attempt 1 creates outputs a & b, attempt 2 creates outputs b & c, and so attempt 2 overwrites attempt 1, even though it's not really the first attempt.
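
    Here's a minimal sketch (in Scala, with made-up names, not the actual `ShuffleOutputCoordinator` API) of the commit check implied by the SPARK-4085 test, i.e. treating the destination as present only when *all* expected files exist, which is exactly what allows the degenerate overwrite above:

    ```scala
    import java.io.File

    // Hypothetical sketch only -- these names do not exist in Spark.
    object CommitCheckSketch {
      // The destination counts as "present" only if every expected output file exists.
      def destinationComplete(expectedOutputs: Seq[File]): Boolean =
        expectedOutputs.forall(_.exists())

      // An attempt wins the commit whenever the destination looks incomplete -- which is
      // what lets attempt 2 (expecting b & c) overwrite attempt 1's a & b.
      def shouldCommit(expectedOutputs: Seq[File]): Boolean =
        !destinationComplete(expectedOutputs)
    }
    ```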
    
    (b) what should the MapStatus output of the uncommitted attempt be?  With deterministic data it doesn't *really* matter, at least to fix SPARK-8029: the output sizes will be approximately the same, and that is good enough.  But with non-deterministic data, it's possible that attempt 1 gets committed, but then attempt 2 creates a MapStatus with some zero-sized blocks where attempt 1 had non-zero-sized blocks.  If the MapStatus from attempt 2 is the one that gets used, the shuffle-fetch side decides to skip reading those blocks entirely.
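
    To illustrate what "skip reading those blocks" means, here's a simplified sketch of the fetch-side filtering (not Spark's actual `ShuffleBlockFetcherIterator` code):

    ```scala
    // Simplified illustration; BlockId and blocksToFetch are made-up names for this sketch.
    object FetchSketch {
      case class BlockId(mapId: Int, reduceId: Int)

      // Blocks whose recorded size is zero are never requested.  So if attempt 2's MapStatus
      // reports 0 for a block that attempt 1 actually wrote with data, that data is skipped.
      def blocksToFetch(sizesByBlock: Map[BlockId, Long]): Seq[BlockId] =
        sizesByBlock.collect { case (id, size) if size > 0 => id }.toSeq
    }
    ```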
    
    Some possible solutions are:
    
    1) Undefined behavior on non-deterministic data.
    2) Shuffle writers always create the same set of output files, even if some are zero-length.  This could really hurt the performance of the HashShuffle with lots of partitions, but then again, it's already unusable with lots of partitions, so it probably doesn't matter; I assume the effect on the sort shuffle is negligible.  We'd also add a new marker MapStatus which gets returned when an attempt's shuffle output is not committed (see the sketch after this list).
    3) Maybe the test for SPARK-4085 could be changed, and then `ShuffleOutputCoordinator` could consider the destination pre-existing if *any* of the files are already there.  You can still have some weird cases, e.g., attempt 1 creates outputs a & b and attempt 2 creates c & d, but it doesn't matter if files c & d get committed as well.  You'd still need the same marker MapStatus for the non-committed attempts.
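
    For concreteness, here's a rough sketch of what (2) could look like; all the names below are invented for illustration and are not existing Spark classes:

    ```scala
    import java.io.File

    // Rough sketch of option (2) under the assumptions above -- not a real implementation.
    object AlwaysCreateOutputsSketch {
      // The writer always materializes every expected file, even when it has no data to
      // write, so every attempt produces the same set of output files.
      def ensureAllOutputsExist(expectedOutputs: Seq[File]): Unit = {
        expectedOutputs.foreach { f =>
          if (!f.exists()) {
            Option(f.getParentFile).foreach(_.mkdirs())
            f.createNewFile()  // zero-length placeholder
          }
        }
      }

      // Per-attempt result: only the committed attempt reports real block sizes; a losing
      // attempt returns a marker so its (possibly misleading) sizes are never used.
      sealed trait AttemptResult
      case class CommittedMapStatus(blockSizes: Array[Long]) extends AttemptResult
      case object UncommittedMarker extends AttemptResult
    }
    ```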
    
    I assume (1) is not an option.  I'll implement (2), but I wanted to see if you have any thoughts.

