Github user tgravescs commented on the issue:

    https://github.com/apache/spark/pull/21698
  
    
    >  @squito @tgravescs I am probably missing something about why hash partitioner helps, can you please clarify ?
    > IIRC the partitioner for CoalescedRDD when shuffle is enabled is HashPartitioner ... the issue is the distributePartition before the shuffle which is order sensitive but is not deterministic since its input is not deterministic if it is derived from one or more shuffle outputs.
    
    @mridulm sorry I wasn't really clear. On the RDD side it's not called RoundRobinPartitioner (like it is on the DataFrame side), but distributePartition is essentially doing the same thing, and it would need to change to use a normal hash or something else that is deterministic. Basically, any operation that does the shuffle has to choose which reducer each record goes to deterministically. That is essentially the idea of the first PR for this JIRA: https://github.com/apache/spark/pull/20414 (disclaimer, I haven't looked at that in detail). I'll spend more time going through the code to see all the specifics. But like we've discussed, the downside is that the output will not be evenly distributed, so the question is whether we would want that.
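    To make the round-robin vs. hash distinction concrete, here is a minimal sketch (these helpers are illustrative, not the actual distributePartition or CoalescedRDD code): round robin picks a reducer from the record's position in the iterator, so the assignment changes whenever the input order changes on a map-task rerun, while a hash of the record itself is independent of arrival order.

    ```scala
    // Illustrative sketch only; not Spark's distributePartition implementation.

    // Position-based (round robin): the reducer depends on where the record
    // happens to appear in the iterator, so a rerun with a different input
    // order sends records to different reducers.
    def roundRobinAssign[T](iter: Iterator[T], numReducers: Int): Iterator[(Int, T)] =
      iter.zipWithIndex.map { case (rec, i) => (i % numReducers, rec) }

    // Content-based (hash): the reducer depends only on the record itself,
    // so the assignment is the same no matter what order the records arrive in.
    def hashAssign[T](iter: Iterator[T], numReducers: Int): Iterator[(Int, T)] =
      iter.map { rec =>
        val raw = rec.hashCode % numReducers
        (if (raw < 0) raw + numReducers else raw, rec)
      }
    ```

    The trade-off is exactly the one above: hash assignment is deterministic but gives no guarantee of an even spread across reducers.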
    
    > After more thoughts, the problem can be generalized as
    > 1. RDD#mapPartitions and its friends can take arbitrary user functions, which may produce random result
    > 2. The shuffle map tasks will consume the inputs and distribute them to different reducers.
    > 3. When a reduce task throws FetchFailed, Spark scheduler will rerun the map tasks to rewrite the missing shuffle blocks and retry the reduce task. The already finished reduce tasks will not be rerun.
    
    Thanks @cloud-fan for the write-up. This is exactly why I brought up HashPartitioner (i.e. stop using round robin, whether in a partitioner or in the distributePartition function) and exactly why Pig stopped using it for its Union operation. It's not just us doing it internally; user code could do anything such that the output is not in the same order on a rerun of the map task.
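    As a concrete (and purely illustrative) example of step 1 above, assuming `sc` is an existing SparkContext:

    ```scala
    import scala.util.Random

    // A user function whose per-partition output order differs on every run.
    // If a map task of the repartition shuffle is rerun after a FetchFailed,
    // the regenerated shuffle blocks no longer line up with what the already
    // finished reducers consumed, so rows can be lost or duplicated.
    val rdd = sc.parallelize(1 to 1000000, 100)
      .mapPartitions { iter => Random.shuffle(iter.toVector).iterator }
      .repartition(200)
    ```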
    
    Like you said, the sort has to be done before the round robin and it has to be done on the entire record (not just on the key, for instance, if you had key/value pairs), and I do see this as being a possibly very expensive operation. But if, for instance, we can't change repartition to not be evenly distributed, it seems like one of the only options. I haven't looked at the details of inserting it here either, so I need to do that to understand how complicated it would be.
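    Here is a rough sketch of what "sort on the entire record before the round robin" could look like (illustrative only, not a proposed patch); the buffering and sorting are where the cost comes from:

    ```scala
    // Illustrative sketch: make the per-partition order deterministic before
    // assigning reducers round robin. Sorting on the whole record (hashCode
    // first, full string form as a tie-breaker) means a rerun of the map task
    // produces the same order, hence the same reducer for every record.
    // Note this buffers the whole partition in memory and sorts it, which is
    // the "possibly very expensive" part.
    def deterministicRoundRobin[T](iter: Iterator[T], numReducers: Int): Iterator[(Int, T)] = {
      val sorted = iter.toVector.sortBy(rec => (rec.hashCode, rec.toString))
      sorted.iterator.zipWithIndex.map { case (rec, i) => (i % numReducers, rec) }
    }
    ```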
    
    If we can't come up with another solution, I would actually be ok with failing in the short term; it's better than corruption. Or perhaps we can allow the user to choose the behavior: one config for failing, one for users who don't care because they know they checkpointed or something, and one for doing the sort.
    
    > I think the correct fix is: be more conservative when handling fetch failure and rerun more reduce tasks. We can provide an internal API to tag a RDD as deterministic (very common in Spark SQL) and then we can safely be optimistic when handling fetch failure.
    
    Like @jiangxb1987 said, and as we discussed above, I don't think this will work when you have result tasks that could have committed output. You can't undo a task commit.
    
    Now we could do a combination of things, where as long as we aren't running a ResultTask we fail all reducers and maps so they rerun; if we are running ResultTasks, we either fail the entire job or fall back to the expensive sort.
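    Sketched out, that combined policy would look roughly like this (names and structure are made up for illustration; this is not the DAGScheduler API):

    ```scala
    // Pseudocode-level sketch of the combined fetch-failure policy described above.
    def handleFetchFailure(shuffleInputIsNonDeterministic: Boolean,
                           resultTasksStarted: Boolean): String = {
      if (!shuffleInputIsNonDeterministic) {
        "normal handling: regenerate the missing map outputs, retry the failed reducers"
      } else if (!resultTasksStarted) {
        "rerun all map tasks and all reducers of the affected stages"
      } else {
        "fail the job (or fall back to the expensive sort-based repartition)"
      }
    }
    ```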
    
    Just to summarize, the only solutions that I've thought of or that have been mentioned by others:
    - Don't use round robin (which for repartition could break the API, since the output wouldn't be evenly distributed)
    - Sort all records before doing the round-robin assignment (the sort has to be deterministic) - probably very expensive, and we need to investigate how hard the code change would be
    - Fail all reducers and rerun more tasks if we haven't started any ResultTasks; if a ResultTask has started, decide on another solution
    - Just fail the entire job on this type of failure

