Ngone51 commented on PR #46706:
URL: https://github.com/apache/spark/pull/46706#issuecomment-2130842872

   @dongjoon-hyun  @mridulm  Sorry, could we mark this as a bug and backport it to the 
maintenance release branches? It actually caused an issue for us internally; I 
was about to push a quick fix before realizing this is the root cause.
   
   The issue leads to shuffle fetch failures and, in the end, job failure. It 
happens this way:
   
   1) Stage A computes partition P0 with task t1 (TID, a.k.a. mapId) on 
executor e1
   2) Executor e1 starts decommissioning
   3) Executor e1 is reported to the driver as lost (a false positive) during its decommission, so t1's mapstatus is removed from `mapStatuses`
   4) Stage B reuses the shuffle dependency with Stage A and computes 
partition P0 again with task t2 on executor e2
   5) When task t2 finishes, we see two entries ((t1 -> P0), (t2 -> P0)) for the 
same partition in `mapIdToMapIndex` but only one entry 
(mapStatuses(P0) = MapStatus(t2, e2)) in `mapStatuses`.
   6) Executor e1 starts to migrate task t1's mapstatus (to executor e3, for 
example) and calls `updateMapOutput` on the driver. Because of 5), we use the mapId 
(i.e., t1) to look up the mapIndex (i.e., P0) and then use P0 to fetch task t2's mapstatus:
   
   ```scala
   // updateMapOutput (driver side)
   // mapId = t1, so mapIndex resolves to P0 ...
   val mapIndex = mapIdToMapIndex.get(mapId)
   // ... but mapStatuses(P0) holds task t2's mapstatus, which is what gets relocated
   val mapStatusOpt = mapIndex.map(mapStatuses(_)).flatMap(Option(_))
   ```
   7) Task t2's mapstatus location is then updated to executor e3, even though 
the output is actually still on executor e2. This eventually leads to the shuffle 
fetch failure (see the simplified sketch below).
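   To make the interaction concrete, here is a minimal, self-contained sketch of steps 5)-7). 
It uses plain Scala collections rather than Spark's actual `MapOutputTracker`, and the names 
(P0, t1, t2, e2, e3) are simply the ones from the walkthrough above:
   
   ```scala
   import scala.collection.mutable
   
   object StaleMapIdSketch {
     // Simplified stand-in for MapStatus: the partition index plus the executor holding the output.
     case class MapStatus(mapIndex: Int, location: String)
   
     def main(args: Array[String]): Unit = {
       val P0 = 0
       val (t1, t2) = (1L, 2L) // task IDs (mapIds)
   
       // State after step 5): both t1 and t2 map to P0, but mapStatuses only keeps t2's status.
       val mapIdToMapIndex = mutable.HashMap[Long, Int](t1 -> P0, t2 -> P0)
       val mapStatuses = mutable.HashMap[Int, MapStatus](P0 -> MapStatus(P0, "e2"))
   
       // Step 6): the decommissioning executor migrates t1's output to e3; the driver looks the
       // status up by mapId = t1, which resolves to P0 and hence to t2's status.
       def updateMapOutput(mapId: Long, newLocation: String): Unit = {
         val mapIndex = mapIdToMapIndex.get(mapId)
         val mapStatusOpt = mapIndex.map(mapStatuses(_)).flatMap(Option(_))
         mapStatusOpt.foreach { status =>
           mapStatuses(mapIndex.get) = status.copy(location = newLocation)
         }
       }
   
       updateMapOutput(t1, "e3")
   
       // Step 7): t2's output is still on e2, but its recorded location now says e3, so the
       // next fetch of P0 goes to the wrong executor and fails.
       println(mapStatuses(P0)) // MapStatus(0,e3)
     }
   }
   ```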
   
   

