Github user tgravescs commented on the issue:
https://github.com/apache/spark/pull/21698
> @squito @tgravescs I am probably missing something about why hash
> partitioner helps, can you please clarify?

> IIRC the partitioner for CoalescedRDD when shuffle is enabled is
> HashPartitioner ... the issue is the distributePartition before the shuffle,
> which is order sensitive but is not deterministic since its input is not
> deterministic if it is derived from one or more shuffle outputs.
@mridulm sorry I wasn't very clear. On the RDD side it's not called
RoundRobinPartitioner (like it is on the DataFrame side), but
distributePartition is essentially doing the same thing, and it would need to
change to use a normal hash or something else that is deterministic. Basically,
any operation that does the shuffle has to choose which reducer each record
goes to deterministically. That is at least the idea behind the first PR for
this JIRA: https://github.com/apache/spark/pull/20414 (disclaimer, I haven't
looked at that in detail). I'll spend more time going through the code to see
all the specifics. But as we've discussed, the downside is that the output will
not be evenly distributed, so the question is whether we would want that.
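To make the order sensitivity concrete, here is a minimal sketch (just an
illustration, not the actual distributePartition code) of why a round-robin
style assignment depends on the order records arrive in, while a plain hash of
the record does not:

```scala
object PartitionAssignmentSketch {
  // Round-robin style: the reducer a record goes to depends on its position in
  // the input, so the same records in a different order get a different assignment.
  def roundRobinAssign[T](items: Seq[T], numPartitions: Int, start: Int = 0): Seq[(Int, T)] =
    items.zipWithIndex.map { case (t, i) => ((start + i) % numPartitions, t) }

  // Hash style: the reducer depends only on the record itself, so the assignment
  // is the same no matter what order the records arrive in.
  def hashAssign[T](items: Seq[T], numPartitions: Int): Seq[(Int, T)] =
    items.map(t => (Math.floorMod(t.hashCode, numPartitions), t))

  def main(args: Array[String]): Unit = {
    val run1 = Seq("a", "b", "c", "d")
    val run2 = Seq("b", "a", "d", "c") // same records, different arrival order (e.g. after a map rerun)

    println(roundRobinAssign(run1, 2).toSet == roundRobinAssign(run2, 2).toSet) // false
    println(hashAssign(run1, 2).toSet == hashAssign(run2, 2).toSet)             // true
  }
}
```

The hash version keeps the assignment stable across a rerun, but as noted it
gives up the even distribution that round robin provides.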
> After more thoughts, the problem can be generalized as
> 1. RDD#mapPartitions and its friends can take arbitrary user functions,
> which may produce random result
> 2. The shuffle map tasks will consume the inputs and distribute them to
> different reducers.
> 3. When a reduce task throws FetchFailed, Spark scheduler will rerun the
> map tasks to rewrite the missing shuffle blocks and retry the reduce task.
> The already finished reduce tasks will not be rerun.
Thanks @cloud-fan for the write up. This is exactly why I brought up
HashPartitioner (i.e. stop using round robin, whether in a partitioner or in
the distributePartition function) and exactly why Pig stopped using it for its
Union operation. It's not just us doing this internally; user code could do
anything such that the output is not in the same order on a rerun of the map
task.

Like you said, the sort has to be done before the round robin and it has to be
done on the entire record (not just on the key, for instance, if you have
key/value pairs), and I do see this as being a possibly very expensive
operation. But if, for instance, we can't change repartition to stop being
evenly distributed, it seems like one of the only options. I haven't looked at
the details of inserting it here either, so I need to do that to understand how
complicated it would be.
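As a rough sketch of what that sort could look like (illustration only, using a
hash of the whole record as the ordering so it doesn't depend on arrival order;
not the actual change we'd make in Spark):

```scala
// Sketch only: make round robin deterministic by first putting the records of a
// map partition into a canonical order that depends only on their contents.
def sortedRoundRobinAssign[T](items: Seq[T], numPartitions: Int, start: Int = 0): Seq[(Int, T)] = {
  // Order by a function of the entire record (hashCode plus toString here just for
  // the sketch); any total order that looks only at record contents would do.
  val canonical = items.sortBy(t => (t.hashCode, t.toString))
  canonical.zipWithIndex.map { case (t, i) => ((start + i) % numPartitions, t) }
}

// The same records in a different arrival order now land on the same reducers,
// and the output is still spread evenly across the numPartitions reducers:
// sortedRoundRobinAssign(Seq("b", "a", "d", "c"), 2) == sortedRoundRobinAssign(Seq("a", "b", "c", "d"), 2)
```

The per-partition sort over every full record is exactly the expense mentioned
above, but it keeps the even distribution that repartition promises.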
If we can't come up with another solution, I would actually be ok with failing
in the short term; it's better than corruption. Or perhaps we can allow the
user to choose the behavior: one config setting to fail, one for users who
don't care because they know they checkpointed or something, and one for doing
the sort.
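Just as a strawman for what that choice could look like (the names below are
made up for illustration; none of them are existing Spark configs):

```scala
// Hypothetical sketch only: three behaviors a user could pick between when a
// fetch failure forces a rerun of non-deterministic repartition output.
sealed trait RepartitionRetryBehavior
object RepartitionRetryBehavior {
  case object FailJob extends RepartitionRetryBehavior              // fail rather than risk silent corruption
  case object AssumeDeterministic extends RepartitionRetryBehavior  // user asserts input is checkpointed/deterministic
  case object SortBeforeRoundRobin extends RepartitionRetryBehavior // pay the sort cost to make retries safe
}
```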
> I think the correct fix is: be more conservative when handling fetch failure
> and rerun more reduce tasks. We can provide an internal API to tag a RDD as
> deterministic (very common in Spark SQL) and then we can safely be optimistic
> when handling fetch failure.
Like @jiangxb1987 said and as we discussed above, I don't think this will work
when you have result tasks that could have already committed output. You can't
undo a task commit.

Now, we could do a combination of things where, as long as we aren't in a
ResultTask stage, we fail all the reducers and maps so they rerun. If we are
already running ResultTasks, you either fail the entire job or fall back to the
expensive sort.
Just to summarize, the only solutions that I've thought of or that have been
mentioned by others:
- don't use round robin (which for repartition could break the API since the
output wouldn't be evenly distributed)
- sort all records before doing the round robin assignment (the sort has to be
deterministic) - probably very expensive, and we need to investigate how hard
the code change would be
- fail all reducers and rerun more tasks if we haven't started any ResultTasks;
if ResultTasks have started, decide on another solution
- just fail the entire job on this type of failure