Github user squito commented on the issue:

    https://github.com/apache/spark/pull/21698
  
    jumping in the middle on this discussion -- everybody has raised some great 
points.  
    
    1) My first takeaway from this is just how hard it can be to reason about 
this, because spark's exact semantics aren't really that clear.  
`repartition()` has been in spark for a while and this issue went ignored; 
we're all experienced with spark and still have to think about it very 
carefully; imagine the difficulty for the average spark user!  I propose we add 
some documentation describing the actual fault-tolerant semantics, and what 
users can and can't do.  In addition to the ordering issue we're discussing 
here, another case I've seen users get wrong is with random data generators.
    
    More specific points:
    
    2) I agree with @mridulm about `zip` etc., not just repartition.  It took 
quite a while for me to see this.  The point isn't that you need a specific 
ordering of the output, its that if you zip [a, b] with [1,2], your output 
should never be [(a,1), (a,2)], which I think is possible in the current 
situation (though probably needs a significantly more complicated example).
    
    3) I do think that `repartition` is probably the most serious case users 
will hit this, and I would still be open to consider a special case fix.
    
    4) I see some discussion about making shuffles deterministic, but it proved 
to be very difficult.  Is there a prior discussion on this you can point me to? 
 Is it that even if you used fetch-to-disk and had the shuffle-fetch side read 
the map-output in a set order, you'd still have random variations in spills?
    
    5) this might be a dumb idea -- since we only need to do this sort on RDDs 
post shuffle, we know the records have been serialized to bytes, and you can 
always compare bytes.  Has this been considered at all?  I realize it would be 
a mess in the current code, as those serialized bytes from the shuffle aren't 
exposed where you'd want to put this, but I want to consider whether it would 
even be correct.  and an easier to implement approach, though horribly 
inefficient, would be to just serialize again.
    
    6) The proposed solution gives you correctness at the price of a very 
expensive recomputation.  Another option would be to just fail-fast -- on a 
fetch-failure in repartition, fail the entire job.  You could even expose a 
configuration to allow both your current approach or this.
    
    7) We could add logic to detect whether even an order-dependent operation 
was safe to retry -- eg. repartition just after reading from hdfs or after a 
checkpoint can be done as it is now.  Each stage would need to know this based 
on extra properties of all the RDDs it was defined from.   (I'm not sure this 
is what you mean, @cloud-fan by "Ah I see, then we need to change DAGScheduler 
a lot to fix it, which may not worth." -- if you meant something else, can you 
explain?)
    
    8) Honestly I consider this bug so serious I'd consider loudly warning from 
every api which suffers from this if we can't fix -- make them deprecated and 
log a warning.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to