This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d479d129e0b KAFKA-13999: Add ProducerIdCount metrics (KIP-847) (#13078)
d479d129e0b is described below

commit d479d129e0b24f2c2173f2bfd1fb261ec2be757b
Author: Anastasia Vela <av...@confluent.io>
AuthorDate: Wed Mar 1 14:20:15 2023 -0800

    KAFKA-13999: Add ProducerIdCount metrics (KIP-847) (#13078)
    
    This PR implements KIP-847: https://cwiki.apache.org/confluence/display/KAFKA/KIP-847%3A+Add+ProducerIdCount+metrics
    
    It adds the ProducerIdCount metric at the broker level:
    
    kafka.server:type=ReplicaManager,name=ProducerIdCount
    
    Unit tests were added to verify that the metric reports the count correctly.
    
    ---------
    
    Co-authored-by: Artem Livshits <84364232+artemlivsh...@users.noreply.github.com>
    
    Reviewers: Ismael Juma <ism...@juma.me.uk>, Divij Vaidya <di...@amazon.com>, Christo Lolov <christo_lo...@yahoo.com>, Alexandre Dupriez <alexandre.dupr...@gmail.com>, Justine Olshan <jols...@confluent.io>
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  5 ++
 core/src/main/scala/kafka/log/UnifiedLog.scala     | 13 +++-
 .../main/scala/kafka/server/ReplicaManager.scala   |  4 +
 .../unit/kafka/server/BrokerMetricNamesTest.scala  |  1 +
 .../unit/kafka/server/ReplicaManagerTest.scala     | 85 ++++++++++++++++++++++
 docs/ops.html                                      |  5 ++
 .../internals/log/ProducerStateManager.java        | 32 ++++++--
 7 files changed, 137 insertions(+), 8 deletions(-)
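
For operators who want to consume the new metric, a minimal sketch of reading
the gauge over JMX follows (it assumes a broker started with JMX enabled, e.g.
JMX_PORT=9999; the host, port, and reader class are illustrative and not part
of this commit):

    import javax.management.ObjectName
    import javax.management.remote.{JMXConnectorFactory, JMXServiceURL}

    object ProducerIdCountReader extends App {
      val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi")
      val connector = JMXConnectorFactory.connect(url)
      try {
        val name = new ObjectName("kafka.server:type=ReplicaManager,name=ProducerIdCount")
        // Yammer gauges are typically exported over JMX as a single "Value" attribute.
        val value = connector.getMBeanServerConnection.getAttribute(name, "Value")
        println(s"ProducerIdCount = $value")
      } finally connector.close()
    }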

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index d9838ff819e..68118ca196c 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -357,6 +357,11 @@ class Partition(val topicPartition: TopicPartition,
 
   def isAddingReplica(replicaId: Int): Boolean = assignmentState.isAddingReplica(replicaId)
 
+  def producerIdCount: Int = log.map(_.producerIdCount).getOrElse(0)
+
+  // Visible for testing
+  def removeExpiredProducers(currentTimeMs: Long): Unit = log.foreach(_.removeExpiredProducers(currentTimeMs))
+
   def inSyncReplicaIds: Set[Int] = partitionState.isr
 
   def maybeAddListener(listener: PartitionListener): Boolean = {
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 6c220ffbc86..28cb98bad06 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -470,11 +470,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   }
 
-  val producerExpireCheck = scheduler.schedule("PeriodicProducerExpirationCheck", () => {
+  val producerExpireCheck = scheduler.schedule("PeriodicProducerExpirationCheck", () => removeExpiredProducers(time.milliseconds),
+    producerIdExpirationCheckIntervalMs, producerIdExpirationCheckIntervalMs)
+
+  // Visible for testing
+  def removeExpiredProducers(currentTimeMs: Long): Unit = {
     lock synchronized {
-      producerStateManager.removeExpiredProducers(time.milliseconds)
+      producerStateManager.removeExpiredProducers(currentTimeMs)
     }
-  }, producerIdExpirationCheckIntervalMs, producerIdExpirationCheckIntervalMs)
+  }
 
   // For compatibility, metrics are defined to be under `Log` class
   override def metricName(name: String, tags: scala.collection.Map[String, String]): MetricName = {
@@ -563,6 +567,9 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     producerStateManager.hasLateTransaction(currentTimeMs)
   }
 
+  @threadsafe
+  def producerIdCount: Int = producerStateManager.producerIdCount
+
   def activeProducers: Seq[DescribeProducersResponseData.ProducerState] = {
     lock synchronized {
       producerStateManager.activeProducers.asScala.map { case (producerId, state) =>
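
The UnifiedLog change above is a behavior-preserving refactor: the body of the
periodic task is extracted into a named removeExpiredProducers(currentTimeMs)
method, so tests can drive expiration with a chosen clock value instead of
waiting on the scheduler. A standalone sketch of the same pattern, using a
plain JDK executor instead of Kafka's Scheduler (class and fields here are
illustrative, not Kafka APIs):

    import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

    class ProducerExpirer(expirationMs: Long, intervalMs: Long) {
      // pid -> last append timestamp.
      private val lastTimestamps = new ConcurrentHashMap[Long, Long]()
      private val executor = Executors.newSingleThreadScheduledExecutor()

      // Thin scheduling wiring, mirroring producerExpireCheck above.
      executor.scheduleAtFixedRate(
        () => removeExpiredProducers(System.currentTimeMillis()),
        intervalMs, intervalMs, TimeUnit.MILLISECONDS)

      // Visible for testing: callers can pass any clock value (the new test
      // below uses Long.MaxValue - 1 to make every producer id look expired).
      def removeExpiredProducers(currentTimeMs: Long): Unit =
        lastTimestamps.entrySet.removeIf(e => currentTimeMs - e.getValue >= expirationMs)

      def track(pid: Long, timestampMs: Long): Unit = lastTimestamps.put(pid, timestampMs)

      def shutdown(): Unit = executor.shutdownNow()
    }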
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 597965d2d5c..b987db340c9 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -255,6 +255,7 @@ class ReplicaManager(val config: KafkaConfig,
   newGauge("AtMinIsrPartitionCount", () => 
leaderPartitionsIterator.count(_.isAtMinIsr))
   newGauge("ReassigningPartitions", () => reassigningPartitionsCount)
   newGauge("PartitionsWithLateTransactionsCount", () => lateTransactionsCount)
+  newGauge("ProducerIdCount", () => producerIdCount)
 
   def reassigningPartitionsCount: Int = leaderPartitionsIterator.count(_.isReassigning)
 
@@ -263,6 +264,8 @@ class ReplicaManager(val config: KafkaConfig,
     leaderPartitionsIterator.count(_.hasLateTransaction(currentTimeMs))
   }
 
+  def producerIdCount: Int = onlinePartitionsIterator.map(_.producerIdCount).sum
+
   val isrExpandRate: Meter = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS)
   val isrShrinkRate: Meter = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS)
   val failedIsrUpdatesRate: Meter = newMeter("FailedIsrUpdatesPerSec", "failedUpdates", TimeUnit.SECONDS)
@@ -1926,6 +1929,7 @@ class ReplicaManager(val config: KafkaConfig,
     removeMetric("AtMinIsrPartitionCount")
     removeMetric("ReassigningPartitions")
     removeMetric("PartitionsWithLateTransactionsCount")
+    removeMetric("ProducerIdCount")
   }
 
   def beginControlledShutdown(): Unit = {
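
The ReplicaManager side stays cheap: registering the gauge stores only a
callback, and the sum over online partitions is recomputed each time the
metric is read. A minimal standalone sketch of that Yammer gauge pattern
(the registry and the stand-in counts are illustrative, not Kafka's
KafkaMetricsGroup wiring):

    import com.yammer.metrics.core.{Gauge, MetricsRegistry}

    object GaugeSketch extends App {
      val registry = new MetricsRegistry()

      // Stand-in for onlinePartitionsIterator.map(_.producerIdCount); in the
      // broker this is recomputed from live partition state on every read.
      def producerIdCounts: Seq[Int] = Seq(2, 1)

      val gauge = registry.newGauge(getClass, "ProducerIdCount", new Gauge[Int] {
        override def value(): Int = producerIdCounts.sum
      })

      println(gauge.value()) // prints 3
    }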
diff --git a/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala b/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
index dc69076619d..1e3adc30ee2 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
@@ -48,6 +48,7 @@ class BrokerMetricNamesTest(cluster: ClusterInstance) {
       "LeaderCount", "PartitionCount", "OfflineReplicaCount", 
"UnderReplicatedPartitions",
       "UnderMinIsrPartitionCount", "AtMinIsrPartitionCount", 
"ReassigningPartitions",
       "IsrExpandsPerSec", "IsrShrinksPerSec", "FailedIsrUpdatesPerSec",
+      "ProducerIdCount",
     )
     expectedMetricNames.foreach { metricName =>
       assertEquals(1, metrics.keySet.asScala.count(_.getMBeanName == s"$expectedPrefix=$metricName"))
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index f8d3ae3d490..91387ea315b 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -59,12 +59,14 @@ import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.server.common.OffsetAndEpoch
 import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
+import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.util.MockScheduler
 import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata, ProducerStateManager, ProducerStateManagerConfig}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
+import com.yammer.metrics.core.Gauge
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.mockito.ArgumentMatchers
@@ -462,6 +464,89 @@ class ReplicaManagerTest {
     }
   }
 
+  @Test
+  def testProducerIdCountMetrics(): Unit = {
+    val timer = new MockTimer(time)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(timer)
+
+    try {
+      val brokerList = Seq[Integer](0, 1).asJava
+
+      // Create a couple of partitions for the topic.
+      val partition0 = replicaManager.createPartition(new TopicPartition(topic, 0))
+      partition0.createLogIfNotExists(isNew = false, isFutureReplica = false,
+        new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), None)
+      val partition1 = replicaManager.createPartition(new TopicPartition(topic, 1))
+      partition1.createLogIfNotExists(isNew = false, isFutureReplica = false,
+        new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), None)
+
+      // Make this replica the leader for the partitions.
+      Seq(0, 1).foreach { partition =>
+        val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+          Seq(new LeaderAndIsrPartitionState()
+            .setTopicName(topic)
+            .setPartitionIndex(partition)
+            .setControllerEpoch(0)
+            .setLeader(0)
+            .setLeaderEpoch(0)
+            .setIsr(brokerList)
+            .setPartitionEpoch(0)
+            .setReplicas(brokerList)
+            .setIsNew(true)).asJava,
+          Collections.singletonMap(topic, Uuid.randomUuid()),
+          Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava,
+          false).build()
+        replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+        replicaManager.getPartitionOrException(new TopicPartition(topic, partition))
+          .localLogOrException
+      }
+
+      def appendRecord(pid: Long, sequence: Int, partition: Int): Unit = {
+        val epoch = 42.toShort
+        val records = MemoryRecords.withIdempotentRecords(CompressionType.NONE, pid, epoch, sequence,
+          new SimpleRecord(s"message $sequence".getBytes))
+        appendRecords(replicaManager, new TopicPartition(topic, partition), records).onFire { response =>
+          assertEquals(Errors.NONE, response.error)
+        }
+      }
+
+      def replicaManagerMetricValue(): Int = {
+        KafkaYammerMetrics.defaultRegistry().allMetrics().asScala.filter { case (metricName, _) =>
+          metricName.getName == "ProducerIdCount" && metricName.getType == replicaManager.getClass.getSimpleName
+        }.head._2.asInstanceOf[Gauge[Int]].value
+      }
+
+      // Initially all metrics are 0.
+      assertEquals(0, replicaManagerMetricValue())
+
+      val pid1 = 123L
+      // Produce a record from 1st pid to 1st partition.
+      appendRecord(pid1, 0, 0)
+      assertEquals(1, replicaManagerMetricValue())
+
+      // Produce another record from 1st pid to 1st partition, metrics shouldn't change.
+      appendRecord(pid1, 1, 0)
+      assertEquals(1, replicaManagerMetricValue())
+
+      // Produce a record from 2nd pid to 1st partition
+      val pid2 = 456L
+      appendRecord(pid2, 1, 0)
+      assertEquals(2, replicaManagerMetricValue())
+
+      // Produce a record from 1st pid to 2nd partition
+      appendRecord(pid1, 0, 1)
+      assertEquals(3, replicaManagerMetricValue())
+
+      // Simulate producer id expiration.
+      // We pass Long.MaxValue - 1 because the record timestamp in this test is -1:
+      // the expiration check computes currentTimeMs - timestamp, which then yields
+      // exactly Long.MaxValue, so every producer id looks expired without overflow.
+      partition0.removeExpiredProducers(Long.MaxValue - 1)
+      assertEquals(1, replicaManagerMetricValue())
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
   @Test
   def testPartitionsWithLateTransactionsCount(): Unit = {
     val timer = new MockTimer(time)
diff --git a/docs/ops.html b/docs/ops.html
index 96ea6d0e71a..8adeeca75cd 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -1604,6 +1604,11 @@ $ bin/kafka-acls.sh \
         <td>kafka.server:type=ReplicaManager,name=AtMinIsrPartitionCount</td>
         <td>0</td>
       </tr>
+      <tr>
+        <td>Producer Id counts</td>
+        <td>kafka.server:type=ReplicaManager,name=ProducerIdCount</td>
+        <td>Count of all producer ids created by transactional and idempotent producers in each replica on the broker</td>
+      </tr>
       <tr>
         <td>Partition counts</td>
         <td>kafka.server:type=ReplicaManager,name=PartitionCount</td>
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
index efa1a6e63dc..774e5be1ac1 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
@@ -121,6 +121,9 @@ public class ProducerStateManager {
 
     private volatile File logDir;
 
+    // The same as producers.size, but for lock-free access.
+    private volatile int producerIdCount = 0;
+
     // Keep track of the last timestamp from the oldest transaction. This is used
     // to detect (approximately) when a transaction has been left hanging on a partition.
     // We make the field volatile so that it can be safely accessed without a lock.
@@ -162,6 +165,25 @@ public class ProducerStateManager {
         snapshots = loadSnapshots();
     }
 
+    public int producerIdCount() {
+        return producerIdCount;
+    }
+
+    private void addProducerId(long producerId, ProducerStateEntry entry) {
+        producers.put(producerId, entry);
+        producerIdCount = producers.size();
+    }
+
+    private void removeProducerIds(List<Long> keys) {
+        producers.keySet().removeAll(keys);
+        producerIdCount = producers.size();
+    }
+
+    private void clearProducerIds() {
+        producers.clear();
+        producerIdCount = 0;
+    }
+
     /**
      * Load producer state snapshots by scanning the logDir.
      */
@@ -306,7 +328,7 @@ public class ProducerStateManager {
     // Visible for testing
     public void loadProducerEntry(ProducerStateEntry entry) {
         long producerId = entry.producerId();
-        producers.put(producerId, entry);
+        addProducerId(producerId, entry);
         entry.currentTxnFirstOffset().ifPresent(offset -> ongoingTxns.put(offset, new TxnMetadata(producerId, offset)));
     }
 
@@ -322,7 +344,7 @@ public class ProducerStateManager {
                 .filter(entry -> isProducerExpired(currentTimeMs, entry.getValue()))
                 .map(Map.Entry::getKey)
                 .collect(Collectors.toList());
-        producers.keySet().removeAll(keys);
+        removeProducerIds(keys);
     }
 
     /**
@@ -342,7 +364,7 @@ public class ProducerStateManager {
         }
 
         if (logEndOffset != mapEndOffset()) {
-            producers.clear();
+            clearProducerIds();
             ongoingTxns.clear();
             updateOldestTxnTimestamp();
 
@@ -374,7 +396,7 @@ public class ProducerStateManager {
         if (currentEntry != null) {
             currentEntry.update(updatedEntry);
         } else {
-            producers.put(appendInfo.producerId(), updatedEntry);
+            addProducerId(appendInfo.producerId(), updatedEntry);
         }
 
         appendInfo.startedTransactions().forEach(txn -> ongoingTxns.put(txn.firstOffset.messageOffset, txn));
@@ -479,7 +501,7 @@ public class ProducerStateManager {
      * Truncate the producer id mapping and remove all snapshots. This resets the state of the mapping.
      */
     public void truncateFullyAndStartAt(long offset) throws IOException {
-        producers.clear();
+        clearProducerIds();
         ongoingTxns.clear();
         unreplicatedTxns.clear();
         for (SnapshotFile snapshotFile : snapshots.values()) {
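
The new producerIdCount field in ProducerStateManager follows a simple
pattern: every write path already mutates the producers map while holding the
log's lock, and each write refreshes a volatile mirror of the size, so the
metrics thread can read the count without locking. A condensed sketch of the
pattern (class and method names are illustrative):

    import scala.collection.mutable

    // Writers mutate under the lock and refresh a volatile mirror of the
    // size; readers (e.g. a metrics gauge) never need to take the lock.
    class CountedMap[K, V] {
      private val underlying = mutable.Map.empty[K, V]
      @volatile private var count = 0 // same as underlying.size, for lock-free access

      def put(k: K, v: V): Unit = synchronized {
        underlying.put(k, v)
        count = underlying.size
      }

      def removeAll(keys: Iterable[K]): Unit = synchronized {
        keys.foreach(underlying.remove)
        count = underlying.size
      }

      def clear(): Unit = synchronized {
        underlying.clear()
        count = 0
      }

      // Safe from any thread, e.g. the ReplicaManager ProducerIdCount gauge.
      def size: Int = count
    }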
