zsxwing commented on a change in pull request #25680:
[TODO_JIRA_IS_DOWN][CORE]Use KeyLock to simplify MapOutputTracker.getStatuses
URL: https://github.com/apache/spark/pull/25680#discussion_r320868587
##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -707,51 +710,18 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
if (statuses == null) {
logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
val startTimeNs = System.nanoTime()
- var fetchedStatuses: Array[MapStatus] = null
- fetching.synchronized {
- // Someone else is fetching it; wait for them to be done
- while (fetching.contains(shuffleId)) {
- try {
- fetching.wait()
- } catch {
- case e: InterruptedException =>
- }
- }
-
- // Either while we waited the fetch happened successfully, or
- // someone fetched it in between the get and the fetching.synchronized.
- fetchedStatuses = mapStatuses.get(shuffleId).orNull
+ fetchingLock.withLock(shuffleId) {
+ var fetchedStatuses = mapStatuses.get(shuffleId).orNull
if (fetchedStatuses == null) {
- // We have to do the fetch, get others to wait for us.
- fetching += shuffleId
- }
- }
-
- if (fetchedStatuses == null) {
- // We won the race to fetch the statuses; do so
- logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
- // This try-finally prevents hangs due to timeouts:
- try {
+ logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)
logInfo("Got the output locations")
mapStatuses.put(shuffleId, fetchedStatuses)
- } finally {
- fetching.synchronized {
- fetching -= shuffleId
- fetching.notifyAll()
- }
}
- }
- logDebug(s"Fetching map output statuses for shuffle $shuffleId took " +
- s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms")
-
- if (fetchedStatuses != null) {
+ logDebug(s"Fetching map output statuses for shuffle $shuffleId took " +
+ s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms")
fetchedStatuses
- } else {
- logError("Missing all output locations for shuffle " + shuffleId)
Review comment:
We never reach this branch: if `fetchedStatuses` were `null`,
`mapStatuses.put(shuffleId, fetchedStatuses)` would already have thrown a
NullPointerException.
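The point about `put` can be checked in isolation. The sketch below assumes
that `mapStatuses` is backed by a `java.util.concurrent.ConcurrentHashMap`,
which rejects `null` values; the diff above does not show its declaration, so
treat that as an assumption. `NullPutDemo` and `putThrowsNpe` are hypothetical
names used only for illustration, and `Array[String]` stands in for
`Array[MapStatus]`.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.util.Try

object NullPutDemo {
  // Returns true if storing `value` under `key` throws a NullPointerException.
  // Assumption: the real map is a ConcurrentHashMap, which forbids null values.
  def putThrowsNpe(key: Int, value: Array[String]): Boolean = {
    val statuses = new ConcurrentHashMap[Int, Array[String]]()
    Try(statuses.put(key, value)).failed.toOption
      .exists(_.isInstanceOf[NullPointerException])
  }

  def main(args: Array[String]): Unit = {
    println(putThrowsNpe(0, null))       // true: a null value is rejected
    println(putThrowsNpe(0, Array("a"))) // false: a non-null value is stored
  }
}
```

Under that assumption, any code placed after the `put` can rely on
`fetchedStatuses` being non-null, so the removed `else` branch that logged
"Missing all output locations" would be dead code.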