mridulm commented on a change in pull request #32730:
URL: https://github.com/apache/spark/pull/32730#discussion_r644893840



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -146,6 +151,7 @@ private class ShuffleStatus(
       _numAvailableMapOutputs += 1
       invalidateSerializedMapOutputStatusCache()
     }
+    mapStatusesDeleted(mapIndex) = mapStatuses(mapIndex)

Review comment:
       We should probably also remove the entry from `mapStatusesDeleted`, if present?
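
To illustrate the suggestion, here is a minimal sketch of clearing a stale deleted entry when a new output is registered. It is not the actual `ShuffleStatus` code: `Status` and `ShuffleStatusSketch` are hypothetical stand-ins, and locking, counters, and cache invalidation are left out.

```scala
// Hypothetical stand-in for Spark's MapStatus; only the fields needed here.
final case class Status(mapId: Long, location: String)

// Hypothetical, stripped-down model of ShuffleStatus (no locks, no caches).
final class ShuffleStatusSketch(numPartitions: Int) {
  // Reference-typed arrays start out filled with null, as in the real class.
  val mapStatuses = new Array[Status](numPartitions)
  val mapStatusesDeleted = new Array[Status](numPartitions)

  def addMapOutput(mapIndex: Int, status: Status): Unit = {
    mapStatuses(mapIndex) = status
    // Assumed addition: a fresh output supersedes any previously deleted
    // status for this index, so the stale entry is dropped.
    mapStatusesDeleted(mapIndex) = null
  }
}
```

With this, a later lookup in `mapStatusesDeleted` cannot resurrect a status that has already been replaced by a newer map output.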

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -234,6 +252,7 @@ private class ShuffleStatus(
     for (mapIndex <- mapStatuses.indices) {
       if (mapStatuses(mapIndex) != null && f(mapStatuses(mapIndex).location)) {
         _numAvailableMapOutputs -= 1
+        mapStatusesDeleted(mapIndex) = mapStatuses(mapIndex)

Review comment:
       You are right, the check on `mapId` and `mapIndex` ensures correctness.
Thanks for clarifying!

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -146,6 +151,7 @@ private class ShuffleStatus(
       _numAvailableMapOutputs += 1
       invalidateSerializedMapOutputStatusCache()
     }
+    mapStatusesDeleted(mapIndex) = mapStatuses(mapIndex)

Review comment:
       (I am assuming @viirya has the same question.)
   Given that we have a new map output, which we store as
`mapStatuses(mapIndex) = status` on the next line, why are we saving the
previous map status?
   I am sure I am missing something here.

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -154,14 +160,25 @@ private class ShuffleStatus(
    */
   def updateMapOutput(mapId: Long, bmAddress: BlockManagerId): Unit = withWriteLock {
     try {
-      val mapStatusOpt = mapStatuses.find(_.mapId == mapId)
+      val mapStatusOpt = mapStatuses.find(x => x != null && x.mapId == mapId)
       mapStatusOpt match {
         case Some(mapStatus) =>
           logInfo(s"Updating map output for ${mapId} to ${bmAddress}")
           mapStatus.updateLocation(bmAddress)
           invalidateSerializedMapOutputStatusCache()
         case None =>
-          logWarning(s"Asked to update map output ${mapId} for untracked map status.")
+          val index = mapStatusesDeleted.indexWhere(x => x != null && x.mapId == mapId)
+          if (index >= 0) {
+            val mapStatus = mapStatusesDeleted(index)
+            mapStatus.updateLocation(bmAddress)
+            assert(mapStatuses(index) == null)
+            mapStatuses(index) = mapStatus
+            _numAvailableMapOutputs += 1
+            invalidateSerializedMapOutputStatusCache()
+            logDebug(s"Recover ${mapStatus.mapId} ${mapStatus.location}")

Review comment:
       But that will trigger another `removeMapOutput`, right?
   And so it will get saved in `mapStatusesDeleted` at that time?
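
The sequence in question can be sketched roughly as follows. This is a simplified model, not the PR's code: `MapStatusModel` and `RecoverySketch` are hypothetical names, `removeMapOutput` saving into `mapStatusesDeleted` is an assumption based on the hunks above, and locking, `_numAvailableMapOutputs`, and cache invalidation are omitted.

```scala
// Hypothetical stand-in for MapStatus with a mutable location.
final class MapStatusModel(val mapId: Long, var location: String)

// Hypothetical, stripped-down model of the remove / update / recover flow.
final class RecoverySketch(numPartitions: Int) {
  val mapStatuses = new Array[MapStatusModel](numPartitions)
  val mapStatusesDeleted = new Array[MapStatusModel](numPartitions)

  // Assumption: removal keeps the old status around so it can be recovered.
  def removeMapOutput(mapIndex: Int, location: String): Unit = {
    val current = mapStatuses(mapIndex)
    if (current != null && current.location == location) {
      mapStatusesDeleted(mapIndex) = current
      mapStatuses(mapIndex) = null
    }
  }

  // Returns true if a live or previously deleted status was updated.
  def updateMapOutput(mapId: Long, newLocation: String): Boolean = {
    mapStatuses.find(x => x != null && x.mapId == mapId) match {
      case Some(status) =>
        status.location = newLocation
        true
      case None =>
        // Fall back to the deleted statuses, as in the hunk above.
        val index = mapStatusesDeleted.indexWhere(x => x != null && x.mapId == mapId)
        if (index >= 0) {
          val status = mapStatusesDeleted(index)
          status.location = newLocation
          mapStatuses(index) = status
          true
        } else {
          false
        }
    }
  }
}
```

In this model, a status recovered by `updateMapOutput` and then removed again is saved back into `mapStatusesDeleted` by the second `removeMapOutput` call, which is the behavior the comment above asks about.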






