[ 
https://issues.apache.org/jira/browse/KAFKA-6517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16366212#comment-16366212
 ] 

ASF GitHub Bot commented on KAFKA-6517:
---------------------------------------

rajinisivaram closed pull request #4551: KAFKA-6517: Avoid deadlock in 
ZooKeeperClient during session expiry
URL: https://github.com/apache/kafka/pull/4551
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala 
b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 9a1d16274df..3934fd0ad5d 100644
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.{ArrayBlockingQueue, 
ConcurrentHashMap, CountDownLat
 import com.yammer.metrics.core.{Gauge, MetricName}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock}
-import kafka.utils.Logging
+import kafka.utils.{KafkaScheduler, Logging}
 import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.AsyncCallback.{ACLCallback, Children2Callback, 
DataCallback, StatCallback, StringCallback, VoidCallback}
 import org.apache.zookeeper.KeeperException.Code
@@ -59,6 +59,7 @@ class ZooKeeperClient(connectString: String,
   private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, 
ZNodeChildChangeHandler]().asScala
   private val inFlightRequests = new Semaphore(maxInFlightRequests)
   private val stateChangeHandlers = new ConcurrentHashMap[String, 
StateChangeHandler]().asScala
+  private[zookeeper] val expiryScheduler = new KafkaScheduler(0, 
"zk-session-expiry-handler")
 
   private val metricNames = Set[String]()
 
@@ -90,6 +91,7 @@ class ZooKeeperClient(connectString: String,
 
   metricNames += "SessionState"
 
+  expiryScheduler.startup()
   waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
 
   override def metricName(name: String, metricTags: 
scala.collection.Map[String, String]): MetricName = {
@@ -122,7 +124,7 @@ class ZooKeeperClient(connectString: String,
    * response type (e.g. Seq[CreateRequest] -> Seq[CreateResponse]). 
Otherwise, the most specific common supertype
    * will be used (e.g. Seq[AsyncRequest] -> Seq[AsyncResponse]).
    */
-  def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): 
Seq[Req#Response] = inReadLock(initializationLock) {
+  def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): 
Seq[Req#Response] = {
     if (requests.isEmpty)
       Seq.empty
     else {
@@ -132,10 +134,12 @@ class ZooKeeperClient(connectString: String,
       requests.foreach { request =>
         inFlightRequests.acquire()
         try {
-          send(request) { response =>
-            responseQueue.add(response)
-            inFlightRequests.release()
-            countDownLatch.countDown()
+          inReadLock(initializationLock) {
+            send(request) { response =>
+              responseQueue.add(response)
+              inFlightRequests.release()
+              countDownLatch.countDown()
+            }
           }
         } catch {
           case e: Throwable =>
@@ -148,7 +152,8 @@ class ZooKeeperClient(connectString: String,
     }
   }
 
-  private def send[Req <: AsyncRequest](request: Req)(processResponse: 
Req#Response => Unit): Unit = {
+  // Visibility to override for testing
+  private[zookeeper] def send[Req <: AsyncRequest](request: 
Req)(processResponse: Req#Response => Unit): Unit = {
     // Safe to cast as we always create a response of the right type
     def callback(response: AsyncResponse): Unit = 
processResponse(response.asInstanceOf[Req#Response])
 
@@ -303,12 +308,18 @@ class ZooKeeperClient(connectString: String,
     stateChangeHandlers.clear()
     zooKeeper.close()
     metricNames.foreach(removeMetric(_))
+    expiryScheduler.shutdown()
     info("Closed.")
   }
 
   def sessionId: Long = inReadLock(initializationLock) {
     zooKeeper.getSessionId
   }
+
+  // Only for testing
+  private[zookeeper] def currentZooKeeper: ZooKeeper = 
inReadLock(initializationLock) {
+    zooKeeper
+  }
   
   private def initialize(): Unit = {
     if (!connectionState.isAlive) {
@@ -352,12 +363,14 @@ class ZooKeeperClient(connectString: String,
             error("Auth failed.")
             stateChangeHandlers.values.foreach(_.onAuthFailure())
           } else if (state == KeeperState.Expired) {
-            inWriteLock(initializationLock) {
-              info("Session expired.")
-              stateChangeHandlers.values.foreach(_.beforeInitializingSession())
-              initialize()
-              stateChangeHandlers.values.foreach(_.afterInitializingSession())
-            }
+            expiryScheduler.schedule("zk-session-expired", () => {
+              inWriteLock(initializationLock) {
+                info("Session expired.")
+                
stateChangeHandlers.values.foreach(_.beforeInitializingSession())
+                initialize()
+                
stateChangeHandlers.values.foreach(_.afterInitializingSession())
+              }
+            }, delay = 0L, period = -1L, unit = TimeUnit.MILLISECONDS)
           }
         case Some(path) =>
           (event.getType: @unchecked) match {
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala 
b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index c8ebaa90735..f1c09d7308d 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -19,7 +19,7 @@ package kafka.zookeeper
 import java.nio.charset.StandardCharsets
 import java.util.UUID
 import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch, TimeUnit}
+import java.util.concurrent.{ArrayBlockingQueue, ConcurrentLinkedQueue, 
CountDownLatch, Executors, Semaphore, TimeUnit}
 
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.{Gauge, Meter, MetricName}
@@ -29,8 +29,8 @@ import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.KeeperException.{Code, NoNodeException}
 import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
 import org.apache.zookeeper.ZooKeeper.States
-import org.apache.zookeeper.{CreateMode, WatchedEvent, ZooDefs}
-import org.junit.Assert.{assertArrayEquals, assertEquals, assertTrue}
+import org.apache.zookeeper.{CreateMode, WatchedEvent, Watcher, ZooDefs, 
ZooKeeper}
+import org.junit.Assert.{assertArrayEquals, assertEquals, assertFalse, 
assertNull, assertTrue}
 import org.junit.{After, Before, Test}
 
 import scala.collection.JavaConverters._
@@ -385,6 +385,114 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
     } finally zooKeeperClient.close()
   }
 
+  /**
+    * Tests that if session expiry notification is received while a thread is 
processing requests,
+    * session expiry is handled and the request thread completes with 
responses to all requests,
+    * even though some requests may fail due to session expiry or 
disconnection.
+    *
+    * Sequence of events on different threads:
+    *   Request thread:
+    *       - Sends `maxInflightRequests` requests (these may complete before 
session is expired)
+    *   Main thread:
+    *       - Waits for at least one request to be processed (this should 
succeed)
+    *       - Expires session by creating new client with same session id
+    *       - Unblocks another `maxInflightRequests` requests before and after 
new client is closed (these may fail)
+    *   ZooKeeperClient Event thread:
+    *       - Delivers responses and session expiry (no ordering guarantee 
between these, both are processed asynchronously)
+    *   Response executor thread:
+    *       - Blocks subsequent sends by delaying response until session 
expiry is processed
+    *   ZooKeeperClient Session Expiry Handler:
+    *       - Unblocks subsequent sends
+    *   Main thread:
+    *       - Waits for all sends to complete. The requests sent after session 
expiry processing should succeed.
+    */
+  @Test
+  def testSessionExpiry(): Unit = {
+    val maxInflightRequests = 2
+    val responseExecutor = Executors.newSingleThreadExecutor
+    val sendSemaphore = new Semaphore(0)
+    val sendCompleteSemaphore = new Semaphore(0)
+    val sendSize = maxInflightRequests * 5
+    @volatile var resultCodes: Seq[Code] = null
+    val stateChanges = new ConcurrentLinkedQueue[String]()
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, maxInflightRequests,
+      time, "testGroupType", "testGroupName") {
+      override def send[Req <: AsyncRequest](request: Req)(processResponse: 
Req#Response => Unit): Unit = {
+        super.send(request)( response => {
+          responseExecutor.submit(new Runnable {
+            override def run(): Unit = {
+              sendCompleteSemaphore.release()
+              sendSemaphore.acquire()
+              processResponse(response)
+            }
+          })
+        })
+      }
+    }
+    try {
+      zooKeeperClient.registerStateChangeHandler(new StateChangeHandler {
+        override val name: String ="test-state-change-handler"
+        override def afterInitializingSession(): Unit = {
+          verifyHandlerThread()
+          stateChanges.add("afterInitializingSession")
+        }
+        override def beforeInitializingSession(): Unit = {
+          verifyHandlerThread()
+          stateChanges.add("beforeInitializingSession")
+          sendSemaphore.release(sendSize) // Resume remaining sends
+        }
+        private def verifyHandlerThread(): Unit = {
+          val threadName = Thread.currentThread.getName
+          assertTrue(s"Unexpected thread + $threadName", 
threadName.startsWith(zooKeeperClient.expiryScheduler.threadNamePrefix))
+        }
+      })
+
+      val requestThread = new Thread {
+        override def run(): Unit = {
+          val requests = (1 to sendSize).map(i => GetDataRequest(s"/$i"))
+          resultCodes = 
zooKeeperClient.handleRequests(requests).map(_.resultCode)
+        }
+      }
+      requestThread.start()
+      sendCompleteSemaphore.acquire() // Wait for request thread to start 
processing requests
+
+      // Trigger session expiry by reusing the session id in another client
+      val dummyWatcher = new Watcher {
+        override def process(event: WatchedEvent): Unit = {}
+      }
+      val anotherZkClient = new ZooKeeper(zkConnect, 1000, dummyWatcher,
+        zooKeeperClient.currentZooKeeper.getSessionId,
+        zooKeeperClient.currentZooKeeper.getSessionPasswd)
+      assertNull(anotherZkClient.exists("/nonexistent", false)) // Make sure 
new client works
+      sendSemaphore.release(maxInflightRequests) // Resume a few more sends 
which may fail
+      anotherZkClient.close()
+      sendSemaphore.release(maxInflightRequests) // Resume a few more sends 
which may fail
+
+      requestThread.join(10000)
+      if (requestThread.isAlive) {
+        requestThread.interrupt()
+        fail("Request thread did not complete")
+      }
+      assertEquals(Seq("beforeInitializingSession", 
"afterInitializingSession"), stateChanges.asScala.toSeq)
+
+      assertEquals(resultCodes.size, sendSize)
+      val connectionLostCount = resultCodes.count(_ == Code.CONNECTIONLOSS)
+      assertTrue(s"Unexpected connection lost requests $resultCodes", 
connectionLostCount <= maxInflightRequests)
+      val expiredCount = resultCodes.count(_ == Code.SESSIONEXPIRED)
+      assertTrue(s"Unexpected session expired requests $resultCodes", 
expiredCount <= maxInflightRequests)
+      assertTrue(s"No connection lost or expired requests $resultCodes", 
connectionLostCount + expiredCount > 0)
+      assertEquals(Code.NONODE, resultCodes.head)
+      assertEquals(Code.NONODE, resultCodes.last)
+      assertTrue(s"Unexpected result code $resultCodes",
+        resultCodes.filterNot(Set(Code.NONODE, Code.SESSIONEXPIRED, 
Code.CONNECTIONLOSS).contains).isEmpty)
+
+    } finally {
+      zooKeeperClient.close()
+      responseExecutor.shutdownNow()
+    }
+    assertFalse("Expiry executor not shutdown", 
zooKeeperClient.expiryScheduler.isStarted)
+  }
+
   def isExpectedMetricName(metricName: MetricName, name: String): Boolean =
     metricName.getName == name && metricName.getGroup == "testMetricGroup" && 
metricName.getType == "testMetricType"
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ZooKeeperClient holds a lock while waiting for responses, blocking shutdown
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-6517
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6517
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.1.0
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Blocker
>             Fix For: 1.1.0
>
>
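> Stack traces from a local test run that deadlocked because shutdown 
> could not acquire the lock:
>  # kafka-scheduler-7: acquired the read lock in 
> kafka.zookeeper.ZooKeeperClient.handleRequests and, while still holding it, waits on a CountDownLatch for request responses
>  # Test worker-EventThread: waiting for the write lock to process SessionExpired 
> in kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process
>  # ForkJoinPool-1-worker-11: processing KafkaServer.shutdown, queued behind 
> thread 2 while waiting to acquire the read lock for 
> kafka.zookeeper.ZooKeeperClient.unregisterStateChangeHandler
> Because a writer (thread 2) is already queued on the ReentrantReadWriteLock, the new read-lock request from thread 3 blocks behind it even though thread 1 holds only a read lock. Thread 2 in turn cannot acquire the write lock until thread 1 releases its read lock, so shutdown stalls.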
> Stack traces of the relevant threads:
> {quote}
> "kafka-scheduler-7" daemon prio=5 tid=0x00007fade918d800 nid=0xd317 waiting 
> on condition [0x000070000b371000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00000007e4c6e698> (a 
> java.util.concurrent.CountDownLatch$Sync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
>         at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
>         at 
> kafka.zookeeper.ZooKeeperClient$$anonfun$handleRequests$1.apply(ZooKeeperClient.scala:146)
>         at 
> kafka.zookeeper.ZooKeeperClient$$anonfun$handleRequests$1.apply(ZooKeeperClient.scala:126)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>         at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:256)
>         at 
> kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:125)
>         at 
> kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1432)
>         at 
> kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1425)
>         at 
> kafka.zk.KafkaZkClient.conditionalUpdatePath(KafkaZkClient.scala:583)
>         at 
> kafka.utils.ReplicationUtils$.updateLeaderAndIsr(ReplicationUtils.scala:33)
>         at 
> kafka.cluster.Partition.kafka$cluster$Partition$$updateIsr(Partition.scala:665)
>         at 
> kafka.cluster.Partition$$anonfun$4.apply$mcZ$sp(Partition.scala:509)
>         at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:500)
>         at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:500)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>         at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
>         at kafka.cluster.Partition.maybeShrinkIsr(Partition.scala:499)
>         at 
> kafka.server.ReplicaManager$$anonfun$kafka$server$ReplicaManager$$maybeShrinkIsr$2.apply(ReplicaManager.scala:1335)
>         at 
> kafka.server.ReplicaManager$$anonfun$kafka$server$ReplicaManager$$maybeShrinkIsr$2.apply(ReplicaManager.scala:1335)
> ......
> "Test worker-EventThread" daemon prio=5 tid=0x00007fade90cf800 nid=0xef13 
> waiting on condition [0x000070000a23f000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x0000000781847620> (a 
> java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197)
>         at 
> java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:945)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
>         at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
>         at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process(ZooKeeperClient.scala:355)
>         at 
> org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:531)
>         at 
> org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:506)
>  
> "ForkJoinPool-1-worker-11" daemon prio=5 tid=0x00007fade9a83000 nid=0x17907 
> waiting on condition [0x0000700011eaf000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x0000000781847620> (a 
> java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:964)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1282)
>         at 
> java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:731)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
>         at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:256)
>         at 
> kafka.zookeeper.ZooKeeperClient.unregisterStateChangeHandler(ZooKeeperClient.scala:295)
>         at 
> kafka.zk.KafkaZkClient.unregisterStateChangeHandler(KafkaZkClient.scala:1217)
>         at 
> kafka.common.ZkNodeChangeNotificationListener.close(ZkNodeChangeNotificationListener.scala:68)
>         at 
> kafka.server.DynamicConfigManager.shutdown(DynamicConfigManager.scala:181)
>         at 
> kafka.server.KafkaServer$$anonfun$shutdown$2.apply$mcV$sp(KafkaServer.scala:552)
>         at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:85)
>         at kafka.server.KafkaServer.shutdown(KafkaServer.scala:552)
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to