[ https://issues.apache.org/jira/browse/KAFKA-6431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16549996#comment-16549996 ]

ASF GitHub Bot commented on KAFKA-6431:
---------------------------------------

ying-zheng closed pull request #5338: KAFKA-6431: Shard purgatory to mitigate 
lock contention
URL: https://github.com/apache/kafka/pull/5338
 
 
   

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 2a096e1a811..f92adb704c7 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -19,11 +19,11 @@ package kafka.server
 
 import java.util.concurrent._
 import java.util.concurrent.atomic._
-import java.util.concurrent.locks.{Lock, ReentrantLock, ReentrantReadWriteLock}
+import java.util.concurrent.locks.{Lock, ReentrantLock}
 
 import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
-import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
+import kafka.utils.CoreUtils.inLock
 import kafka.utils._
 import kafka.utils.timer._
 
@@ -147,6 +147,8 @@ abstract class DelayedOperation(override val delayMs: Long,
 
 object DelayedOperationPurgatory {
 
+  private val Shards = 512 // Shard the watcher list to reduce lock contention
+
   def apply[T <: DelayedOperation](purgatoryName: String,
                                    brokerId: Int = 0,
                                    purgeInterval: Int = 1000,
@@ -168,11 +170,25 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
                                                              reaperEnabled: Boolean = true,
                                                              timerEnabled: Boolean = true)
         extends Logging with KafkaMetricsGroup {
-
   /* a list of operation watching keys */
-  private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))
+  private class WatcherList {
+    val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))
+
+    val watchersLock = new ReentrantLock()
+
+    /*
+     * Return all the current watcher lists,
+     * note that the returned watchers may be removed from the list by other threads
+     */
+    def allWatchers = {
+      inLock(watchersLock) { watchersForKey.values }
+    }
+  }
 
-  private val removeWatchersLock = new ReentrantReadWriteLock()
+  private val watcherLists = Array.fill[WatcherList](DelayedOperationPurgatory.Shards)(new WatcherList)
+  private def watcherList(key: Any): WatcherList = {
+    watcherLists(Math.abs(key.hashCode() % watcherLists.length))
+  }
 
   // the number of estimated total operations in the purgatory
   private[this] val estimatedTotalOperations = new AtomicInteger(0)
@@ -270,7 +286,8 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
    * @return the number of completed operations during this process
    */
   def checkAndComplete(key: Any): Int = {
-    val watchers = inReadLock(removeWatchersLock) { watchersForKey.get(key) }
+    val wl = watcherList(key)
+    val watchers = inLock(wl.watchersLock) { wl.watchersForKey.get(key) }
     if(watchers == null)
       0
     else
@@ -282,7 +299,9 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
   * on multiple lists, and some of its watched entries may still be in the watch lists
   * even when it has been completed, this number may be larger than the number of real operations watched
    */
-  def watched: Int = allWatchers.map(_.countWatched).sum
+  def watched(): Int = {
+    watcherLists.foldLeft(0) { _ + _.allWatchers.map(_.countWatched).sum }
+  }
 
   /**
    * Return the number of delayed operations in the expiry queue
@@ -293,27 +312,24 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
    * Cancel watching on any delayed operations for the given key. Note the operation will not be completed
     */
   def cancelForKey(key: Any): List[T] = {
-    inWriteLock(removeWatchersLock) {
-      val watchers = watchersForKey.remove(key)
+    val wl = watcherList(key)
+    inLock(wl.watchersLock) {
+      val watchers = wl.watchersForKey.remove(key)
       if (watchers != null)
         watchers.cancel()
       else
         Nil
     }
   }
-  /*
-   * Return all the current watcher lists,
-   * note that the returned watchers may be removed from the list by other threads
-   */
-  private def allWatchers = inReadLock(removeWatchersLock) { watchersForKey.values }
 
   /*
    * Return the watch list of the given key, note that we need to
   * grab the removeWatchersLock to avoid the operation being added to a removed watcher list
    */
   private def watchForOperation(key: Any, operation: T) {
-    inReadLock(removeWatchersLock) {
-      val watcher = watchersForKey.getAndMaybePut(key)
+    val wl = watcherList(key)
+    inLock(wl.watchersLock) {
+      val watcher = wl.watchersForKey.getAndMaybePut(key)
       watcher.watch(operation)
     }
   }
@@ -322,13 +338,14 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
    * Remove the key from watcher lists if its list is empty
    */
   private def removeKeyIfEmpty(key: Any, watchers: Watchers) {
-    inWriteLock(removeWatchersLock) {
+    val wl = watcherList(key)
+    inLock(wl.watchersLock) {
      // if the current key is no longer correlated to the watchers to remove, skip
-      if (watchersForKey.get(key) != watchers)
+      if (wl.watchersForKey.get(key) != watchers)
         return
 
       if (watchers != null && watchers.isEmpty) {
-        watchersForKey.remove(key)
+        wl.watchersForKey.remove(key)
       }
     }
   }
@@ -424,7 +441,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
       // a little overestimated total number of operations.
       estimatedTotalOperations.getAndSet(delayed)
       debug("Begin purging watch lists")
-      val purged = allWatchers.map(_.purgeCompleted()).sum
+      var purged = watcherLists.foldLeft(0) { _ + _.allWatchers.map(_.purgeCompleted()).sum }
       debug("Purged %d elements from watch lists.".format(purged))
     }
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Lock contention in Purgatory
> ----------------------------
>
>                 Key: KAFKA-6431
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6431
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core, purgatory
>            Reporter: Ying Zheng
>            Assignee: Ying Zheng
>            Priority: Minor
>
> Purgatory is the data structure in the Kafka broker that manages delayed 
> operations. A ConcurrentHashMap (Kafka Pool) maps each operation key to the 
> operations (held in a ConcurrentLinkedQueue) that are interested in that key.
> When an operation is done or expired, it is removed from its list 
> (ConcurrentLinkedQueue). When the list becomes empty, it is removed from the 
> ConcurrentHashMap. The second step has to be protected by a lock, to avoid 
> adding new operations into a list that is being removed. This is currently 
> done with a globally shared ReentrantReadWriteLock: all read operations on 
> the purgatory have to acquire the read lock, while removing a list requires 
> the write lock.
> Our profiling results show that the Kafka broker spends a nontrivial amount 
> of time on this read-write lock.
> The problem is exacerbated when there are a large number of short-lived 
> operations. For example, when we are doing sync produce operations 
> (acks=all), a DelayedProduce operation is added and then removed for each 
> message. If the QPS of the topic is not high, it is very likely that, by the 
> time the operation is done and removed, the list for that key (topic 
> partition) has also become empty and has to be removed while holding the 
> write lock. This blocks all read / write operations on the entire purgatory 
> for a while. As there are tens of I/O threads accessing the purgatory 
> concurrently, this shared lock can easily become a bottleneck.
> Actually, we only need to avoid concurrent reads and writes on the same key; 
> operations on different keys do not conflict with each other.
> I suggest sharding the purgatory into smaller partitions and locking each 
> individual partition independently.
> Assuming there are 10 I/O threads actively accessing the purgatory, sharding 
> it into 512 partitions makes the probability of 2 or more threads accessing 
> the same partition at the same time about 2%. We can also use ReentrantLock 
> instead of ReentrantReadWriteLock: when reads do not greatly outnumber 
> writes, ReentrantLock has lower overhead than ReentrantReadWriteLock.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
