[ https://issues.apache.org/jira/browse/KAFKA-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16707975#comment-16707975 ]

Rajini Sivaram commented on KAFKA-7697:
---------------------------------------

Changes made under KAFKA-7395 now protect fetch using the Partition's 
{{leaderIsrUpdateLock}}. As a result, the read lock is acquired while 
completing a delayed fetch. This is unsafe because delayed operations can be 
completed while the lock of another Partition is already held. For example, 
the thread dump for request-handler-4 shows:

{quote}
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x000000070821f188> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:967)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1283)
        at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:249)
        at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
        at kafka.cluster.Partition.fetchOffsetSnapshot(Partition.scala:832)
        at kafka.server.DelayedFetch.$anonfun$tryComplete$1(DelayedFetch.scala:87)
        at kafka.server.DelayedFetch.$anonfun$tryComplete$1$adapted(DelayedFetch.scala:79)
        at kafka.server.DelayedFetch$$Lambda$912/582152661.apply(Unknown Source)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at kafka.server.DelayedFetch.tryComplete(DelayedFetch.scala:79)
        at kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:121)
        at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:371)
        at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:277)
        at kafka.server.ReplicaManager.tryCompleteDelayedFetch(ReplicaManager.scala:307)
        at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:743)
        at kafka.cluster.Partition$$Lambda$917/80048373.apply(Unknown Source)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
        at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
        at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:729)
        at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:735)
        at kafka.server.ReplicaManager$$Lambda$915/220982367.apply(Unknown Source)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
        at scala.collection.TraversableLike$$Lambda$12/1209669119.apply(Unknown Source)
        at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:145)
        at scala.collection.mutable.HashMap$$Lambda$24/477289012.apply(Unknown Source)
        at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:235)
        at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:228)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:145)
        at scala.collection.TraversableLike.map(TraversableLike.scala:233)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:723)
        at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:470)
        at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:482)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:106)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
        at java.lang.Thread.run(Thread.java:748)
{quote}
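
Put more plainly, the request handler still holds the read lock of the 
partition it appended to and, while holding it, completes delayed fetches 
that take the read lock of other partitions. A simplified sketch of that 
shape (hypothetical {{PartitionSketch}} names, not the actual {{Partition}} 
code):

{code:scala}
// Simplified sketch of the call path above (hypothetical names, not the
// actual kafka.cluster.Partition implementation).
import java.util.concurrent.locks.ReentrantReadWriteLock

class PartitionSketch(val topicPartition: String) {
  private val leaderIsrUpdateLock = new ReentrantReadWriteLock()

  private def inReadLock[T](body: => T): T = {
    leaderIsrUpdateLock.readLock().lock()
    try body finally leaderIsrUpdateLock.readLock().unlock()
  }

  // Produce path: holds *this* partition's read lock and, while still holding
  // it, tries to complete delayed fetches, which may touch *other* partitions.
  def appendRecordsToLeader(completeDelayedFetches: () => Unit): Unit =
    inReadLock {
      // append to the local log ...
      completeDelayedFetches() // ~ ReplicaManager.tryCompleteDelayedFetch
    }

  // Delayed-fetch completion path: takes this partition's read lock. If a
  // writer is already queued on leaderIsrUpdateLock, this call parks.
  def fetchOffsetSnapshot(): Unit = inReadLock {
    // read log end offset / high watermark ...
  }
}

// request-handler-4: partitionA.appendRecordsToLeader(() => partitionB.fetchOffsetSnapshot())
// request-handler-1: partitionB.appendRecordsToLeader(() => partitionA.fetchOffsetSnapshot())
// With writers queued on both locks, neither nested read lock can be acquired
// and both threads park, as in the stack trace above.
{code}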

A large number of threads, including all request handler threads, appear to 
be deadlocked on the {{leaderIsrUpdateLock}} of two partitions: the read-lock 
acquisitions performed while completing delayed fetches are blocked behind 
waiting writers on those locks.
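
The reason the nested read-lock acquisition parks even though both locks are 
only read-held is the non-fair {{ReentrantReadWriteLock}} policy: once a 
writer is queued, a new reader does not barge past it. A minimal, 
timing-dependent illustration of that JDK behaviour in isolation (not Kafka 
code):

{code:scala}
// Minimal, timing-dependent illustration of the non-fair ReentrantReadWriteLock
// behaviour: once a writer is queued, a new reader parks instead of barging.
import java.util.concurrent.locks.ReentrantReadWriteLock

object WriterPreferenceDemo extends App {
  val lock = new ReentrantReadWriteLock() // non-fair by default

  def startThread(name: String)(body: => Unit): Thread = {
    val t = new Thread(new Runnable { def run(): Unit = body }, name)
    t.setDaemon(true) // let the JVM exit even though these threads stay parked
    t.start()
    t
  }

  lock.readLock().lock()                            // reader-1 holds the read lock
  startThread("writer") { lock.writeLock().lock() } // writer queues up and parks
  Thread.sleep(100)                                 // crude wait for the writer to enqueue

  // reader-2 now parks as well: the queued writer is at the head of the wait
  // queue, so the non-fair read lock refuses to barge. Two threads doing this
  // symmetrically on each other's partition locks is exactly the deadlock above.
  val reader2 = startThread("reader-2") { lock.readLock().lock() }
  reader2.join(500)
  println(s"reader-2 state after 500ms: ${reader2.getState}") // typically WAITING
}
{code}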

For purgatory operations that acquire a lock, we use that lock as the delayed 
operation lock, but that is not an option here since a fetch can span multiple 
partitions. So we need some other way to avoid blocking on one Partition's 
lock while holding on to another Partition's lock.
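
One direction, sketched only under the assumption that a non-blocking read is 
acceptable for delayed-fetch completion (this is not necessarily the fix that 
will land): take the snapshot with {{tryLock()}} so the request handler never 
parks on another partition's lock, and leave the delayed fetch in purgatory to 
be retried if the lock is not immediately available.

{code:scala}
// Sketch of one possible approach (hypothetical method, not the actual fix):
// use tryLock() for the snapshot read so delayed-fetch completion never parks
// while the caller already holds a different partition's lock.
import java.util.concurrent.locks.ReentrantReadWriteLock

class PartitionSketch {
  private val leaderIsrUpdateLock = new ReentrantReadWriteLock()

  /** Returns Some(snapshot) if the read lock could be taken without blocking,
   *  or None if the write lock is currently held. Note that readLock().tryLock()
   *  barges past queued writers, so it only fails when a writer actually holds
   *  the lock; either way, the caller never parks. On None the delayed fetch
   *  simply stays in purgatory and is retried later. */
  def tryFetchOffsetSnapshot(): Option[Long] = {
    val readLock = leaderIsrUpdateLock.readLock()
    if (readLock.tryLock()) {
      try Some(readSnapshot())
      finally readLock.unlock()
    } else None
  }

  private def readSnapshot(): Long = 0L // placeholder for LEO / high-watermark snapshot
}
{code}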

> Possible deadlock in kafka.cluster.Partition
> --------------------------------------------
>
>                 Key: KAFKA-7697
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7697
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.1.0
>            Reporter: Gian Merlino
>            Assignee: Rajini Sivaram
>            Priority: Blocker
>             Fix For: 2.2.0, 2.1.1
>
>         Attachments: threaddump.txt
>
>
> After upgrading a fairly busy broker from 0.10.2.0 to 2.1.0, it locked up 
> within a few minutes (by "locked up" I mean that all request handler threads 
> were busy, and other brokers reported that they couldn't communicate with 
> it). I restarted it a few times and it did the same thing each time. After 
> downgrading to 0.10.2.0, the broker was stable. I attached a thread dump from 
> the last attempt on 2.1.0 that shows lots of kafka-request-handler- threads 
> trying to acquire the leaderIsrUpdateLock lock in kafka.cluster.Partition.
> It jumps out that there are two threads that already have some read lock 
> (can't tell which one) and are trying to acquire a second one (on two 
> different read locks: 0x0000000708184b88 and 0x000000070821f188): 
> kafka-request-handler-1 and kafka-request-handler-4. Both are handling a 
> produce request, and in the process of doing so, are calling 
> Partition.fetchOffsetSnapshot while trying to complete a DelayedFetch. At the 
> same time, both of those locks have writers from other threads waiting on 
> them (kafka-request-handler-2 and kafka-scheduler-6). Neither of those locks 
> appears to have a writer that holds it (if only because no threads in the dump 
> are deep enough in inWriteLock to indicate that).
> ReentrantReadWriteLock in nonfair mode prioritizes waiting writers over 
> readers. Is it possible that kafka-request-handler-1 and 
> kafka-request-handler-4 are each trying to read-lock the partition that is 
> currently locked by the other one, and they're both parked waiting for 
> kafka-request-handler-2 and kafka-scheduler-6 to get write locks, which they 
> never will, because the former two threads own read locks and aren't giving 
> them up?


