Ngone51 commented on PR #46706: URL: https://github.com/apache/spark/pull/46706#issuecomment-2130842872
@dongjoon-hyun @mridulm Sorry, can we mark this as a bug fix and backport it to the maintenance release branches? It actually caused an issue for us internally; I was about to push a quick fix before realizing this is the root cause. The issue leads to shuffle fetch failures and, ultimately, job failure. It happens this way:

1. Stage A computes partition P0 with task t1 (TID, a.k.a. mapId) on executor e1.
2. Executor e1 starts decommissioning.
3. Executor e1 is reported lost to the driver (a false positive) during its decommission.
4. Stage B reuses the shuffle dependency with Stage A and recomputes partition P0 with task t2 on executor e2.
5. When task t2 finishes, there are two entries ((t1 -> P0), (t2 -> P0)) for the same partition in `mapIdToMapIndex` but only one entry (mapStatuses(P0) = MapStatus(t2, e2)) in `mapStatuses`.
6. Executor e1 starts migrating task t1's map status (to executor e3, for example) and calls `updateMapOutput` on the driver. Because of 5), we use the mapId (i.e., t1) to look up the mapIndex (i.e., P0), and then use P0 to fetch task t2's map status:
```scala
// updateMapOutput
val mapIndex = mapIdToMapIndex.get(mapId)
val mapStatusOpt = mapIndex.map(mapStatuses(_)).flatMap(Option(_))
```
7. Task t2's map status location is then updated to executor e3, even though its output is actually still located on executor e2. This ultimately leads to the fetch failure.
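
To make the stale-entry lookup concrete, here is a minimal, self-contained Scala sketch. It is **not** the real `ShuffleStatus` code: `FakeMapStatus`, the plain `mutable.Map`s, and the `updateMapOutput` helper below are simplified stand-ins for the driver-side structures, but the lookup path (mapId -> mapIndex -> current map status) mirrors the snippet above.

```scala
import scala.collection.mutable

// Simplified stand-in for MapStatus: just the producing mapId and its block location.
case class FakeMapStatus(mapId: Long, location: String)

object StaleMapIdLookup {
  val mapStatuses = mutable.Map[Int, FakeMapStatus]()   // mapIndex -> current status
  val mapIdToMapIndex = mutable.Map[Long, Int]()         // mapId    -> mapIndex

  // Mirrors the lookup in updateMapOutput: mapId -> mapIndex -> status for that partition.
  def updateMapOutput(mapId: Long, newLocation: String): Unit = {
    val mapIndex = mapIdToMapIndex.get(mapId)
    val mapStatusOpt = mapIndex.flatMap(mapStatuses.get)
    mapStatusOpt.foreach { status =>
      // Bug illustrated: `status` may belong to a newer mapId for the same partition.
      mapStatuses(mapIndex.get) = status.copy(location = newLocation)
    }
  }

  def main(args: Array[String]): Unit = {
    // Stage A: t1 (mapId = 1) computes P0 on e1.
    mapIdToMapIndex(1L) = 0
    mapStatuses(0) = FakeMapStatus(mapId = 1L, location = "e1")

    // Stage B: t2 (mapId = 2) recomputes P0 on e2; t1's stale index entry is never removed.
    mapIdToMapIndex(2L) = 0
    mapStatuses(0) = FakeMapStatus(mapId = 2L, location = "e2")

    // Decommissioning e1 migrates t1's block to e3 and reports it to the driver...
    updateMapOutput(mapId = 1L, newLocation = "e3")

    // ...but it is t2's status that gets repointed: prints FakeMapStatus(2,e3),
    // while t2's output actually still lives on e2 -> later shuffle fetch failure.
    println(mapStatuses(0))
  }
}
```

The sketch shows why cleaning up (or keying) `mapIdToMapIndex` correctly matters: as long as t1's stale entry still resolves to P0, a late migration report from the decommissioned executor can silently redirect the freshest map status for that partition to a host that never held it.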
