mridulm commented on code in PR #46706:
URL: https://github.com/apache/spark/pull/46706#discussion_r1610232331


##########
core/src/main/scala/org/apache/spark/MapOutputTracker.scala:
##########
@@ -153,8 +152,10 @@ private class ShuffleStatus(
   /**
    * Mapping from a mapId to the mapIndex, this is required to reduce the searching overhead within
    * the function updateMapOutput(mapId, bmAddress).
+   *
+   * Exposed for testing.
    */
-  private[this] val mapIdToMapIndex = new OpenHashMap[Long, Int]()
+  private[spark] val mapIdToMapIndex = new HashMap[Long, Int]()

Review Comment:
   QQ: Why change to `HashMap` from `OpenHashMap`? (It is specialized for `Long` and `Int`.)
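For context on the trade-off this question raises: a generic `scala.collection.mutable.HashMap[Long, Int]` stores its `Long` keys and `Int` values boxed, whereas the reviewer notes that `OpenHashMap` is specialized for those primitives. The diff in this PR also calls `remove` on the index, so whichever map is chosen has to support deletion. Below is a hypothetical minimal sketch (`MapIdIndex` is not a Spark class) of the three operations the index needs, written against the generic map:

```scala
import scala.collection.mutable

// Hypothetical stand-in for ShuffleStatus's mapId -> mapIndex index.
// scala.collection.mutable.HashMap is not specialized, so Long keys and
// Int values are boxed; this is the cost the review question points at.
final class MapIdIndex {
  private val mapIdToMapIndex = new mutable.HashMap[Long, Int]()

  // Record (or overwrite) the map index for a map task attempt.
  def put(mapId: Long, mapIndex: Int): Unit = mapIdToMapIndex(mapId) = mapIndex

  // Look up the map index for a mapId, if one is recorded.
  def indexOf(mapId: Long): Option[Int] = mapIdToMapIndex.get(mapId)

  // Drop a stale mapId; mutable.HashMap.remove returns the old value, if any.
  def remove(mapId: Long): Option[Int] = mapIdToMapIndex.remove(mapId)

  def size: Int = mapIdToMapIndex.size
}
```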



##########
core/src/main/scala/org/apache/spark/MapOutputTracker.scala:
##########
@@ -164,6 +165,8 @@ private class ShuffleStatus(
     if (mapStatuses(mapIndex) == null) {
       _numAvailableMapOutputs += 1
       invalidateSerializedMapOutputStatusCache()
+    } else {
+      mapIdToMapIndex.remove(mapStatuses(mapIndex).mapId)
     }

Review Comment:
   ```suggestion
       val currentMapStatus = mapStatuses(mapIndex)
       if (currentMapStatus == null) {
         _numAvailableMapOutputs += 1
         invalidateSerializedMapOutputStatusCache()
       } else {
         mapIdToMapIndex.remove(currentMapStatus.mapId)
       }
   ```
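A minimal sketch of the suggested pattern: the slot is read once into `currentMapStatus` and that local is reused in both branches. `SimpleMapStatus` and `ShuffleStatusSketch` are hypothetical, simplified stand-ins for Spark's `MapStatus` and `ShuffleStatus`; locking and `invalidateSerializedMapOutputStatusCache()` are omitted:

```scala
import scala.collection.mutable

// Hypothetical simplified types; not Spark's real MapStatus/ShuffleStatus.
final case class SimpleMapStatus(mapId: Long, location: String)

final class ShuffleStatusSketch(numMaps: Int) {
  val mapStatuses = new Array[SimpleMapStatus](numMaps)
  val mapIdToMapIndex = new mutable.HashMap[Long, Int]()
  var numAvailableMapOutputs = 0

  def addMapOutput(mapIndex: Int, status: SimpleMapStatus): Unit = {
    // Read the slot once and reuse the local, as the suggestion does.
    val currentMapStatus = mapStatuses(mapIndex)
    if (currentMapStatus == null) {
      // First output for this slot (cache invalidation omitted here).
      numAvailableMapOutputs += 1
    } else {
      // The slot is overwritten by a new attempt: drop the old mapId.
      mapIdToMapIndex.remove(currentMapStatus.mapId)
    }
    mapStatuses(mapIndex) = status
    mapIdToMapIndex(status.mapId) = mapIndex
  }
}
```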



##########
core/src/main/scala/org/apache/spark/MapOutputTracker.scala:
##########
@@ -224,7 +227,9 @@ private class ShuffleStatus(
     logDebug(s"Removing existing map output ${mapIndex} ${bmAddress}")
     if (mapStatuses(mapIndex) != null && mapStatuses(mapIndex).location == bmAddress) {
       _numAvailableMapOutputs -= 1
-      mapStatusesDeleted(mapIndex) = mapStatuses(mapIndex)
+      val currentMapStatus = mapStatuses(mapIndex)

Review Comment:
   Pull `currentMapStatus` outside the `if`, and use it for the `if` condition as well.
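Applied to the removal path, the refactor might look like the sketch below. It rests on simplifying assumptions: `SimpleMapStatus` and `RemovalSketch` are hypothetical, block manager addresses are plain strings, and logging and cache invalidation are left out. The local is read once before the `if` and used in the condition:

```scala
// Hypothetical simplified types; not Spark's real classes.
final case class SimpleMapStatus(mapId: Long, location: String)

final class RemovalSketch(numMaps: Int) {
  val mapStatuses = new Array[SimpleMapStatus](numMaps)
  val mapStatusesDeleted = new Array[SimpleMapStatus](numMaps)
  var numAvailableMapOutputs = 0

  // Test helper: register an output in an empty or existing slot.
  def register(mapIndex: Int, status: SimpleMapStatus): Unit = {
    if (mapStatuses(mapIndex) == null) numAvailableMapOutputs += 1
    mapStatuses(mapIndex) = status
  }

  def removeMapOutput(mapIndex: Int, bmAddress: String): Unit = {
    // Read the slot once, outside the `if`, and use the local in the condition.
    val currentMapStatus = mapStatuses(mapIndex)
    if (currentMapStatus != null && currentMapStatus.location == bmAddress) {
      numAvailableMapOutputs -= 1
      mapStatusesDeleted(mapIndex) = currentMapStatus
      mapStatuses(mapIndex) = null
    }
  }
}
```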



##########
core/src/main/scala/org/apache/spark/MapOutputTracker.scala:
##########
@@ -224,7 +227,9 @@ private class ShuffleStatus(
     logDebug(s"Removing existing map output ${mapIndex} ${bmAddress}")
     if (mapStatuses(mapIndex) != null && mapStatuses(mapIndex).location == bmAddress) {
       _numAvailableMapOutputs -= 1
-      mapStatusesDeleted(mapIndex) = mapStatuses(mapIndex)
+      val currentMapStatus = mapStatuses(mapIndex)
+      mapIdToMapIndex.remove(currentMapStatus.mapId)

Review Comment:
   Removing it here will mean we can't query for it in `mapStatusesDeleted`, where we are relying on mapId -> mapIndex being in `mapIdToMapIndex` even when mapIndex is in `mapStatusesDeleted`.
   
   We should move this cleanup to when `mapStatusesDeleted` is being cleaned up.
   
   Same applies to the cases below as well.
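A hypothetical sketch of the proposed ordering: removal parks the status in `mapStatusesDeleted` but leaves its mapId -> mapIndex entry in place, so a later `updateMapOutput` can still find and restore it; the index entry is dropped only when the deleted slot itself is cleared. The names (`DeferredCleanupSketch`, `SimpleMapStatus`, `clearDeleted`) and the choice of clearing point (re-registration of the same map index) are assumptions for illustration, not Spark's actual code:

```scala
import scala.collection.mutable

// Hypothetical simplified types; not Spark's real classes.
final case class SimpleMapStatus(mapId: Long, var location: String)

final class DeferredCleanupSketch(numMaps: Int) {
  val mapStatuses = new Array[SimpleMapStatus](numMaps)
  val mapStatusesDeleted = new Array[SimpleMapStatus](numMaps)
  val mapIdToMapIndex = new mutable.HashMap[Long, Int]()

  def addMapOutput(mapIndex: Int, status: SimpleMapStatus): Unit = {
    // A new attempt supersedes any deleted one for this slot, so (in this
    // sketch) this is where the deleted entry and its mapId are cleaned up.
    clearDeleted(mapIndex)
    val current = mapStatuses(mapIndex)
    if (current != null) mapIdToMapIndex.remove(current.mapId)
    mapStatuses(mapIndex) = status
    mapIdToMapIndex(status.mapId) = mapIndex
  }

  def removeMapOutput(mapIndex: Int, location: String): Unit = {
    val current = mapStatuses(mapIndex)
    if (current != null && current.location == location) {
      // Keep mapId -> mapIndex: updateMapOutput may still need it.
      mapStatusesDeleted(mapIndex) = current
      mapStatuses(mapIndex) = null
    }
  }

  // Returns true if the output was found (live or deleted) and relocated.
  def updateMapOutput(mapId: Long, newLocation: String): Boolean =
    mapIdToMapIndex.get(mapId) match {
      case Some(i) if mapStatuses(i) != null && mapStatuses(i).mapId == mapId =>
        mapStatuses(i).location = newLocation
        true
      case Some(i) if mapStatusesDeleted(i) != null && mapStatusesDeleted(i).mapId == mapId =>
        val restored = mapStatusesDeleted(i)
        restored.location = newLocation
        mapStatuses(i) = restored
        // Restored as live output, so its index entry stays valid.
        mapStatusesDeleted(i) = null
        true
      case _ => false
    }

  // Hypothetical helper: drop a deleted status and, only now, its index entry.
  private def clearDeleted(mapIndex: Int): Unit = {
    val deleted = mapStatusesDeleted(mapIndex)
    if (deleted != null) {
      mapIdToMapIndex.remove(deleted.mapId)
      mapStatusesDeleted(mapIndex) = null
    }
  }
}
```

Restoring from `mapStatusesDeleted` does not touch the index, because the same mapId becomes live again; only discarding the deleted status makes the entry stale.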



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
