dongjoon-hyun commented on a change in pull request #32730:
URL: https://github.com/apache/spark/pull/32730#discussion_r646245154
##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -154,14 +159,26 @@ private class ShuffleStatus(
*/
def updateMapOutput(mapId: Long, bmAddress: BlockManagerId): Unit =
withWriteLock {
try {
- val mapStatusOpt = mapStatuses.find(_.mapId == mapId)
+ val mapStatusOpt = mapStatuses.find(x => x != null && x.mapId == mapId)
mapStatusOpt match {
case Some(mapStatus) =>
logInfo(s"Updating map output for ${mapId} to ${bmAddress}")
mapStatus.updateLocation(bmAddress)
invalidateSerializedMapOutputStatusCache()
case None =>
- logWarning(s"Asked to update map output ${mapId} for untracked map
status.")
+ val index = mapStatusesDeleted.indexWhere(x => x != null && x.mapId
== mapId)
+ if (index >= 0) {
+ val mapStatus = mapStatusesDeleted(index)
+ mapStatus.updateLocation(bmAddress)
+ assert(mapStatuses(index) == null)
Review comment:
> I have not looked into decomissioning in detail - but this assumption
will hold right (assertion) ?
For the worker decommission, yes. There are three cases.
- When the worker is decommissioning, it should be handled at the previous
`Some(mapStatus)`.
- When the worker is killed before finishing and new executor is started
before handling the dead executors, it should be handled at the previous
`Some(mapStatus)`.
- When the worker is killed before finishing and the data of the deleted
executors are removed, `Asked to update map output ${mapId} for untracked map
status` happened before. And, this PR is fixing it.
For the following, if something is filled here already with the different
mapId, we can update this or ignore this. However, in both case, `assert` is
too strong. In that case, we had better skip in that case. Thanks! I'll update
this PR.
> Can there be an interleaving recomputation or speculative task which
updates MOT for that partition index ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]