[ https://issues.apache.org/jira/browse/KAFKA-7019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16580500#comment-16580500 ]

ASF GitHub Bot commented on KAFKA-7019:
---------------------------------------

lindong28 closed pull request #5221: KAFKA-7019; Make reading metadata lock-free by maintaining an atomically-updated read snapshot
URL: https://github.com/apache/kafka/pull/5221

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index f4f8c155d73..25967b30637 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -39,11 +39,13 @@ import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest
  */
 class MetadataCache(brokerId: Int) extends Logging {
 
-  private val cache = mutable.Map[String, mutable.Map[Int, UpdateMetadataRequest.PartitionState]]()
-  @volatile private var controllerId: Option[Int] = None
-  private val aliveBrokers = mutable.Map[Int, Broker]()
-  private val aliveNodes = mutable.Map[Int, collection.Map[ListenerName, Node]]()
   private val partitionMetadataLock = new ReentrantReadWriteLock()
+  //this is the cache state. every MetadataSnapshot instance is immutable, and updates (performed under a lock)
+  //replace the value with a completely new one. this means reads (which are not under any lock) need to grab
+  //the value of this var (into a val) ONCE and retain that read copy for the duration of their operation.
+  //multiple reads of this value risk getting different snapshots.
+  @volatile private var metadataSnapshot: MetadataSnapshot = MetadataSnapshot(partitionStates = mutable.AnyRefMap.empty,
+    controllerId = None, aliveBrokers = mutable.LongMap.empty, aliveNodes = mutable.LongMap.empty)
 
   this.logIdent = s"[MetadataCache brokerId=$brokerId] "
   private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None)
@@ -51,10 +53,10 @@ class MetadataCache(brokerId: Int) extends Logging {
   // This method is the main hotspot when it comes to the performance of metadata requests,
   // we should be careful about adding additional logic here.
   // filterUnavailableEndpoints exists to support v0 MetadataResponses
-  private def getEndpoints(brokers: Iterable[Int], listenerName: ListenerName, filterUnavailableEndpoints: Boolean): Seq[Node] = {
-    val result = new mutable.ArrayBuffer[Node](math.min(aliveBrokers.size, brokers.size))
+  private def getEndpoints(snapshot: MetadataSnapshot, brokers: Iterable[Int], listenerName: ListenerName, filterUnavailableEndpoints: Boolean): Seq[Node] = {
+    val result = new mutable.ArrayBuffer[Node](math.min(snapshot.aliveBrokers.size, brokers.size))
     brokers.foreach { brokerId =>
-      val endpoint = getAliveEndpoint(brokerId, listenerName) match {
+      val endpoint = getAliveEndpoint(snapshot, brokerId, listenerName) match {
        case None => if (!filterUnavailableEndpoints) Some(new Node(brokerId, "", -1)) else None
        case Some(node) => Some(node)
      }
@@ -66,46 +68,46 @@ class MetadataCache(brokerId: Int) extends Logging {
   // errorUnavailableEndpoints exists to support v0 MetadataResponses
   // If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if listener is missing on the broker.
   // Otherwise, return LEADER_NOT_AVAILABLE for broker unavailable and missing listener (Metadata response v5 and below).
-  private def getPartitionMetadata(topic: String, listenerName: ListenerName, errorUnavailableEndpoints: Boolean,
+  private def getPartitionMetadata(snapshot: MetadataSnapshot, topic: String, listenerName: ListenerName, errorUnavailableEndpoints: Boolean,
                                    errorUnavailableListeners: Boolean): Option[Iterable[MetadataResponse.PartitionMetadata]] = {
-    cache.get(topic).map { partitions =>
+    snapshot.partitionStates.get(topic).map { partitions =>
       partitions.map { case (partitionId, partitionState) =>
-        val topicPartition = new TopicPartition(topic, partitionId)
+        val topicPartition = new TopicPartition(topic, partitionId.toInt)
         val leaderBrokerId = partitionState.basePartitionState.leader
-        val maybeLeader = getAliveEndpoint(leaderBrokerId, listenerName)
+        val maybeLeader = getAliveEndpoint(snapshot, leaderBrokerId, listenerName)
         val replicas = partitionState.basePartitionState.replicas.asScala.map(_.toInt)
-        val replicaInfo = getEndpoints(replicas, listenerName, errorUnavailableEndpoints)
-        val offlineReplicaInfo = getEndpoints(partitionState.offlineReplicas.asScala.map(_.toInt), listenerName, errorUnavailableEndpoints)
+        val replicaInfo = getEndpoints(snapshot, replicas, listenerName, errorUnavailableEndpoints)
+        val offlineReplicaInfo = getEndpoints(snapshot, partitionState.offlineReplicas.asScala.map(_.toInt), listenerName, errorUnavailableEndpoints)
 
         maybeLeader match {
           case None =>
-            val error = if (!aliveBrokers.contains(brokerId)) { // we are already holding the read lock
+            val error = if (!snapshot.aliveBrokers.contains(brokerId)) { // we are already holding the read lock
              debug(s"Error while fetching metadata for $topicPartition: leader not available")
              Errors.LEADER_NOT_AVAILABLE
            } else {
              debug(s"Error while fetching metadata for $topicPartition: listener $listenerName not found on leader $leaderBrokerId")
              if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else Errors.LEADER_NOT_AVAILABLE
            }
-            new MetadataResponse.PartitionMetadata(error, partitionId, Node.noNode(),
+            new MetadataResponse.PartitionMetadata(error, partitionId.toInt, Node.noNode(),
              replicaInfo.asJava, java.util.Collections.emptyList(), offlineReplicaInfo.asJava)
 
          case Some(leader) =>
            val isr = partitionState.basePartitionState.isr.asScala.map(_.toInt)
-            val isrInfo = getEndpoints(isr, listenerName, errorUnavailableEndpoints)
+            val isrInfo = getEndpoints(snapshot, isr, listenerName, errorUnavailableEndpoints)
 
            if (replicaInfo.size < replicas.size) {
              debug(s"Error while fetching metadata for $topicPartition: replica information not available for " +
                s"following brokers ${replicas.filterNot(replicaInfo.map(_.id).contains).mkString(",")}")
 
-              new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId, leader,
+              new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId.toInt, leader,
                replicaInfo.asJava, isrInfo.asJava, offlineReplicaInfo.asJava)
            } else if (isrInfo.size < isr.size) {
              debug(s"Error while fetching metadata for $topicPartition: in sync replica information not available for " +
                s"following brokers ${isr.filterNot(isrInfo.map(_.id).contains).mkString(",")}")
-              new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId, leader,
+              new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId.toInt, leader,
                replicaInfo.asJava, isrInfo.asJava, offlineReplicaInfo.asJava)
            } else {
-              new MetadataResponse.PartitionMetadata(Errors.NONE, partitionId, leader, replicaInfo.asJava,
+              new MetadataResponse.PartitionMetadata(Errors.NONE, partitionId.toInt, leader, replicaInfo.asJava,
                isrInfo.asJava, offlineReplicaInfo.asJava)
            }
        }
@@ -113,122 +115,117 @@ class MetadataCache(brokerId: Int) extends Logging {
     }
   }
 
-  private def getAliveEndpoint(brokerId: Int, listenerName: ListenerName): Option[Node] =
-    inReadLock(partitionMetadataLock) {
-      // Returns None if broker is not alive or if the broker does not have a listener named `listenerName`.
-      // Since listeners can be added dynamically, a broker with a missing listener could be a transient error.
-      aliveNodes.get(brokerId).flatMap(_.get(listenerName))
-    }
+  private def getAliveEndpoint(snapshot: MetadataSnapshot, brokerId: Int, listenerName: ListenerName): Option[Node] =
+    // Returns None if broker is not alive or if the broker does not have a listener named `listenerName`.
+    // Since listeners can be added dynamically, a broker with a missing listener could be a transient error.
+    snapshot.aliveNodes.get(brokerId).flatMap(_.get(listenerName))
 
   // errorUnavailableEndpoints exists to support v0 MetadataResponses
   def getTopicMetadata(topics: Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean = false,
                        errorUnavailableListeners: Boolean = false): Seq[MetadataResponse.TopicMetadata] = {
-    inReadLock(partitionMetadataLock) {
-      topics.toSeq.flatMap { topic =>
-        getPartitionMetadata(topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata =>
-          new MetadataResponse.TopicMetadata(Errors.NONE, topic, Topic.isInternal(topic), partitionMetadata.toBuffer.asJava)
-        }
+    val snapshot = metadataSnapshot
+    topics.toSeq.flatMap { topic =>
+      getPartitionMetadata(snapshot, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata =>
+        new MetadataResponse.TopicMetadata(Errors.NONE, topic, Topic.isInternal(topic), partitionMetadata.toBuffer.asJava)
      }
    }
  }
 
   def getAllTopics(): Set[String] = {
-    inReadLock(partitionMetadataLock) {
-      cache.keySet.toSet
-    }
+    getAllTopics(metadataSnapshot)
   }
 
-  def getAllPartitions(): Map[TopicPartition, UpdateMetadataRequest.PartitionState] = {
-    inReadLock(partitionMetadataLock) {
-      cache.flatMap { case (topic, partitionStates) =>
-        partitionStates.map { case (partition, state ) => (new TopicPartition(topic, partition), state) }
-      }.toMap
-    }
+  private def getAllTopics(snapshot: MetadataSnapshot): Set[String] = {
+    snapshot.partitionStates.keySet
+  }
+
+  private def getAllPartitions(snapshot: MetadataSnapshot): Map[TopicPartition, UpdateMetadataRequest.PartitionState] = {
+    snapshot.partitionStates.flatMap { case (topic, partitionStates) =>
+      partitionStates.map { case (partition, state ) => (new TopicPartition(topic, partition.toInt), state) }
+    }.toMap
   }
 
  def getNonExistingTopics(topics: Set[String]): Set[String] = {
-    inReadLock(partitionMetadataLock) {
-      topics -- cache.keySet
-    }
+    topics -- metadataSnapshot.partitionStates.keySet
   }
 
   def isBrokerAlive(brokerId: Int): Boolean = {
-    inReadLock(partitionMetadataLock) {
-      aliveBrokers.contains(brokerId)
-    }
+    metadataSnapshot.aliveBrokers.contains(brokerId)
   }
 
   def getAliveBrokers: Seq[Broker] = {
-    inReadLock(partitionMetadataLock) {
-      aliveBrokers.values.toBuffer
-    }
+    metadataSnapshot.aliveBrokers.values.toBuffer
   }
 
-  private def addOrUpdatePartitionInfo(topic: String,
+  private def addOrUpdatePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataRequest.PartitionState]],
+                                       topic: String,
                                        partitionId: Int,
                                        stateInfo: UpdateMetadataRequest.PartitionState) {
-    inWriteLock(partitionMetadataLock) {
-      val infos = cache.getOrElseUpdate(topic, mutable.Map())
-      infos(partitionId) = stateInfo
-    }
+    val infos = partitionStates.getOrElseUpdate(topic, mutable.LongMap())
+    infos(partitionId) = stateInfo
   }
 
   def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequest.PartitionState] = {
-    inReadLock(partitionMetadataLock) {
-      cache.get(topic).flatMap(_.get(partitionId))
-    }
+    metadataSnapshot.partitionStates.get(topic).flatMap(_.get(partitionId))
   }
 
   // if the leader is not known, return None;
   // if the leader is known and corresponding node is available, return Some(node)
   // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE)
   def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node] = {
-    inReadLock(partitionMetadataLock) {
-      cache.get(topic).flatMap(_.get(partitionId)) map { partitionInfo =>
-        val leaderId = partitionInfo.basePartitionState.leader
+    val snapshot = metadataSnapshot
+    snapshot.partitionStates.get(topic).flatMap(_.get(partitionId)) map { partitionInfo =>
+      val leaderId = partitionInfo.basePartitionState.leader
 
-        aliveNodes.get(leaderId) match {
-          case Some(nodeMap) =>
-            nodeMap.getOrElse(listenerName, Node.noNode)
-          case None =>
-            Node.noNode
-        }
+      snapshot.aliveNodes.get(leaderId) match {
+        case Some(nodeMap) =>
+          nodeMap.getOrElse(listenerName, Node.noNode)
+        case None =>
+          Node.noNode
      }
    }
  }
 
-  def getControllerId: Option[Int] = controllerId
+  def getControllerId: Option[Int] = metadataSnapshot.controllerId
 
   def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster = {
-    inReadLock(partitionMetadataLock) {
-      val nodes = aliveNodes.map { case (id, nodes) => (id, nodes.get(listenerName).orNull) }
-      def node(id: Integer): Node = nodes.get(id).orNull
-      val partitions = getAllPartitions()
-        .filter { case (_, state) => state.basePartitionState.leader != LeaderAndIsr.LeaderDuringDelete }
-        .map { case (tp, state) =>
-          new PartitionInfo(tp.topic, tp.partition, node(state.basePartitionState.leader),
-            state.basePartitionState.replicas.asScala.map(node).toArray,
-            state.basePartitionState.isr.asScala.map(node).toArray,
-            state.offlineReplicas.asScala.map(node).toArray)
-        }
-      val unauthorizedTopics = Collections.emptySet[String]
-      val internalTopics = getAllTopics().filter(Topic.isInternal).asJava
-      new Cluster(clusterId, nodes.values.filter(_ != null).toList.asJava,
-        partitions.toList.asJava,
-        unauthorizedTopics, internalTopics,
-        getControllerId.map(id => node(id)).orNull)
-    }
+    val snapshot = metadataSnapshot
+    val nodes = snapshot.aliveNodes.map { case (id, nodes) => (id, nodes.get(listenerName).orNull) }
+    def node(id: Integer): Node = nodes.get(id.toLong).orNull
+    val partitions = getAllPartitions(snapshot)
+      .filter { case (_, state) => state.basePartitionState.leader != LeaderAndIsr.LeaderDuringDelete }
+      .map { case (tp, state) =>
+        new PartitionInfo(tp.topic, tp.partition, node(state.basePartitionState.leader),
+          state.basePartitionState.replicas.asScala.map(node).toArray,
+          state.basePartitionState.isr.asScala.map(node).toArray,
+          state.offlineReplicas.asScala.map(node).toArray)
+      }
+    val unauthorizedTopics = Collections.emptySet[String]
+    val internalTopics = getAllTopics(snapshot).filter(Topic.isInternal).asJava
+    new Cluster(clusterId, nodes.values.filter(_ != null).toList.asJava,
+      partitions.toList.asJava,
+      unauthorizedTopics, internalTopics,
+      snapshot.controllerId.map(id => node(id)).orNull)
   }
 
  // This method returns the deleted TopicPartitions received from UpdateMetadataRequest
-  def updateCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition] = {
+  def updateMetadata(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition] = {
     inWriteLock(partitionMetadataLock) {
-      controllerId = updateMetadataRequest.controllerId match {
+
+      //since kafka may do partial metadata updates, we start by copying the previous state
+      val partitionStates = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataRequest.PartitionState]](metadataSnapshot.partitionStates.size)
+      metadataSnapshot.partitionStates.foreach { case (topic, oldPartitionStates) =>
+        val copy = new mutable.LongMap[UpdateMetadataRequest.PartitionState](oldPartitionStates.size)
+        copy ++= oldPartitionStates
+        partitionStates += (topic -> copy)
+      }
+      val aliveBrokers = new mutable.LongMap[Broker](metadataSnapshot.aliveBrokers.size)
+      val aliveNodes = new mutable.LongMap[collection.Map[ListenerName, Node]](metadataSnapshot.aliveNodes.size)
+      val controllerId = updateMetadataRequest.controllerId match {
          case id if id < 0 => None
          case id => Some(id)
        }
-      aliveNodes.clear()
-      aliveBrokers.clear()
+
       updateMetadataRequest.liveBrokers.asScala.foreach { broker =>
         // `aliveNodes` is a hot path for metadata requests for large clusters, so we use java.util.HashMap which
         // is a bit faster than scala.collection.mutable.HashMap. When we drop support for Scala 2.10, we could
@@ -253,34 +250,38 @@ class MetadataCache(brokerId: Int) extends Logging {
         val controllerId = updateMetadataRequest.controllerId
         val controllerEpoch = updateMetadataRequest.controllerEpoch
         if (info.basePartitionState.leader == LeaderAndIsr.LeaderDuringDelete) {
-          removePartitionInfo(tp.topic, tp.partition)
+          removePartitionInfo(partitionStates, tp.topic, tp.partition)
           stateChangeLogger.trace(s"Deleted partition $tp from metadata cache in response to UpdateMetadata " +
             s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
           deletedPartitions += tp
         } else {
-          addOrUpdatePartitionInfo(tp.topic, tp.partition, info)
+          addOrUpdatePartitionInfo(partitionStates, tp.topic, tp.partition, info)
           stateChangeLogger.trace(s"Cached leader info $info for partition $tp in response to " +
             s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
         }
       }
+      metadataSnapshot = MetadataSnapshot(partitionStates, controllerId, aliveBrokers, aliveNodes)
       deletedPartitions
     }
   }
 
  def contains(topic: String): Boolean = {
-    inReadLock(partitionMetadataLock) {
-      cache.contains(topic)
-    }
+    metadataSnapshot.partitionStates.contains(topic)
   }
 
   def contains(tp: TopicPartition): Boolean = getPartitionInfo(tp.topic, tp.partition).isDefined
 
-  private def removePartitionInfo(topic: String, partitionId: Int): Boolean = {
-    cache.get(topic).exists { infos =>
+  private def removePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataRequest.PartitionState]], topic: String, partitionId: Int): Boolean = {
+    partitionStates.get(topic).exists { infos =>
       infos.remove(partitionId)
-      if (infos.isEmpty) cache.remove(topic)
+      if (infos.isEmpty) partitionStates.remove(topic)
       true
     }
   }
 
+  case class MetadataSnapshot(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataRequest.PartitionState]],
+                              controllerId: Option[Int],
+                              aliveBrokers: mutable.LongMap[Broker],
+                              aliveNodes: mutable.LongMap[collection.Map[ListenerName, Node]])
+
 }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 9658f1a33a5..14e537e63db 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1006,7 +1006,7 @@ class ReplicaManager(val config: KafkaConfig,
         stateChangeLogger.warn(stateControllerEpochErrorMessage)
         throw new ControllerMovedException(stateChangeLogger.messageWithPrefix(stateControllerEpochErrorMessage))
       } else {
-        val deletedPartitions = metadataCache.updateCache(correlationId, updateMetadataRequest)
+        val deletedPartitions = metadataCache.updateMetadata(correlationId, updateMetadataRequest)
         controllerEpoch = updateMetadataRequest.controllerEpoch
         deletedPartitions
       }
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 4d1e4abb9b8..31283468aac 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -130,7 +130,7 @@ class AddPartitionsTest extends BaseRequestTest {
     assertEquals(0, partitionMetadata(0).partition)
     assertEquals(1, partitionMetadata(1).partition)
     assertEquals(2, partitionMetadata(2).partition)
-    val replicas = topicMetadata.partitionMetadata.get(1).replicas
+    val replicas = partitionMetadata(1).replicas
     assertEquals(2, replicas.size)
     assertTrue(replicas.asScala.head.id == 0 || replicas.asScala.head.id == 1)
     assertTrue(replicas.asScala(1).id == 0 || replicas.asScala(1).id == 1)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index d88001166f2..18b18fdc355 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -478,7 +478,7 @@ class KafkaApisTest {
     )
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
       0, Map.empty[TopicPartition, UpdateMetadataRequest.PartitionState].asJava, brokers.asJava).build()
-    metadataCache.updateCache(correlationId = 0, updateMetadataRequest)
+    metadataCache.updateMetadata(correlationId = 0, updateMetadataRequest)
     (plaintextListener, anotherListener)
   }
 
@@ -575,6 +575,6 @@ class KafkaApisTest {
     val partitions = (0 until numPartitions).map(new TopicPartition(topic, _) -> partitionState).toMap
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
       0, partitions.asJava, Set(broker).asJava).build()
-    metadataCache.updateCache(correlationId = 0, updateMetadataRequest)
+    metadataCache.updateMetadata(correlationId = 0, updateMetadataRequest)
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 82c14ee5827..93ac62d562d 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -72,7 +72,7 @@ class MetadataCacheTest {
     val version = ApiKeys.UPDATE_METADATA.latestVersion
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch,
       partitionStates.asJava, brokers.asJava).build()
-    cache.updateCache(15, updateMetadataRequest)
+    cache.updateMetadata(15, updateMetadataRequest)
 
     for (securityProtocol <- Seq(SecurityProtocol.PLAINTEXT, SecurityProtocol.SSL)) {
       val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
@@ -166,7 +166,7 @@ class MetadataCacheTest {
     val version = ApiKeys.UPDATE_METADATA.latestVersion
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch,
       partitionStates.asJava, brokers.asJava).build()
-    cache.updateCache(15, updateMetadataRequest)
+    cache.updateMetadata(15, updateMetadataRequest)
 
     val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName, errorUnavailableListeners = errorUnavailableListeners)
     assertEquals(1, topicMetadatas.size)
@@ -210,7 +210,7 @@ class MetadataCacheTest {
     val version = ApiKeys.UPDATE_METADATA.latestVersion
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch,
       partitionStates.asJava, brokers.asJava).build()
-    cache.updateCache(15, updateMetadataRequest)
+    cache.updateMetadata(15, updateMetadataRequest)
 
     // Validate errorUnavailableEndpoints = false
     val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName, errorUnavailableEndpoints = false)
@@ -270,7 +270,7 @@ class MetadataCacheTest {
     val version = ApiKeys.UPDATE_METADATA.latestVersion
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch,
       partitionStates.asJava, brokers.asJava).build()
-    cache.updateCache(15, updateMetadataRequest)
+    cache.updateMetadata(15, updateMetadataRequest)
 
     // Validate errorUnavailableEndpoints = false
     val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName, errorUnavailableEndpoints = false)
@@ -322,7 +322,7 @@ class MetadataCacheTest {
     val version = ApiKeys.UPDATE_METADATA.latestVersion
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, partitionStates.asJava,
       brokers.asJava).build()
-    cache.updateCache(15, updateMetadataRequest)
+    cache.updateMetadata(15, updateMetadataRequest)
 
     val topicMetadata = cache.getTopicMetadata(Set(topic), ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
     assertEquals(1, topicMetadata.size)
@@ -351,7 +351,7 @@ class MetadataCacheTest {
       val version = ApiKeys.UPDATE_METADATA.latestVersion
       val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, partitionStates.asJava,
         brokers.asJava).build()
-      cache.updateCache(15, updateMetadataRequest)
+      cache.updateMetadata(15, updateMetadataRequest)
     }
 
     val initialBrokerIds = (0 to 2).toSet


 

> Reduce the contention between metadata update and metadata read operations
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-7019
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7019
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>            Reporter: Dong Lin
>            Assignee: Radai Rosenblatt
>            Priority: Major
>             Fix For: 2.1.0
>
>
> Currently MetadataCache.updateCache() grabs a write lock in order to process
> the UpdateMetadataRequest from the controller, and a read lock is needed in
> order to handle the MetadataRequest from clients. Thus the handling of
> MetadataRequest and UpdateMetadataRequest block each other, and the broker
> can only process one such request at a time even if there are multiple
> request handler threads. Note that the broker cannot process MetadataRequests
> in parallel if there is an UpdateMetadataRequest waiting for the write lock,
> even though a MetadataRequest only requires the read lock to be processed.
> For a large cluster with tens of thousands of partitions, it can take e.g.
> 200 ms to process an UpdateMetadataRequest or a MetadataRequest from a large
> client (e.g. MirrorMaker). While a user is rebalancing the cluster,
> leadership changes cause both UpdateMetadataRequests from the controller and
> MetadataRequests from clients. If a broker receives on average 10
> MetadataRequests and 2 UpdateMetadataRequests per second, processing these
> requests one at a time can drive the request handler thread idle ratio to 0,
> which makes the broker unavailable to users.
> We can address this problem by removing the read lock in MetadataCache. The
> idea is that MetadataCache.updateCache() can build a new copy of the cache
> in a method-local variable while it is processing the UpdateMetadataRequest,
> and replace the class-private variable with the newly built local at the end
> of MetadataCache.updateCache(). Handling a MetadataRequest then only requires
> access to the read-only class-private variable; a minimal sketch of this
> pattern follows below.
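
For illustration, here is a minimal, self-contained sketch of the snapshot
pattern described above. The class and method names are hypothetical, not the
actual Kafka code; the real implementation is the MetadataCache diff earlier
in this message. Readers capture the volatile reference once and never lock,
while the writer builds a fresh immutable snapshot under a lock and publishes
it with a single volatile write:

    import java.util.concurrent.locks.ReentrantLock

    // Hypothetical stand-in for MetadataCache's snapshot scheme.
    class SnapshotCache[K, V] {
      // Immutable snapshot: writers replace it wholesale, never mutate it.
      private case class Snapshot(entries: Map[K, V])

      @volatile private var snapshot: Snapshot = Snapshot(Map.empty)
      private val writeLock = new ReentrantLock() // serializes writers only

      // Lock-free read: grab the volatile var ONCE and use that copy for
      // the whole operation (re-reading it could yield a different snapshot).
      def get(key: K): Option[V] = {
        val snap = snapshot
        snap.entries.get(key)
      }

      // Copy-on-write update: derive a new immutable map from the previous
      // snapshot, then publish it with a single volatile write.
      def put(key: K, value: V): Unit = {
        writeLock.lock()
        try {
          snapshot = Snapshot(snapshot.entries + (key -> value))
        } finally {
          writeLock.unlock()
        }
      }
    }

In these terms, handling a MetadataRequest corresponds to get() (no lock at
all), and updateCache()/updateMetadata() corresponds to put(): readers are
never blocked by a pending writer, which is exactly the contention this
ticket sets out to remove.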


