Github user tgravescs commented on the issue:

    https://github.com/apache/spark/pull/22112
  
    > Without making shuffle output order repeatable, we do not have a way to 
properly fix this.
    
    Perhaps I'm missing it, but you are saying shuffle here, but just shuffle 
itself can't fix this.   Your map output has to be consistent and the 
partitioning function has to be consistent.  The shuffle simply transfers the 
bytes its supposed to.  Sparks shuffle of those bytes is not consistent in that 
the order it fetches from can change and without the sort happening on that 
data the order can be different on rerun.  I guess maybe you mean the 
ShuffledRDD as a whole or do you mean something else here?
    
    >  shuffled RDD will never be deterministic unless the shuffle key is the 
entire record and key ordering is specified. 
    
    This is why I say a sort of the entire record (on the bytes themselves if 
they aren't comparable in the RDD case like we talked about in the other pr) 
before the partitioning is about the only true solution to this I've thought 
of.  That can have a big performance impact.  I haven't looked to see how hard 
it is to insert that so I guess I should do that.  Note I'm actually not 
advocating sort for all the operations we are talking about, I'm just saying 
that is the only option I see that "fixes" this reliably without having the 
user.  I think eventually we should do that for repartition or others using the 
round robin type partitioning. 
    
    > What I mentioned was not specific to spark, but general to any MR like 
system.
    > This applies even in hadoop mapreduce and used to be a bug in some of our 
pig udf's :-)
    > For example, if there is random output generated in mapper and there are 
node failures during reducer phase (after all mapper's have completed), the 
exact same problem would occur with random mapper output.
    > We cannot, ofcourse, stop users from doing it - but we do not guarantee 
correct results (just as hadoop mapreduce does not in this scenario).
    
    We are actually in agreement then.  
    All I'm saying is zip is just another variant of this, you could document 
it as such and do nothing internal to spark to "fix it".  The user has to 
handle by sorting, checkpointing, etc.  We could be user friendly by doing 
something like @cloud-fan  is mentioning with failing all reducers when 
possible or just failing if a resultTask has finished unless they specify some 
config that says they know what they are doing.
    
    I guess we can separate out these 2 discussions.  I think the point of this 
pr is to temporarily workaround the data loss/corruption issue with repartition 
by failing.  So if everyone agrees on that lets move the discussion to a jira 
about what to do with the rest of the operators and fix repartition here.   
thoughts?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to