This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new d479d129e0b KAFKA-13999: Add ProducerCount metrics (KIP-847) (#13078)
d479d129e0b is described below

commit d479d129e0b24f2c2173f2bfd1fb261ec2be757b
Author: Anastasia Vela <av...@confluent.io>
AuthorDate: Wed Mar 1 14:20:15 2023 -0800

    KAFKA-13999: Add ProducerCount metrics (KIP-847) (#13078)

    This is the PR for the implementation of KIP-847:
    https://cwiki.apache.org/confluence/display/KAFKA/KIP-847%3A+Add+ProducerIdCount+metrics

    Add the ProducerIdCount metric at the broker level:
    kafka.server:type=ReplicaManager,name=ProducerIdCount

    Added unit tests below to ensure the metric reports the count correctly.

    ---------

    Co-authored-by: Artem Livshits <84364232+artemlivsh...@users.noreply.github.com>

    Reviewers: Ismael Juma <ism...@juma.me.uk>, Divij Vaidya <di...@amazon.com>, Christo Lolov <christo_lo...@yahoo.com>, Alexandre Dupriez <alexandre.dupr...@gmail.com>, Justine Olshan <jols...@confluent.io>
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  5 ++
 core/src/main/scala/kafka/log/UnifiedLog.scala     | 13 +++-
 .../main/scala/kafka/server/ReplicaManager.scala   |  4 +
 .../unit/kafka/server/BrokerMetricNamesTest.scala  |  1 +
 .../unit/kafka/server/ReplicaManagerTest.scala     | 85 ++++++++++++++++++++++
 docs/ops.html                                      |  5 ++
 .../internals/log/ProducerStateManager.java        | 32 ++++++--
 7 files changed, 137 insertions(+), 8 deletions(-)
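Operationally, the new gauge is readable over JMX like the other ReplicaManager metrics documented in docs/ops.html. A minimal sketch of reading it (not part of this commit; it assumes the probe runs inside the broker JVM and that the Yammer JMX reporter exposes a gauge's current value under the standard "Value" attribute):

    import java.lang.management.ManagementFactory
    import javax.management.ObjectName

    // Hypothetical probe, for illustration only: reads the gauge this patch
    // registers via the platform MBean server of the broker JVM.
    object ProducerIdCountProbe {
      def main(args: Array[String]): Unit = {
        val server = ManagementFactory.getPlatformMBeanServer
        val name = new ObjectName("kafka.server:type=ReplicaManager,name=ProducerIdCount")
        println(s"ProducerIdCount = ${server.getAttribute(name, "Value")}")
      }
    }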
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index d9838ff819e..68118ca196c 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -357,6 +357,11 @@ class Partition(val topicPartition: TopicPartition,
 
   def isAddingReplica(replicaId: Int): Boolean = assignmentState.isAddingReplica(replicaId)
 
+  def producerIdCount: Int = log.map(_.producerIdCount).getOrElse(0)
+
+  // Visible for testing
+  def removeExpiredProducers(currentTimeMs: Long): Unit = log.foreach(_.removeExpiredProducers(currentTimeMs))
+
   def inSyncReplicaIds: Set[Int] = partitionState.isr
 
   def maybeAddListener(listener: PartitionListener): Boolean = {
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 6c220ffbc86..28cb98bad06 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -470,11 +470,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     }
   }
 
-  val producerExpireCheck = scheduler.schedule("PeriodicProducerExpirationCheck", () => {
+  val producerExpireCheck = scheduler.schedule("PeriodicProducerExpirationCheck", () => removeExpiredProducers(time.milliseconds),
+    producerIdExpirationCheckIntervalMs, producerIdExpirationCheckIntervalMs)
+
+  // Visible for testing
+  def removeExpiredProducers(currentTimeMs: Long): Unit = {
     lock synchronized {
-      producerStateManager.removeExpiredProducers(time.milliseconds)
+      producerStateManager.removeExpiredProducers(currentTimeMs)
     }
-  }, producerIdExpirationCheckIntervalMs, producerIdExpirationCheckIntervalMs)
+  }
 
   // For compatibility, metrics are defined to be under `Log` class
   override def metricName(name: String, tags: scala.collection.Map[String, String]): MetricName = {
@@ -563,6 +567,9 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     producerStateManager.hasLateTransaction(currentTimeMs)
   }
 
+  @threadsafe
+  def producerIdCount: Int = producerStateManager.producerIdCount
+
   def activeProducers: Seq[DescribeProducersResponseData.ProducerState] = {
     lock synchronized {
       producerStateManager.activeProducers.asScala.map { case (producerId, state) =>
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 597965d2d5c..b987db340c9 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -255,6 +255,7 @@ class ReplicaManager(val config: KafkaConfig,
   newGauge("AtMinIsrPartitionCount", () => leaderPartitionsIterator.count(_.isAtMinIsr))
   newGauge("ReassigningPartitions", () => reassigningPartitionsCount)
   newGauge("PartitionsWithLateTransactionsCount", () => lateTransactionsCount)
+  newGauge("ProducerIdCount", () => producerIdCount)
 
   def reassigningPartitionsCount: Int = leaderPartitionsIterator.count(_.isReassigning)
 
@@ -263,6 +264,8 @@ class ReplicaManager(val config: KafkaConfig,
     leaderPartitionsIterator.count(_.hasLateTransaction(currentTimeMs))
   }
 
+  def producerIdCount: Int = onlinePartitionsIterator.map(_.producerIdCount).sum
+
   val isrExpandRate: Meter = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS)
   val isrShrinkRate: Meter = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS)
   val failedIsrUpdatesRate: Meter = newMeter("FailedIsrUpdatesPerSec", "failedUpdates", TimeUnit.SECONDS)
@@ -1926,6 +1929,7 @@ class ReplicaManager(val config: KafkaConfig,
     removeMetric("AtMinIsrPartitionCount")
     removeMetric("ReassigningPartitions")
     removeMetric("PartitionsWithLateTransactionsCount")
+    removeMetric("ProducerIdCount")
   }
 
   def beginControlledShutdown(): Unit = {
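Note how the ReplicaManager change above wires the metric up as a read-time aggregate: the gauge owns no state of its own and recomputes onlinePartitionsIterator.map(_.producerIdCount).sum each time it is sampled. A minimal sketch of that pattern against the raw Yammer API (a hypothetical class, not Kafka's KafkaMetricsGroup wrapper):

    import com.yammer.metrics.core.{Gauge, MetricsRegistry}

    // Illustration only: a stateless gauge that re-derives its value from
    // per-partition counters on every read, so appends pay no metrics cost.
    class ProducerIdCountGauge(registry: MetricsRegistry, perPartitionCounts: () => Iterator[Int]) {
      registry.newGauge(classOf[ProducerIdCountGauge], "ProducerIdCount", new Gauge[Int] {
        override def value(): Int = perPartitionCounts().sum
      })
    }

The cost of the sum is paid on the metrics thread; each per-partition read is a single volatile load (see the ProducerStateManager change further below).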
diff --git a/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala b/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
index dc69076619d..1e3adc30ee2 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
@@ -48,6 +48,7 @@ class BrokerMetricNamesTest(cluster: ClusterInstance) {
       "LeaderCount", "PartitionCount", "OfflineReplicaCount", "UnderReplicatedPartitions",
       "UnderMinIsrPartitionCount", "AtMinIsrPartitionCount", "ReassigningPartitions",
       "IsrExpandsPerSec", "IsrShrinksPerSec", "FailedIsrUpdatesPerSec",
+      "ProducerIdCount",
     )
     expectedMetricNames.foreach { metricName =>
       assertEquals(1, metrics.keySet.asScala.count(_.getMBeanName == s"$expectedPrefix=$metricName"))
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index f8d3ae3d490..91387ea315b 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -59,12 +59,14 @@ import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.server.common.OffsetAndEpoch
 import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
+import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.util.MockScheduler
 import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata, ProducerStateManager, ProducerStateManagerConfig}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
+import com.yammer.metrics.core.Gauge
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.mockito.ArgumentMatchers
@@ -462,6 +464,89 @@ class ReplicaManagerTest {
     }
   }
 
+  @Test
+  def testProducerIdCountMetrics(): Unit = {
+    val timer = new MockTimer(time)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(timer)
+
+    try {
+      val brokerList = Seq[Integer](0, 1).asJava
+
+      // Create a couple partition for the topic.
+      val partition0 = replicaManager.createPartition(new TopicPartition(topic, 0))
+      partition0.createLogIfNotExists(isNew = false, isFutureReplica = false,
+        new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), None)
+      val partition1 = replicaManager.createPartition(new TopicPartition(topic, 1))
+      partition1.createLogIfNotExists(isNew = false, isFutureReplica = false,
+        new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), None)
+
+      // Make this replica the leader for the partitions.
+      Seq(0, 1).foreach { partition =>
+        val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+          Seq(new LeaderAndIsrPartitionState()
+            .setTopicName(topic)
+            .setPartitionIndex(partition)
+            .setControllerEpoch(0)
+            .setLeader(0)
+            .setLeaderEpoch(0)
+            .setIsr(brokerList)
+            .setPartitionEpoch(0)
+            .setReplicas(brokerList)
+            .setIsNew(true)).asJava,
+          Collections.singletonMap(topic, Uuid.randomUuid()),
+          Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava,
+          false).build()
+        replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+        replicaManager.getPartitionOrException(new TopicPartition(topic, partition))
+          .localLogOrException
+      }
+
+      def appendRecord(pid: Long, sequence: Int, partition: Int): Unit = {
+        val epoch = 42.toShort
+        val records = MemoryRecords.withIdempotentRecords(CompressionType.NONE, pid, epoch, sequence,
+          new SimpleRecord(s"message $sequence".getBytes))
+        appendRecords(replicaManager, new TopicPartition(topic, partition), records).onFire { response =>
+          assertEquals(Errors.NONE, response.error)
+        }
+      }
+
+      def replicaManagerMetricValue(): Int = {
+        KafkaYammerMetrics.defaultRegistry().allMetrics().asScala.filter { case (metricName, _) =>
+          metricName.getName == "ProducerIdCount" && metricName.getType == replicaManager.getClass.getSimpleName
+        }.head._2.asInstanceOf[Gauge[Int]].value
+      }
+
+      // Initially all metrics are 0.
+      assertEquals(0, replicaManagerMetricValue())
+
+      val pid1 = 123L
+      // Produce a record from 1st pid to 1st partition.
+      appendRecord(pid1, 0, 0)
+      assertEquals(1, replicaManagerMetricValue())
+
+      // Produce another record from 1st pid to 1st partition, metrics shouldn't change.
+      appendRecord(pid1, 1, 0)
+      assertEquals(1, replicaManagerMetricValue())
+
+      // Produce a record from 2nd pid to 1st partition
+      val pid2 = 456L
+      appendRecord(pid2, 1, 0)
+      assertEquals(2, replicaManagerMetricValue())
+
+      // Produce a record from 1st pid to 2nd partition
+      appendRecord(pid1, 0, 1)
+      assertEquals(3, replicaManagerMetricValue())
+
+      // Simulate producer id expiration.
+      // We use -1 because the timestamp in this test is set to -1, so when
+      // the expiration check subtracts timestamp, we get max value.
+      partition0.removeExpiredProducers(Long.MaxValue - 1);
+      assertEquals(1, replicaManagerMetricValue())
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
   @Test
   def testPartitionsWithLateTransactionsCount(): Unit = {
     val timer = new MockTimer(time)
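On the Long.MaxValue - 1 argument in the test above: the expiration check compares currentTimeMs - lastTimestamp against the expiration timeout, and the records appended by this test carry a timestamp of -1. Assuming plain two's-complement Long arithmetic (which is what the Java subtraction does), the chosen value yields the largest elapsed time that does not overflow:

    // (Long.MaxValue - 1) - (-1) is exactly Long.MaxValue, so every
    // producer id in the test looks expired.
    assert((Long.MaxValue - 1) - (-1L) == Long.MaxValue)
    // Long.MaxValue - (-1) would wrap around to Long.MinValue instead.
    assert(Long.MaxValue - (-1L) == Long.MinValue)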
diff --git a/docs/ops.html b/docs/ops.html
index 96ea6d0e71a..8adeeca75cd 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -1604,6 +1604,11 @@ $ bin/kafka-acls.sh \
         <td>kafka.server:type=ReplicaManager,name=AtMinIsrPartitionCount</td>
         <td>0</td>
       </tr>
+      <tr>
+        <td>Producer Id counts</td>
+        <td>kafka.server:type=ReplicaManager,name=ProducerIdCount</td>
+        <td>Count of all producer ids created by transactional and idempotent producers in each replica on the broker</td>
+      </tr>
       <tr>
         <td>Partition counts</td>
         <td>kafka.server:type=ReplicaManager,name=PartitionCount</td>
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
index efa1a6e63dc..774e5be1ac1 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
@@ -121,6 +121,9 @@ public class ProducerStateManager {
 
     private volatile File logDir;
 
+    // The same as producers.size, but for lock-free access.
+    private volatile int producerIdCount = 0;
+
     // Keep track of the last timestamp from the oldest transaction. This is used
     // to detect (approximately) when a transaction has been left hanging on a partition.
     // We make the field volatile so that it can be safely accessed without a lock.
@@ -162,6 +165,25 @@ public class ProducerStateManager {
         snapshots = loadSnapshots();
     }
 
+    public int producerIdCount() {
+        return producerIdCount;
+    }
+
+    private void addProducerId(long producerId, ProducerStateEntry entry) {
+        producers.put(producerId, entry);
+        producerIdCount = producers.size();
+    }
+
+    private void removeProducerIds(List<Long> keys) {
+        producers.keySet().removeAll(keys);
+        producerIdCount = producers.size();
+    }
+
+    private void clearProducerIds() {
+        producers.clear();
+        producerIdCount = 0;
+    }
+
     /**
      * Load producer state snapshots by scanning the logDir.
     */
@@ -306,7 +328,7 @@ public class ProducerStateManager {
     // Visible for testing
     public void loadProducerEntry(ProducerStateEntry entry) {
         long producerId = entry.producerId();
-        producers.put(producerId, entry);
+        addProducerId(producerId, entry);
         entry.currentTxnFirstOffset().ifPresent(offset -> ongoingTxns.put(offset, new TxnMetadata(producerId, offset)));
     }
 
@@ -322,7 +344,7 @@ public class ProducerStateManager {
                 .filter(entry -> isProducerExpired(currentTimeMs, entry.getValue()))
                 .map(Map.Entry::getKey)
                 .collect(Collectors.toList());
-        producers.keySet().removeAll(keys);
+        removeProducerIds(keys);
     }
 
     /**
@@ -342,7 +364,7 @@ public class ProducerStateManager {
         }
 
         if (logEndOffset != mapEndOffset()) {
-            producers.clear();
+            clearProducerIds();
             ongoingTxns.clear();
             updateOldestTxnTimestamp();
 
@@ -374,7 +396,7 @@ public class ProducerStateManager {
         if (currentEntry != null) {
             currentEntry.update(updatedEntry);
         } else {
-            producers.put(appendInfo.producerId(), updatedEntry);
+            addProducerId(appendInfo.producerId(), updatedEntry);
         }
 
         appendInfo.startedTransactions().forEach(txn -> ongoingTxns.put(txn.firstOffset.messageOffset, txn));
@@ -479,7 +501,7 @@ public class ProducerStateManager {
      * Truncate the producer id mapping and remove all snapshots. This resets the state of the mapping.
      */
     public void truncateFullyAndStartAt(long offset) throws IOException {
-        producers.clear();
+        clearProducerIds();
         ongoingTxns.clear();
         unreplicatedTxns.clear();
         for (SnapshotFile snapshotFile : snapshots.values()) {
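The ProducerStateManager change above is the core of the patch: the producers map is only mutated while the log's lock is held, so rather than taking that lock on the metrics path, every mutation now funnels through a helper that also refreshes a volatile shadow of the map's size. A standalone sketch of the same pattern (a hypothetical class, not the commit's code):

    import scala.collection.mutable

    // Illustration of the volatile shadow-counter pattern: writers update the
    // map under a lock and refresh the counter; readers (e.g. a metrics gauge)
    // read the volatile field without ever taking the lock.
    class CountedMap[K, V] {
      private val entries = mutable.Map.empty[K, V]
      @volatile private var count = 0 // shadow of entries.size

      def put(key: K, value: V): Unit = this.synchronized {
        entries.put(key, value)
        count = entries.size
      }

      def remove(key: K): Unit = this.synchronized {
        entries.remove(key)
        count = entries.size
      }

      // Safe to call from any thread without holding the lock.
      def size: Int = count
    }

A volatile read is cheap and safely published, which is why the gauge can be sampled concurrently with appends.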