[ https://issues.apache.org/jira/browse/KAFKA-6517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16366212#comment-16366212 ]
ASF GitHub Bot commented on KAFKA-6517: --------------------------------------- rajinisivaram closed pull request #4551: KAFKA-6517: Avoid deadlock in ZooKeeperClient during session expiry URL: https://github.com/apache/kafka/pull/4551 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala index 9a1d16274df..3934fd0ad5d 100644 --- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala +++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala @@ -24,7 +24,7 @@ import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, CountDownLat import com.yammer.metrics.core.{Gauge, MetricName} import kafka.metrics.KafkaMetricsGroup import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock} -import kafka.utils.Logging +import kafka.utils.{KafkaScheduler, Logging} import org.apache.kafka.common.utils.Time import org.apache.zookeeper.AsyncCallback.{ACLCallback, Children2Callback, DataCallback, StatCallback, StringCallback, VoidCallback} import org.apache.zookeeper.KeeperException.Code @@ -59,6 +59,7 @@ class ZooKeeperClient(connectString: String, private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]().asScala private val inFlightRequests = new Semaphore(maxInFlightRequests) private val stateChangeHandlers = new ConcurrentHashMap[String, StateChangeHandler]().asScala + private[zookeeper] val expiryScheduler = new KafkaScheduler(0, "zk-session-expiry-handler") private val metricNames = Set[String]() @@ -90,6 +91,7 @@ class ZooKeeperClient(connectString: String, metricNames += "SessionState" + expiryScheduler.startup() waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS) 
override def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = { @@ -122,7 +124,7 @@ class ZooKeeperClient(connectString: String, * response type (e.g. Seq[CreateRequest] -> Seq[CreateResponse]). Otherwise, the most specific common supertype * will be used (e.g. Seq[AsyncRequest] -> Seq[AsyncResponse]). */ - def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = inReadLock(initializationLock) { + def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = { if (requests.isEmpty) Seq.empty else { @@ -132,10 +134,12 @@ class ZooKeeperClient(connectString: String, requests.foreach { request => inFlightRequests.acquire() try { - send(request) { response => - responseQueue.add(response) - inFlightRequests.release() - countDownLatch.countDown() + inReadLock(initializationLock) { + send(request) { response => + responseQueue.add(response) + inFlightRequests.release() + countDownLatch.countDown() + } } } catch { case e: Throwable => @@ -148,7 +152,8 @@ class ZooKeeperClient(connectString: String, } } - private def send[Req <: AsyncRequest](request: Req)(processResponse: Req#Response => Unit): Unit = { + // Visibility to override for testing + private[zookeeper] def send[Req <: AsyncRequest](request: Req)(processResponse: Req#Response => Unit): Unit = { // Safe to cast as we always create a response of the right type def callback(response: AsyncResponse): Unit = processResponse(response.asInstanceOf[Req#Response]) @@ -303,12 +308,18 @@ class ZooKeeperClient(connectString: String, stateChangeHandlers.clear() zooKeeper.close() metricNames.foreach(removeMetric(_)) + expiryScheduler.shutdown() info("Closed.") } def sessionId: Long = inReadLock(initializationLock) { zooKeeper.getSessionId } + + // Only for testing + private[zookeeper] def currentZooKeeper: ZooKeeper = inReadLock(initializationLock) { + zooKeeper + } private def initialize(): Unit = { if (!connectionState.isAlive) { @@ -352,12 
+363,14 @@ class ZooKeeperClient(connectString: String, error("Auth failed.") stateChangeHandlers.values.foreach(_.onAuthFailure()) } else if (state == KeeperState.Expired) { - inWriteLock(initializationLock) { - info("Session expired.") - stateChangeHandlers.values.foreach(_.beforeInitializingSession()) - initialize() - stateChangeHandlers.values.foreach(_.afterInitializingSession()) - } + expiryScheduler.schedule("zk-session-expired", () => { + inWriteLock(initializationLock) { + info("Session expired.") + stateChangeHandlers.values.foreach(_.beforeInitializingSession()) + initialize() + stateChangeHandlers.values.foreach(_.afterInitializingSession()) + } + }, delay = 0L, period = -1L, unit = TimeUnit.MILLISECONDS) } case Some(path) => (event.getType: @unchecked) match { diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala index c8ebaa90735..f1c09d7308d 100644 --- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala +++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala @@ -19,7 +19,7 @@ package kafka.zookeeper import java.nio.charset.StandardCharsets import java.util.UUID import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch, TimeUnit} +import java.util.concurrent.{ArrayBlockingQueue, ConcurrentLinkedQueue, CountDownLatch, Executors, Semaphore, TimeUnit} import com.yammer.metrics.Metrics import com.yammer.metrics.core.{Gauge, Meter, MetricName} @@ -29,8 +29,8 @@ import org.apache.kafka.common.utils.Time import org.apache.zookeeper.KeeperException.{Code, NoNodeException} import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState} import org.apache.zookeeper.ZooKeeper.States -import org.apache.zookeeper.{CreateMode, WatchedEvent, ZooDefs} -import org.junit.Assert.{assertArrayEquals, assertEquals, assertTrue} +import org.apache.zookeeper.{CreateMode, WatchedEvent, 
Watcher, ZooDefs, ZooKeeper} +import org.junit.Assert.{assertArrayEquals, assertEquals, assertFalse, assertNull, assertTrue} import org.junit.{After, Before, Test} import scala.collection.JavaConverters._ @@ -385,6 +385,114 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { } finally zooKeeperClient.close() } + /** + * Tests that if session expiry notification is received while a thread is processing requests, + * session expiry is handled and the request thread completes with responses to all requests, + * even though some requests may fail due to session expiry or disconnection. + * + * Sequence of events on different threads: + * Request thread: + * - Sends `maxInflightRequests` requests (these may complete before session is expired) + * Main thread: + * - Waits for at least one request to be processed (this should succeed) + * - Expires session by creating new client with same session id + * - Unblocks another `maxInflightRequests` requests before and after new client is closed (these may fail) + * ZooKeeperClient Event thread: + * - Delivers responses and session expiry (no ordering guarantee between these, both are processed asynchronously) + * Response executor thread: + * - Blocks subsequent sends by delaying response until session expiry is processed + * ZooKeeperClient Session Expiry Handler: + * - Unblocks subsequent sends + * Main thread: + * - Waits for all sends to complete. The requests sent after session expiry processing should succeed. 
+ */ + @Test + def testSessionExpiry(): Unit = { + val maxInflightRequests = 2 + val responseExecutor = Executors.newSingleThreadExecutor + val sendSemaphore = new Semaphore(0) + val sendCompleteSemaphore = new Semaphore(0) + val sendSize = maxInflightRequests * 5 + @volatile var resultCodes: Seq[Code] = null + val stateChanges = new ConcurrentLinkedQueue[String]() + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, maxInflightRequests, + time, "testGroupType", "testGroupName") { + override def send[Req <: AsyncRequest](request: Req)(processResponse: Req#Response => Unit): Unit = { + super.send(request)( response => { + responseExecutor.submit(new Runnable { + override def run(): Unit = { + sendCompleteSemaphore.release() + sendSemaphore.acquire() + processResponse(response) + } + }) + }) + } + } + try { + zooKeeperClient.registerStateChangeHandler(new StateChangeHandler { + override val name: String ="test-state-change-handler" + override def afterInitializingSession(): Unit = { + verifyHandlerThread() + stateChanges.add("afterInitializingSession") + } + override def beforeInitializingSession(): Unit = { + verifyHandlerThread() + stateChanges.add("beforeInitializingSession") + sendSemaphore.release(sendSize) // Resume remaining sends + } + private def verifyHandlerThread(): Unit = { + val threadName = Thread.currentThread.getName + assertTrue(s"Unexpected thread + $threadName", threadName.startsWith(zooKeeperClient.expiryScheduler.threadNamePrefix)) + } + }) + + val requestThread = new Thread { + override def run(): Unit = { + val requests = (1 to sendSize).map(i => GetDataRequest(s"/$i")) + resultCodes = zooKeeperClient.handleRequests(requests).map(_.resultCode) + } + } + requestThread.start() + sendCompleteSemaphore.acquire() // Wait for request thread to start processing requests + + // Trigger session expiry by reusing the session id in another client + val dummyWatcher = new Watcher { + override def process(event: 
WatchedEvent): Unit = {} + } + val anotherZkClient = new ZooKeeper(zkConnect, 1000, dummyWatcher, + zooKeeperClient.currentZooKeeper.getSessionId, + zooKeeperClient.currentZooKeeper.getSessionPasswd) + assertNull(anotherZkClient.exists("/nonexistent", false)) // Make sure new client works + sendSemaphore.release(maxInflightRequests) // Resume a few more sends which may fail + anotherZkClient.close() + sendSemaphore.release(maxInflightRequests) // Resume a few more sends which may fail + + requestThread.join(10000) + if (requestThread.isAlive) { + requestThread.interrupt() + fail("Request thread did not complete") + } + assertEquals(Seq("beforeInitializingSession", "afterInitializingSession"), stateChanges.asScala.toSeq) + + assertEquals(resultCodes.size, sendSize) + val connectionLostCount = resultCodes.count(_ == Code.CONNECTIONLOSS) + assertTrue(s"Unexpected connection lost requests $resultCodes", connectionLostCount <= maxInflightRequests) + val expiredCount = resultCodes.count(_ == Code.SESSIONEXPIRED) + assertTrue(s"Unexpected session expired requests $resultCodes", expiredCount <= maxInflightRequests) + assertTrue(s"No connection lost or expired requests $resultCodes", connectionLostCount + expiredCount > 0) + assertEquals(Code.NONODE, resultCodes.head) + assertEquals(Code.NONODE, resultCodes.last) + assertTrue(s"Unexpected result code $resultCodes", + resultCodes.filterNot(Set(Code.NONODE, Code.SESSIONEXPIRED, Code.CONNECTIONLOSS).contains).isEmpty) + + } finally { + zooKeeperClient.close() + responseExecutor.shutdownNow() + } + assertFalse("Expiry executor not shutdown", zooKeeperClient.expiryScheduler.isStarted) + } + def isExpectedMetricName(metricName: MetricName, name: String): Boolean = metricName.getName == name && metricName.getGroup == "testMetricGroup" && metricName.getType == "testMetricType" ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > ZooKeeperClient holds a lock while waiting for responses, blocking shutdown > --------------------------------------------------------------------------- > > Key: KAFKA-6517 > URL: https://issues.apache.org/jira/browse/KAFKA-6517 > Project: Kafka > Issue Type: Bug > Components: core > Affects Versions: 1.1.0 > Reporter: Rajini Sivaram > Assignee: Rajini Sivaram > Priority: Blocker > Fix For: 1.1.0 > > > Stack traces from a local test run that was deadlocked because shutdown > couldn't acquire the lock: > # kafka-scheduler-7: acquired read lock in > kafka.zookeeper.ZooKeeperClient.handleRequests > # Test worker-EventThread waiting for write lock to process SessionExpired > in kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process > # ForkJoinPool-1-worker-11 processing KafkaServer.shutdown is queued behind > 2) waiting to acquire read lock for > kafka.zookeeper.ZooKeeperClient.unregisterStateChangeHandler > Stack traces of the relevant threads: > {quote} > "kafka-scheduler-7" daemon prio=5 tid=0x00007fade918d800 nid=0xd317 waiting > on condition [0x000070000b371000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x00000007e4c6e698> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236) > at > 
kafka.zookeeper.ZooKeeperClient$$anonfun$handleRequests$1.apply(ZooKeeperClient.scala:146) > at > kafka.zookeeper.ZooKeeperClient$$anonfun$handleRequests$1.apply(ZooKeeperClient.scala:126) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250) > at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:256) > at > kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:125) > at > kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1432) > at > kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1425) > at > kafka.zk.KafkaZkClient.conditionalUpdatePath(KafkaZkClient.scala:583) > at > kafka.utils.ReplicationUtils$.updateLeaderAndIsr(ReplicationUtils.scala:33) > at > kafka.cluster.Partition.kafka$cluster$Partition$$updateIsr(Partition.scala:665) > at > kafka.cluster.Partition$$anonfun$4.apply$mcZ$sp(Partition.scala:509) > at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:500) > at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:500) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250) > at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258) > at kafka.cluster.Partition.maybeShrinkIsr(Partition.scala:499) > at > kafka.server.ReplicaManager$$anonfun$kafka$server$ReplicaManager$$maybeShrinkIsr$2.apply(ReplicaManager.scala:1335) > at > kafka.server.ReplicaManager$$anonfun$kafka$server$ReplicaManager$$maybeShrinkIsr$2.apply(ReplicaManager.scala:1335) > ...... 
> "Test worker-EventThread" daemon prio=5 tid=0x00007fade90cf800 nid=0xef13 > waiting on condition [0x000070000a23f000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x0000000781847620> (a > java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197) > at > java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:945) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248) > at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258) > at > kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process(ZooKeeperClient.scala:355) > at > org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:531) > at > org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:506) > > "ForkJoinPool-1-worker-11" daemon prio=5 tid=0x00007fade9a83000 nid=0x17907 > waiting on condition [0x0000700011eaf000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x0000000781847620> (a > java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:964) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1282) > at > 
java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:731) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248) > at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:256) > at > kafka.zookeeper.ZooKeeperClient.unregisterStateChangeHandler(ZooKeeperClient.scala:295) > at > kafka.zk.KafkaZkClient.unregisterStateChangeHandler(KafkaZkClient.scala:1217) > at > kafka.common.ZkNodeChangeNotificationListener.close(ZkNodeChangeNotificationListener.scala:68) > at > kafka.server.DynamicConfigManager.shutdown(DynamicConfigManager.scala:181) > at > kafka.server.KafkaServer$$anonfun$shutdown$2.apply$mcV$sp(KafkaServer.scala:552) > at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:85) > at kafka.server.KafkaServer.shutdown(KafkaServer.scala:552) > {quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)