Github user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/22371#discussion_r216167356
--- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -138,13 +148,22 @@ private[spark] class IndexShuffleBlockResolver(
mapId: Int,
lengths: Array[Long],
dataTmp: File): Unit = {
+    shuffleIdToLocks.putIfAbsent(shuffleId, new Array[Object](lengths.length))
+    val mapLocks = shuffleIdToLocks.get(shuffleId)
+    val lock = mapLocks.synchronized {
+      if (mapLocks(mapId) == null) {
+        mapLocks(mapId) = new Object()
+      }
+      mapLocks(mapId)
+    }
+
val indexFile = getIndexFile(shuffleId, mapId)
val indexTmp = Utils.tempFileWith(indexFile)
try {
val dataFile = getDataFile(shuffleId, mapId)
-    // There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
-    // the following check and rename are atomic.
-    synchronized {
+    // We need make sure the following check and rename are atomic, and we only need to
--- End diff --
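For reference, the lazily-created per-(shuffleId, mapId) lock pattern shown in the diff can be sketched standalone. This is a minimal illustration with hypothetical names (`PerMapLocks`, `lockFor`), not the actual resolver code:

```scala
import java.util.concurrent.ConcurrentHashMap

// Sketch of a per-map-ID lock table: one lock array per shuffle,
// one lock object per map ID, created lazily on first use.
object PerMapLocks {
  private val shuffleIdToLocks = new ConcurrentHashMap[Int, Array[Object]]()

  def lockFor(shuffleId: Int, mapId: Int, numMaps: Int): Object = {
    // putIfAbsent is atomic, so at most one lock array wins per shuffle ID.
    shuffleIdToLocks.putIfAbsent(shuffleId, new Array[Object](numMaps))
    val mapLocks = shuffleIdToLocks.get(shuffleId)
    // Synchronize on the array while lazily creating the slot's lock,
    // so two threads racing on the same mapId observe the same object.
    mapLocks.synchronized {
      if (mapLocks(mapId) == null) {
        mapLocks(mapId) = new Object()
      }
      mapLocks(mapId)
    }
  }
}
```

Callers would then wrap the check-and-rename in `lockFor(shuffleId, mapId, numMaps).synchronized { ... }`, so writers for the same map output serialize while different shuffle or map IDs proceed in parallel.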
I don't know the logic well enough to really evaluate this, but it looks plausible. This block operates on `indexFile` and `dataFile` and things derived from them, and those appear to be keyed by `shuffleId` and `mapId`, so it sounds plausible that there is no need to synchronize access when different shuffle or map IDs are used.

I do see, though, that the synchronized block does things like delete `indexFile`, while `indexFile` is read outside the synchronized block. I wonder if there is an issue there: should the check for the file really happen inside a block that excludes concurrent deletion of that same file?

Granted, that is the existing logic here, and it may be OK not to touch it now, but I did wonder about it when trying to reason about this change.
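To make the concern concrete, it is the classic check-then-act hazard: an existence check outside the lock can be stale by the time its result is used, if a deleter runs under the lock in between. A hedged sketch (hypothetical helper names, not the resolver's actual code):

```scala
import java.io.File

// Racy: the check happens outside any lock, so another thread holding
// the lock may delete the file between the check and the use.
def checkThenUseUnsafe(indexFile: File): Boolean = {
  val exists = indexFile.exists()
  // ... a concurrent `indexFile.delete()` can happen right here ...
  exists && indexFile.length() > 0
}

// Safer: check and use are atomic with respect to any deleter that
// performs the delete while holding the same lock object.
def checkThenUseSafe(lock: Object, indexFile: File): Boolean =
  lock.synchronized {
    indexFile.exists() && indexFile.length() > 0
  }
```

The point is only that the check belongs inside whatever lock excludes the deletion, which is what the question above is asking about.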
---