dongjoon-hyun commented on a change in pull request #32730:
URL: https://github.com/apache/spark/pull/32730#discussion_r646245154



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -154,14 +159,26 @@ private class ShuffleStatus(
    */
   def updateMapOutput(mapId: Long, bmAddress: BlockManagerId): Unit = withWriteLock {
     try {
-      val mapStatusOpt = mapStatuses.find(_.mapId == mapId)
+      val mapStatusOpt = mapStatuses.find(x => x != null && x.mapId == mapId)
       mapStatusOpt match {
         case Some(mapStatus) =>
           logInfo(s"Updating map output for ${mapId} to ${bmAddress}")
           mapStatus.updateLocation(bmAddress)
           invalidateSerializedMapOutputStatusCache()
         case None =>
-          logWarning(s"Asked to update map output ${mapId} for untracked map status.")
+          val index = mapStatusesDeleted.indexWhere(x => x != null && x.mapId == mapId)
+          if (index >= 0) {
+            val mapStatus = mapStatusesDeleted(index)
+            mapStatus.updateLocation(bmAddress)
+            assert(mapStatuses(index) == null)

Review comment:
   > I have not looked into decommissioning in detail - but this assumption
   > will hold right (assertion) ?
   
   For worker decommission, yes. There are three cases.
   - When the worker is decommissioning, the update is handled by the
     preceding `Some(mapStatus)` branch.
   - When the worker is killed before finishing and a new executor is started
     before the dead executors are handled, the update is also handled by the
     preceding `Some(mapStatus)` branch.
   - When the worker is killed before finishing and the data of the deleted
     executors has already been removed, we previously hit
     `Asked to update map output ${mapId} for untracked map status`.
     This PR fixes that case.
   
   For the following question: if this slot has already been filled with a
   different mapId, we could either update it or ignore it. In either case,
   `assert` is too strong, so we had better skip the update instead. Thanks!
   I'll update this PR.
   > Can there be an interleaving recomputation or speculative task which
   > updates MOT for that partition index ?
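
   A minimal sketch of the "skip instead of assert" idea, not the actual patch. `Location`, `Status`, and `ShuffleStatusSketch` are hypothetical stand-ins for `BlockManagerId`, `MapStatus`, and `ShuffleStatus`. Putting the recovered entry back into `mapStatuses` is my assumption about what follows the `assert` in the diff; locking, logging, and cache invalidation are left out.

```scala
// Hypothetical, simplified stand-ins; these are not Spark's real classes.
final case class Location(host: String)

final class Status(val mapId: Long, var location: Location) {
  def updateLocation(newLoc: Location): Unit = { location = newLoc }
}

final class ShuffleStatusSketch(numPartitions: Int) {
  // Both arrays are indexed by partition index; null means "no entry".
  val mapStatuses = new Array[Status](numPartitions)
  val mapStatusesDeleted = new Array[Status](numPartitions)

  def add(index: Int, status: Status): Unit = { mapStatuses(index) = status }

  // Move an entry to the "deleted" array, e.g. when its executor is lost.
  def remove(index: Int): Unit = {
    mapStatusesDeleted(index) = mapStatuses(index)
    mapStatuses(index) = null
  }

  /** Returns true if a live or deleted entry was updated, false if skipped. */
  def updateMapOutput(mapId: Long, loc: Location): Boolean = synchronized {
    mapStatuses.find(x => x != null && x.mapId == mapId) match {
      case Some(status) =>
        status.updateLocation(loc)
        true
      case None =>
        val index = mapStatusesDeleted.indexWhere(x => x != null && x.mapId == mapId)
        // Instead of asserting that the live slot is empty, check it and
        // skip when another attempt has already refilled the slot.
        if (index >= 0 && mapStatuses(index) == null) {
          val status = mapStatusesDeleted(index)
          status.updateLocation(loc)
          mapStatuses(index) = status // assumed recovery step
          mapStatusesDeleted(index) = null
          true
        } else {
          false
        }
    }
  }
}
```

   With this shape, a recomputed or speculative task that has already filled the slot with a different mapId makes the late update a no-op rather than an assertion failure.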



