Github user tgravescs commented on the issue:
https://github.com/apache/spark/pull/22112
> Without making shuffle output order repeatable, we do not have a way to properly fix this.
Perhaps I'm missing it, but you keep saying "shuffle" here, and the shuffle by itself can't fix this. Your map output has to be consistent and the partitioning function has to be consistent. The shuffle simply transfers the bytes it's supposed to. Spark's shuffle of those bytes is not consistent, in that the order it fetches from can change, and without a sort happening on that data the order can be different on a rerun. Maybe you mean the ShuffledRDD as a whole, or do you mean something else here?
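To make concrete what I mean by the order mattering, here is a toy illustration (plain Scala, not Spark's actual code; `assign` is just a hypothetical stand-in for the round-robin assignment repartition does): the output partition a record lands in depends only on its position in the iterator, so if a rerun sees the same records in a different order, they end up in different partitions.

```scala
object RoundRobinOrderSensitivity {
  // Hypothetical stand-in for round-robin assignment: record i in a map
  // partition goes to output partition i % numPartitions.
  def assign(records: Seq[String], numPartitions: Int): Map[Int, Seq[String]] =
    records.zipWithIndex
      .groupBy { case (_, i) => i % numPartitions }
      .map { case (part, recs) => part -> recs.map(_._1) }

  def main(args: Array[String]): Unit = {
    val firstAttempt = Seq("a", "b", "c", "d") // order seen on the first attempt
    val rerun        = Seq("c", "a", "d", "b") // same records, different fetch order
    println(assign(firstAttempt, 2)) // e.g. Map(0 -> List(a, c), 1 -> List(b, d))
    println(assign(rerun, 2))        // e.g. Map(0 -> List(c, d), 1 -> List(a, b))
  }
}
```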
> shuffled RDD will never be deterministic unless the shuffle key is the entire record and key ordering is specified.
This is why I say a sort of the entire record (on the bytes themselves if the records aren't comparable, in the RDD case, like we talked about in the other PR) before the partitioning is about the only true solution to this I've thought of. That can have a big performance impact. I haven't looked to see how hard it would be to insert that, so I guess I should do that. Note I'm actually not advocating a sort for all the operations we are talking about; I'm just saying that is the only option I see that "fixes" this reliably without requiring the user to handle it themselves. I think eventually we should do that for repartition or other operations using round-robin-style partitioning; a rough sketch of what I mean is below.
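As a sketch of that idea, written against the public RDD API and assuming a SparkContext named `sc` (this is not the internal change, just the shape of the workaround): impose a total order on the records before the round-robin repartition, so that even if a rerun recomputes the parent in a different order, the sort puts the records back in a stable order and each one lands in the same output partition.

```scala
val data = sc.parallelize(1 to 1000000)

// Order-sensitive: the contents of each output partition can change
// between attempts if the parent is recomputed in a different order.
val unstable = data.repartition(8)

// Sorting on the full record first (or on its serialized bytes / hashCode
// when the record isn't comparable, as discussed above) makes the order
// feeding the round-robin partitioner repeatable, at the cost of an extra
// shuffle and sort.
val stable = data.sortBy(identity).repartition(8)
```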
> What I mentioned was not specific to Spark, but general to any MR-like system.
> This applies even in Hadoop MapReduce and used to be a bug in some of our Pig UDFs :-)
> For example, if there is random output generated in a mapper and there are node failures during the reducer phase (after all mappers have completed), the exact same problem would occur with random mapper output.
> We cannot, of course, stop users from doing it - but we do not guarantee correct results (just as Hadoop MapReduce does not in this scenario).
We are actually in agreement then.
All I'm saying is that zip is just another variant of this; you could document it as such and do nothing internal to Spark to "fix it". The user has to handle it by sorting, checkpointing, etc. (see the sketch below). We could be user-friendly by doing something like @cloud-fan is mentioning, failing all the reducers when possible, or just failing if a ResultTask has finished, unless they specify some config that says they know what they are doing.
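For the zip case, the user-side workaround I have in mind looks roughly like this (assuming a SparkContext named `sc` and a reachable checkpoint directory; the path is just illustrative): checkpoint the nondeterministic parents before zipping, so a retry rereads the stable checkpointed data instead of recomputing the order-sensitive repartition output.

```scala
sc.setCheckpointDir("/tmp/spark-checkpoints") // illustrative path

// Two parents whose partition contents are order-sensitive on recompute.
val left  = sc.parallelize(1 to 100).repartition(4)
val right = sc.parallelize(101 to 200).repartition(4)

// Checkpoint and materialize them so a later failure replays fixed data
// rather than regenerating (possibly different) repartition output.
left.checkpoint()
right.checkpoint()
left.count()
right.count()

// zip assumes both sides have the same number of partitions and the same
// number of elements per partition; that holds here because the parents
// are built identically.
val zipped = left.zip(right)
```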
I guess we can separate out these two discussions. I think the point of this PR is to temporarily work around the data loss/corruption issue with repartition by failing. So if everyone agrees on that, let's move the discussion to a JIRA about what to do with the rest of the operators and fix repartition here.
Thoughts?