[https://issues.apache.org/jira/browse/KAFKA-7019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16580500#comment-16580500]
ASF GitHub Bot commented on KAFKA-7019:
---------------------------------------
lindong28 closed pull request #5221: KAFKA-7019; Make reading metadata lock-free by maintaining an atomically-updated read snapshot
URL: https://github.com/apache/kafka/pull/5221
This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index f4f8c155d73..25967b30637 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -39,11 +39,13 @@ import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest
   */
 class MetadataCache(brokerId: Int) extends Logging {
-  private val cache = mutable.Map[String, mutable.Map[Int, UpdateMetadataRequest.PartitionState]]()
-  @volatile private var controllerId: Option[Int] = None
-  private val aliveBrokers = mutable.Map[Int, Broker]()
-  private val aliveNodes = mutable.Map[Int, collection.Map[ListenerName, Node]]()
   private val partitionMetadataLock = new ReentrantReadWriteLock()
+  //this is the cache state. every MetadataSnapshot instance is immutable, and updates (performed under a lock)
+  //replace the value with a completely new one. this means reads (which are not under any lock) need to grab
+  //the value of this var (into a val) ONCE and retain that read copy for the duration of their operation.
+  //multiple reads of this value risk getting different snapshots.
+  @volatile private var metadataSnapshot: MetadataSnapshot = MetadataSnapshot(partitionStates = mutable.AnyRefMap.empty,
+    controllerId = None, aliveBrokers = mutable.LongMap.empty, aliveNodes = mutable.LongMap.empty)
   this.logIdent = s"[MetadataCache brokerId=$brokerId] "
   private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None)
@@ -51,10 +53,10 @@ class MetadataCache(brokerId: Int) extends Logging {
   // This method is the main hotspot when it comes to the performance of metadata requests,
   // we should be careful about adding additional logic here.
   // filterUnavailableEndpoints exists to support v0 MetadataResponses
-  private def getEndpoints(brokers: Iterable[Int], listenerName: ListenerName, filterUnavailableEndpoints: Boolean): Seq[Node] = {
-    val result = new mutable.ArrayBuffer[Node](math.min(aliveBrokers.size, brokers.size))
+  private def getEndpoints(snapshot: MetadataSnapshot, brokers: Iterable[Int], listenerName: ListenerName, filterUnavailableEndpoints: Boolean): Seq[Node] = {
+    val result = new mutable.ArrayBuffer[Node](math.min(snapshot.aliveBrokers.size, brokers.size))
     brokers.foreach { brokerId =>
-      val endpoint = getAliveEndpoint(brokerId, listenerName) match {
+      val endpoint = getAliveEndpoint(snapshot, brokerId, listenerName) match {
         case None => if (!filterUnavailableEndpoints) Some(new Node(brokerId, "", -1)) else None
         case Some(node) => Some(node)
       }
@@ -66,46 +68,46 @@ class MetadataCache(brokerId: Int) extends Logging {
   // errorUnavailableEndpoints exists to support v0 MetadataResponses
   // If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if listener is missing on the broker.
   // Otherwise, return LEADER_NOT_AVAILABLE for broker unavailable and missing listener (Metadata response v5 and below).
-  private def getPartitionMetadata(topic: String, listenerName: ListenerName, errorUnavailableEndpoints: Boolean,
+  private def getPartitionMetadata(snapshot: MetadataSnapshot, topic: String, listenerName: ListenerName, errorUnavailableEndpoints: Boolean,
                                    errorUnavailableListeners: Boolean): Option[Iterable[MetadataResponse.PartitionMetadata]] = {
-    cache.get(topic).map { partitions =>
+    snapshot.partitionStates.get(topic).map { partitions =>
       partitions.map { case (partitionId, partitionState) =>
-        val topicPartition = new TopicPartition(topic, partitionId)
+        val topicPartition = new TopicPartition(topic, partitionId.toInt)
         val leaderBrokerId = partitionState.basePartitionState.leader
-        val maybeLeader = getAliveEndpoint(leaderBrokerId, listenerName)
+        val maybeLeader = getAliveEndpoint(snapshot, leaderBrokerId, listenerName)
         val replicas = partitionState.basePartitionState.replicas.asScala.map(_.toInt)
-        val replicaInfo = getEndpoints(replicas, listenerName, errorUnavailableEndpoints)
-        val offlineReplicaInfo = getEndpoints(partitionState.offlineReplicas.asScala.map(_.toInt), listenerName, errorUnavailableEndpoints)
+        val replicaInfo = getEndpoints(snapshot, replicas, listenerName, errorUnavailableEndpoints)
+        val offlineReplicaInfo = getEndpoints(snapshot, partitionState.offlineReplicas.asScala.map(_.toInt), listenerName, errorUnavailableEndpoints)
         maybeLeader match {
           case None =>
-            val error = if (!aliveBrokers.contains(brokerId)) { // we are already holding the read lock
+            val error = if (!snapshot.aliveBrokers.contains(brokerId)) { // we are already holding the read lock
               debug(s"Error while fetching metadata for $topicPartition: leader not available")
               Errors.LEADER_NOT_AVAILABLE
             } else {
               debug(s"Error while fetching metadata for $topicPartition: listener $listenerName not found on leader $leaderBrokerId")
               if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else Errors.LEADER_NOT_AVAILABLE
             }
-            new MetadataResponse.PartitionMetadata(error, partitionId, Node.noNode(),
+            new MetadataResponse.PartitionMetadata(error, partitionId.toInt, Node.noNode(),
               replicaInfo.asJava, java.util.Collections.emptyList(), offlineReplicaInfo.asJava)
           case Some(leader) =>
             val isr = partitionState.basePartitionState.isr.asScala.map(_.toInt)
-            val isrInfo = getEndpoints(isr, listenerName, errorUnavailableEndpoints)
+            val isrInfo = getEndpoints(snapshot, isr, listenerName, errorUnavailableEndpoints)
             if (replicaInfo.size < replicas.size) {
               debug(s"Error while fetching metadata for $topicPartition: replica information not available for " +
                 s"following brokers ${replicas.filterNot(replicaInfo.map(_.id).contains).mkString(",")}")
-              new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId, leader,
+              new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId.toInt, leader,
                 replicaInfo.asJava, isrInfo.asJava, offlineReplicaInfo.asJava)
             } else if (isrInfo.size < isr.size) {
               debug(s"Error while fetching metadata for $topicPartition: in sync replica information not available for " +
                 s"following brokers ${isr.filterNot(isrInfo.map(_.id).contains).mkString(",")}")
-              new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId, leader,
+              new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId.toInt, leader,
                 replicaInfo.asJava, isrInfo.asJava, offlineReplicaInfo.asJava)
             } else {
-              new MetadataResponse.PartitionMetadata(Errors.NONE, partitionId, leader, replicaInfo.asJava,
+              new MetadataResponse.PartitionMetadata(Errors.NONE, partitionId.toInt, leader, replicaInfo.asJava,
                 isrInfo.asJava, offlineReplicaInfo.asJava)
             }
         }
@@ -113,122 +115,117 @@ class MetadataCache(brokerId: Int) extends Logging {
     }
   }
-  private def getAliveEndpoint(brokerId: Int, listenerName: ListenerName): Option[Node] =
-    inReadLock(partitionMetadataLock) {
-      // Returns None if broker is not alive or if the broker does not have a listener named `listenerName`.
-      // Since listeners can be added dynamically, a broker with a missing listener could be a transient error.
-      aliveNodes.get(brokerId).flatMap(_.get(listenerName))
-    }
+  private def getAliveEndpoint(snapshot: MetadataSnapshot, brokerId: Int, listenerName: ListenerName): Option[Node] =
+    // Returns None if broker is not alive or if the broker does not have a listener named `listenerName`.
+    // Since listeners can be added dynamically, a broker with a missing listener could be a transient error.
+    snapshot.aliveNodes.get(brokerId).flatMap(_.get(listenerName))
   // errorUnavailableEndpoints exists to support v0 MetadataResponses
   def getTopicMetadata(topics: Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean = false,
                        errorUnavailableListeners: Boolean = false): Seq[MetadataResponse.TopicMetadata] = {
-    inReadLock(partitionMetadataLock) {
-      topics.toSeq.flatMap { topic =>
-        getPartitionMetadata(topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata =>
-          new MetadataResponse.TopicMetadata(Errors.NONE, topic, Topic.isInternal(topic), partitionMetadata.toBuffer.asJava)
-        }
+    val snapshot = metadataSnapshot
+    topics.toSeq.flatMap { topic =>
+      getPartitionMetadata(snapshot, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata =>
+        new MetadataResponse.TopicMetadata(Errors.NONE, topic, Topic.isInternal(topic), partitionMetadata.toBuffer.asJava)
       }
     }
   }
   def getAllTopics(): Set[String] = {
-    inReadLock(partitionMetadataLock) {
-      cache.keySet.toSet
-    }
+    getAllTopics(metadataSnapshot)
   }
-  def getAllPartitions(): Map[TopicPartition, UpdateMetadataRequest.PartitionState] = {
-    inReadLock(partitionMetadataLock) {
-      cache.flatMap { case (topic, partitionStates) =>
-        partitionStates.map { case (partition, state ) => (new TopicPartition(topic, partition), state) }
-      }.toMap
-    }
+  private def getAllTopics(snapshot: MetadataSnapshot): Set[String] = {
+    snapshot.partitionStates.keySet
+  }
+
+  private def getAllPartitions(snapshot: MetadataSnapshot): Map[TopicPartition, UpdateMetadataRequest.PartitionState] = {
+    snapshot.partitionStates.flatMap { case (topic, partitionStates) =>
+      partitionStates.map { case (partition, state ) => (new TopicPartition(topic, partition.toInt), state) }
+    }.toMap
   }
   def getNonExistingTopics(topics: Set[String]): Set[String] = {
-    inReadLock(partitionMetadataLock) {
-      topics -- cache.keySet
-    }
+    topics -- metadataSnapshot.partitionStates.keySet
   }
   def isBrokerAlive(brokerId: Int): Boolean = {
-    inReadLock(partitionMetadataLock) {
-      aliveBrokers.contains(brokerId)
-    }
+    metadataSnapshot.aliveBrokers.contains(brokerId)
  }
   def getAliveBrokers: Seq[Broker] = {
-    inReadLock(partitionMetadataLock) {
-      aliveBrokers.values.toBuffer
-    }
+    metadataSnapshot.aliveBrokers.values.toBuffer
   }
-  private def addOrUpdatePartitionInfo(topic: String,
+  private def addOrUpdatePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataRequest.PartitionState]],
+                                       topic: String,
                                        partitionId: Int,
                                        stateInfo: UpdateMetadataRequest.PartitionState) {
-    inWriteLock(partitionMetadataLock) {
-      val infos = cache.getOrElseUpdate(topic, mutable.Map())
-      infos(partitionId) = stateInfo
-    }
+    val infos = partitionStates.getOrElseUpdate(topic, mutable.LongMap())
+    infos(partitionId) = stateInfo
   }
   def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequest.PartitionState] = {
-    inReadLock(partitionMetadataLock) {
-      cache.get(topic).flatMap(_.get(partitionId))
-    }
+    metadataSnapshot.partitionStates.get(topic).flatMap(_.get(partitionId))
   }
   // if the leader is not known, return None;
   // if the leader is known and corresponding node is available, return Some(node)
   // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE)
   def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node] = {
-    inReadLock(partitionMetadataLock) {
-      cache.get(topic).flatMap(_.get(partitionId)) map { partitionInfo =>
-        val leaderId = partitionInfo.basePartitionState.leader
+    val snapshot = metadataSnapshot
+    snapshot.partitionStates.get(topic).flatMap(_.get(partitionId)) map { partitionInfo =>
+      val leaderId = partitionInfo.basePartitionState.leader
-      aliveNodes.get(leaderId) match {
-        case Some(nodeMap) =>
-          nodeMap.getOrElse(listenerName, Node.noNode)
-        case None =>
-          Node.noNode
-      }
+      snapshot.aliveNodes.get(leaderId) match {
+        case Some(nodeMap) =>
+          nodeMap.getOrElse(listenerName, Node.noNode)
+        case None =>
+          Node.noNode
      }
    }
  }
-  def getControllerId: Option[Int] = controllerId
+  def getControllerId: Option[Int] = metadataSnapshot.controllerId
   def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster = {
-    inReadLock(partitionMetadataLock) {
-      val nodes = aliveNodes.map { case (id, nodes) => (id, nodes.get(listenerName).orNull) }
-      def node(id: Integer): Node = nodes.get(id).orNull
-      val partitions = getAllPartitions()
-        .filter { case (_, state) => state.basePartitionState.leader != LeaderAndIsr.LeaderDuringDelete }
-        .map { case (tp, state) =>
-          new PartitionInfo(tp.topic, tp.partition, node(state.basePartitionState.leader),
-            state.basePartitionState.replicas.asScala.map(node).toArray,
-            state.basePartitionState.isr.asScala.map(node).toArray,
-            state.offlineReplicas.asScala.map(node).toArray)
-        }
-      val unauthorizedTopics = Collections.emptySet[String]
-      val internalTopics = getAllTopics().filter(Topic.isInternal).asJava
-      new Cluster(clusterId, nodes.values.filter(_ != null).toList.asJava,
-        partitions.toList.asJava,
-        unauthorizedTopics, internalTopics,
-        getControllerId.map(id => node(id)).orNull)
-    }
+    val snapshot = metadataSnapshot
+    val nodes = snapshot.aliveNodes.map { case (id, nodes) => (id, nodes.get(listenerName).orNull) }
+    def node(id: Integer): Node = nodes.get(id.toLong).orNull
+    val partitions = getAllPartitions(snapshot)
+      .filter { case (_, state) => state.basePartitionState.leader != LeaderAndIsr.LeaderDuringDelete }
+      .map { case (tp, state) =>
+        new PartitionInfo(tp.topic, tp.partition, node(state.basePartitionState.leader),
+          state.basePartitionState.replicas.asScala.map(node).toArray,
+          state.basePartitionState.isr.asScala.map(node).toArray,
+          state.offlineReplicas.asScala.map(node).toArray)
+      }
+    val unauthorizedTopics = Collections.emptySet[String]
+    val internalTopics = getAllTopics(snapshot).filter(Topic.isInternal).asJava
+    new Cluster(clusterId, nodes.values.filter(_ != null).toList.asJava,
+      partitions.toList.asJava,
+      unauthorizedTopics, internalTopics,
+      snapshot.controllerId.map(id => node(id)).orNull)
   }
   // This method returns the deleted TopicPartitions received from UpdateMetadataRequest
-  def updateCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition] = {
+  def updateMetadata(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition] = {
     inWriteLock(partitionMetadataLock) {
-      controllerId = updateMetadataRequest.controllerId match {
+
+      //since kafka may do partial metadata updates, we start by copying the previous state
+      val partitionStates = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataRequest.PartitionState]](metadataSnapshot.partitionStates.size)
+      metadataSnapshot.partitionStates.foreach { case (topic, oldPartitionStates) =>
+        val copy = new mutable.LongMap[UpdateMetadataRequest.PartitionState](oldPartitionStates.size)
+        copy ++= oldPartitionStates
+        partitionStates += (topic -> copy)
+      }
+      val aliveBrokers = new mutable.LongMap[Broker](metadataSnapshot.aliveBrokers.size)
+      val aliveNodes = new mutable.LongMap[collection.Map[ListenerName, Node]](metadataSnapshot.aliveNodes.size)
+      val controllerId = updateMetadataRequest.controllerId match {
         case id if id < 0 => None
         case id => Some(id)
       }
-      aliveNodes.clear()
-      aliveBrokers.clear()
+
       updateMetadataRequest.liveBrokers.asScala.foreach { broker =>
         // `aliveNodes` is a hot path for metadata requests for large clusters, so we use java.util.HashMap which
         // is a bit faster than scala.collection.mutable.HashMap. When we drop support for Scala 2.10, we could
@@ -253,34 +250,38 @@ class MetadataCache(brokerId: Int) extends Logging {
       val controllerId = updateMetadataRequest.controllerId
       val controllerEpoch = updateMetadataRequest.controllerEpoch
       if (info.basePartitionState.leader == LeaderAndIsr.LeaderDuringDelete) {
-        removePartitionInfo(tp.topic, tp.partition)
+        removePartitionInfo(partitionStates, tp.topic, tp.partition)
         stateChangeLogger.trace(s"Deleted partition $tp from metadata cache in response to UpdateMetadata " +
           s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
         deletedPartitions += tp
       } else {
-        addOrUpdatePartitionInfo(tp.topic, tp.partition, info)
+        addOrUpdatePartitionInfo(partitionStates, tp.topic, tp.partition, info)
         stateChangeLogger.trace(s"Cached leader info $info for partition $tp in response to " +
           s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
       }
     }
+    metadataSnapshot = MetadataSnapshot(partitionStates, controllerId, aliveBrokers, aliveNodes)
     deletedPartitions
   }
 }
   def contains(topic: String): Boolean = {
-    inReadLock(partitionMetadataLock) {
-      cache.contains(topic)
-    }
+    metadataSnapshot.partitionStates.contains(topic)
   }
   def contains(tp: TopicPartition): Boolean = getPartitionInfo(tp.topic, tp.partition).isDefined
-  private def removePartitionInfo(topic: String, partitionId: Int): Boolean = {
-    cache.get(topic).exists { infos =>
+  private def removePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataRequest.PartitionState]], topic: String, partitionId: Int): Boolean = {
+    partitionStates.get(topic).exists { infos =>
       infos.remove(partitionId)
-      if (infos.isEmpty) cache.remove(topic)
+      if (infos.isEmpty) partitionStates.remove(topic)
       true
     }
   }
+  case class MetadataSnapshot(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataRequest.PartitionState]],
+                              controllerId: Option[Int],
+                              aliveBrokers: mutable.LongMap[Broker],
+                              aliveNodes: mutable.LongMap[collection.Map[ListenerName, Node]])
+
 }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 9658f1a33a5..14e537e63db 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1006,7 +1006,7 @@ class ReplicaManager(val config: KafkaConfig,
         stateChangeLogger.warn(stateControllerEpochErrorMessage)
         throw new ControllerMovedException(stateChangeLogger.messageWithPrefix(stateControllerEpochErrorMessage))
       } else {
-        val deletedPartitions = metadataCache.updateCache(correlationId, updateMetadataRequest)
+        val deletedPartitions = metadataCache.updateMetadata(correlationId, updateMetadataRequest)
         controllerEpoch = updateMetadataRequest.controllerEpoch
         deletedPartitions
       }
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 4d1e4abb9b8..31283468aac 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -130,7 +130,7 @@ class AddPartitionsTest extends BaseRequestTest {
     assertEquals(0, partitionMetadata(0).partition)
     assertEquals(1, partitionMetadata(1).partition)
     assertEquals(2, partitionMetadata(2).partition)
-    val replicas = topicMetadata.partitionMetadata.get(1).replicas
+    val replicas = partitionMetadata(1).replicas
     assertEquals(2, replicas.size)
     assertTrue(replicas.asScala.head.id == 0 || replicas.asScala.head.id == 1)
     assertTrue(replicas.asScala(1).id == 0 || replicas.asScala(1).id == 1)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index d88001166f2..18b18fdc355 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -478,7 +478,7 @@ class KafkaApisTest {
     )
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
       0, Map.empty[TopicPartition, UpdateMetadataRequest.PartitionState].asJava, brokers.asJava).build()
-    metadataCache.updateCache(correlationId = 0, updateMetadataRequest)
+    metadataCache.updateMetadata(correlationId = 0, updateMetadataRequest)
     (plaintextListener, anotherListener)
   }
@@ -575,6 +575,6 @@ class KafkaApisTest {
     val partitions = (0 until numPartitions).map(new TopicPartition(topic, _) -> partitionState).toMap
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
       0, partitions.asJava, Set(broker).asJava).build()
-    metadataCache.updateCache(correlationId = 0, updateMetadataRequest)
+    metadataCache.updateMetadata(correlationId = 0, updateMetadataRequest)
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 82c14ee5827..93ac62d562d 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -72,7 +72,7 @@ class MetadataCacheTest {
     val version = ApiKeys.UPDATE_METADATA.latestVersion
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch,
       partitionStates.asJava, brokers.asJava).build()
-    cache.updateCache(15, updateMetadataRequest)
+    cache.updateMetadata(15, updateMetadataRequest)
     for (securityProtocol <- Seq(SecurityProtocol.PLAINTEXT, SecurityProtocol.SSL)) {
       val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
@@ -166,7 +166,7 @@ class MetadataCacheTest {
     val version = ApiKeys.UPDATE_METADATA.latestVersion
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch,
       partitionStates.asJava, brokers.asJava).build()
-    cache.updateCache(15, updateMetadataRequest)
+    cache.updateMetadata(15, updateMetadataRequest)
     val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName, errorUnavailableListeners = errorUnavailableListeners)
     assertEquals(1, topicMetadatas.size)
@@ -210,7 +210,7 @@ class MetadataCacheTest {
     val version = ApiKeys.UPDATE_METADATA.latestVersion
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch,
       partitionStates.asJava, brokers.asJava).build()
-    cache.updateCache(15, updateMetadataRequest)
+    cache.updateMetadata(15, updateMetadataRequest)
     // Validate errorUnavailableEndpoints = false
     val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName, errorUnavailableEndpoints = false)
@@ -270,7 +270,7 @@ class MetadataCacheTest {
     val version = ApiKeys.UPDATE_METADATA.latestVersion
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch,
       partitionStates.asJava, brokers.asJava).build()
-    cache.updateCache(15, updateMetadataRequest)
+    cache.updateMetadata(15, updateMetadataRequest)
     // Validate errorUnavailableEndpoints = false
     val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName, errorUnavailableEndpoints = false)
@@ -322,7 +322,7 @@ class MetadataCacheTest {
     val version = ApiKeys.UPDATE_METADATA.latestVersion
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, partitionStates.asJava,
       brokers.asJava).build()
-    cache.updateCache(15, updateMetadataRequest)
+    cache.updateMetadata(15, updateMetadataRequest)
     val topicMetadata = cache.getTopicMetadata(Set(topic), ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
     assertEquals(1, topicMetadata.size)
@@ -351,7 +351,7 @@ class MetadataCacheTest {
       val version = ApiKeys.UPDATE_METADATA.latestVersion
       val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, partitionStates.asJava,
         brokers.asJava).build()
-      cache.updateCache(15, updateMetadataRequest)
+      cache.updateMetadata(15, updateMetadataRequest)
     }
     val initialBrokerIds = (0 to 2).toSet
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Reduce contention between metadata update and metadata read operations
> -----------------------------------------------------------------------
>
> Key: KAFKA-7019
> URL: https://issues.apache.org/jira/browse/KAFKA-7019
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Reporter: Dong Lin
> Assignee: Radai Rosenblatt
> Priority: Major
> Fix For: 2.1.0
>
>
> Currently MetadataCache.updateCache() grabs a write lock in order to process
> the UpdateMetadataRequest from the controller, and a read lock is needed in
> order to handle the MetadataRequest from clients. Thus the handling of
> MetadataRequest and UpdateMetadataRequest block each other, and the broker
> can only process one such request at a time even if there are multiple
> request handler threads. Note that the broker cannot process MetadataRequests
> in parallel if there is an UpdateMetadataRequest waiting for the write lock,
> even though a MetadataRequest only requires the read lock to be processed.
> For a large cluster with tens of thousands of partitions, it can take e.g.
> 200 ms to process an UpdateMetadataRequest or a MetadataRequest from large
> clients (e.g. MM). During the period when a user is rebalancing the cluster,
> leadership changes will cause both UpdateMetadataRequests from the controller
> and MetadataRequests from clients. If a broker receives 10 MetadataRequests
> per second and 2 UpdateMetadataRequests per second on average, then since
> these requests need to be processed one at a time, this can reduce the
> request handler thread idle ratio to 0, which makes the broker unavailable to
> users.
> We can address this problem by removing the read lock in MetadataCache. The
> idea is that MetadataCache.updateCache() can instantiate a new copy of the
> cache as a method-local variable while it is processing the
> UpdateMetadataRequest, and replace the class-private variable with the newly
> instantiated method-local variable at the end of MetadataCache.updateCache().
> The handling of MetadataRequest then only requires access to the read-only
> class-private variable.
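
A minimal sketch of the copy-on-write pattern described above (hypothetical,
simplified types; the PR itself uses mutable.AnyRefMap/LongMap keyed by topic
and partition for performance): the writer rebuilds the state under the
existing write lock and publishes it with a single volatile write, so readers
never take a lock and only pay one volatile read.

import java.util.concurrent.locks.ReentrantReadWriteLock

// Hypothetical, simplified stand-in for MetadataCache illustrating the
// volatile-snapshot pattern described in the issue and implemented by the PR.
class SnapshotCache[K, V] {
  private val writeLock = new ReentrantReadWriteLock().writeLock()

  // Readers must capture this field into a local val ONCE per operation;
  // re-reading it mid-operation may observe a different snapshot.
  @volatile private var snapshot: Map[K, V] = Map.empty

  // Lock-free read path: one volatile read, then pure access to an
  // immutable map that no writer will ever mutate in place.
  def get(key: K): Option[V] = {
    val snap = snapshot
    snap.get(key)
  }

  // Write path: copy the previous state (updates may be partial), apply
  // the changes, then swap in the new snapshot with one atomic write.
  def update(changes: Map[K, V]): Unit = {
    writeLock.lock()
    try {
      snapshot = snapshot ++ changes
    } finally writeLock.unlock()
  }
}

With this shape, the write lock only serializes writers against each other; a
MetadataRequest no longer blocks behind a pending UpdateMetadataRequest, which
is exactly the contention the issue description calls out.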