Github user squito commented on the issue:

    https://github.com/apache/spark/pull/19388
  
    @rezasafi and I talked about this for a while offline.  It's a lot trickier
than it seems at first.  Here's what's going on:
    
    1. `SparkHadoopWriter` is pretending `rddId` is `stageId`, but clearly it's
not.  In the simple test cases they're the same, but they can be arbitrarily
different.  That is not new -- it was in the old RDD commit protocol
(https://github.com/apache/spark/blob/branch-2.0/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L1084).
  However, SPARK-18191 changed it so that on the executors, the real `stageId`
was used, which resulted in an inconsistency (a rough sketch of the shape of
this is below, after the list).
    
    2. We really want the `stageId`.  Reza had initially made a change to just
use the `rddId` consistently, since that is what was being used before.
However, this leads to an inconsistency, because the `OutputCommitCoordinator`
actually expects the `stageId`.  Previously, this seemed to work because
`PairRDDFunctions.saveAsNewAPIHadoopDataset` didn't go through the
`OutputCommitCoordinator` (!!! can this be right?) and
`PairRDDFunctions.saveAsHadoopDataset`, which did go through the
`OutputCommitCoordinator`, always used the actual `stageId` (it didn't do any
setup before the `stageId` was available).  E.g. here's the code in 2.0:
https://github.com/apache/spark/blob/branch-2.0/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L1161
    The RDD alone is *not* a good unique identifier -- the same RDD can get
saved to Hadoop multiple times, and those saves could even run in parallel (see
the second sketch after this list).  We need to make sure those tasks don't get
mixed up with each other.
    
    3. It's actually quite hard to get the right `stageId`.  It's not exposed at
all to the user before the job starts.  In fact, we *can't* expose the
`stageId` before the call to `sc.runJob()`, because there could be other
threads racing to submit jobs, and the `stageId` isn't known until the job is
inside the `DAGScheduler`'s eventLoop here:
https://github.com/apache/spark/blob/4f8dc6b01ea787243a38678ea8199fbb0814cffc/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L866
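
    To make (1) concrete, here's a rough sketch of the shape of the problem --
a simplified sketch, not the actual `SparkHadoopWriter` source (the helper name
is made up):

    ```scala
    import org.apache.spark.TaskContext
    import org.apache.spark.rdd.RDD

    // Rough sketch of the mismatch in (1) -- not the real SparkHadoopWriter
    // code, just its shape: the driver derives the commit protocol's job id
    // from the RDD, while each task reports its real stage id.
    def sketchOfTheMismatch(rdd: RDD[(String, String)]): Unit = {
      val driverSideJobId = rdd.id               // driver: pretends rddId is stageId
      rdd.context.runJob(rdd, (ctx: TaskContext, _: Iterator[(String, String)]) => {
        val executorSideStageId = ctx.stageId()  // executor: the real stageId
        if (driverSideJobId != executorSideStageId) {
          // the two halves of the commit protocol now disagree on the job id
        }
      })
    }
    ```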
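
    And for the non-uniqueness in (2), this is hypothetical but perfectly legal
user code (assume `sc` is an existing SparkContext; the output paths are made
up):

    ```scala
    import java.util.concurrent.Executors
    import scala.concurrent.{ExecutionContext, Future}

    // Both saves share the same rddId but are distinct jobs/stages that can run
    // concurrently, so rddId alone can't be the key the OutputCommitCoordinator
    // uses to arbitrate between competing task attempts.
    implicit val ec: ExecutionContext =
      ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))

    val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2))
    Future { pairs.saveAsTextFile("/tmp/out-1") }  // job 1, some stage X
    Future { pairs.saveAsTextFile("/tmp/out-2") }  // job 2, some stage Y, same rddId
    ```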
    
    One way to do this would be to add another version of `sc.runJob()` that
lets you pass in yet another function, which is run inside the `DAGScheduler`
after it has the `stageId`.  I don't even think we should expose that function
publicly; we just need it inside `PairRDDFunctions` (rough sketch below).
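
    Roughly this shape -- the name and signature are made up, none of this
exists today:

    ```scala
    import scala.reflect.ClassTag
    import org.apache.spark.TaskContext
    import org.apache.spark.rdd.RDD

    // Made-up name and signature, just to show the shape: the extra callback is
    // invoked inside the DAGScheduler event loop as soon as the ResultStage id
    // is assigned, before any tasks launch. PairRDDFunctions would do its commit
    // setup (committer.setupJob, outputCommitCoordinator.stageStart) in there
    // with the real stageId; it wouldn't need to be a public API.
    trait StageAwareJobSubmission {
      def runJobWithStageIdCallback[T, U: ClassTag](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          onStageIdKnown: Int => Unit): Array[U]
    }
    ```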
    
    Another possibility would be to forget about the actual `stageId`, and
instead just get some unique id from the `OutputCommitCoordinator`, which is
then passed along inside the closures.  But then you'd have to figure out how
to replicate `OutputCommitCoordinator.stageStart()` / `stageEnd()`, which
probably boils down to the same thing -- another version of `runJob`, one
which pushes the given ID down into the scheduler (sketched below).
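
    Again with made-up names, the second option would look something like:

    ```scala
    import scala.reflect.ClassTag
    import org.apache.spark.TaskContext
    import org.apache.spark.rdd.RDD

    // Hypothetical sketch of the second option (none of these methods exist):
    // the coordinator hands out an opaque id that stands in for the stageId,
    // but a runJob variant still has to carry that id down into the scheduler
    // so it can drive stageStart()/stageEnd() for it -- i.e. the same kind of
    // runJob change as the first option.
    trait CommitIdBasedCoordination {
      def newOutputCommitId(): Int
      def runJobWithCommitId[T, U: ClassTag](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          commitId: Int): Array[U]
    }
    ```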
    
    Any other ideas?
    
    @tgravescs @markhamstra you may be interested in this as well.  I know
you've told me you've had success with speculation lately, but it seems like it
would be pretty broken given the `rddId` vs. `stageId` mismatch (or maybe it
doesn't matter because in most cases the Hadoop output committer ignores the
`jobContext` passed in on `setupJob`?).

