juliuszsompolski commented on a change in pull request #25612: 
[SPARK-3137][Core]Replace the global TorrentBroadcast lock with fine grained 
locks
URL: https://github.com/apache/spark/pull/25612#discussion_r318971022
 
 

 ##########
 File path: 
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
 ##########
 @@ -290,6 +293,42 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: 
T, id: Long)
 
 private object TorrentBroadcast extends Logging {
 
+  /** Locks to ensure there is only one thread fetching the same 
[[TorrentBroadcast]] block. */
+  private val torrentBroadcastLock = new ConcurrentHashMap[BroadcastBlockId, 
AnyRef]()
+
+  /** Acquire a lock for fetching a [[TorrentBroadcast]] block. */
+  private def acquireTorrentBroadcastLock(broadcastId: BroadcastBlockId): Unit 
= {
+    while (true) {
+      val lock = torrentBroadcastLock.putIfAbsent(broadcastId, new Object)
+      if (lock == null) return
+      lock.synchronized {
+        while (torrentBroadcastLock.get(broadcastId) eq lock) {
+          lock.wait()
 
 Review comment:
   Why not just stripe over a fixed number of Reentrant locks? This should 
reduce contention enough (and potential collissions can be controlled with 
numLocks) is easy to understand and  doesn't reimplement its own lock queues. 
It's also fully interruptible, while in the current implementation the threads 
that are queueing for the lock are not interruptible. 
   ```
   private val numLocks = 100
   private val locks = (1 to numLocks).map(_ => new 
java.util.concurrent.locks.ReentrantLock)
   
   private def withTorrentBroadcastLock[T](broadcastId: BroadcastBlockId)(func: 
=> T): T = {
     val lock = locks(broadcastId.broadcastId % numLocks)
     lock.lockInterruptibly()
     try {
       func
     } finally {
       lock.unlock()
     }
   }
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to