This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a900794ace4 KAFKA-15196 Additional ZK migration metrics (#14028)
a900794ace4 is described below

commit a900794ace4dcf1f9dadee27fbd8b63979532a18
Author: David Arthur <mum...@gmail.com>
AuthorDate: Wed Jul 26 12:54:59 2023 -0400

    KAFKA-15196 Additional ZK migration metrics (#14028)
    
    This patch adds several metrics defined in KIP-866:
    
    * MigratingZkBrokerCount: the number of zk brokers registered with KRaft
    * ZkWriteDeltaTimeMs: time spent writing MetadataDelta to ZK
    * ZkWriteSnapshotTimeMs: time spent writing MetadataImage to ZK
    * Adds value 4 for "ZK" to ZkMigrationState
    
    Also fixes a typo in the metric name introduced in #14009 (ZKWriteBehindLag -> ZkWriteBehindLag)
    
    Reviewers: Luke Chen <show...@gmail.com>, Colin P. McCabe <cmcc...@apache.org>
---
 .../scala/kafka/controller/KafkaController.scala   |  6 +++-
 .../apache/kafka/controller/QuorumController.java  |  2 ++
 .../metrics/ControllerMetadataMetrics.java         | 26 +++++++++++++-
 .../ControllerMetadataMetricsPublisher.java        |  6 ++++
 .../metrics/ControllerMetricsChanges.java          | 13 +++++++
 .../metrics/QuorumControllerMetrics.java           | 31 +++++++++++++---
 .../metadata/migration/KRaftMigrationDriver.java   | 18 +++++++---
 .../metadata/migration/KRaftMigrationZkWriter.java |  9 ++++-
 .../kafka/metadata/migration/ZkMigrationState.java |  9 ++++-
 .../metrics/ControllerMetadataMetricsTest.java     |  1 +
 .../metrics/ControllerMetricsChangesTest.java      | 42 ++++++++++++++++++----
 .../metrics/QuorumControllerMetricsTest.java       | 14 ++++----
 12 files changed, 152 insertions(+), 25 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala
index fa2575d9d8b..baba44f943b 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
 import kafka.api._
 import kafka.common._
 import kafka.cluster.Broker
-import kafka.controller.KafkaController.{ActiveBrokerCountMetricName, 
ActiveControllerCountMetricName, AlterReassignmentsCallback, 
ControllerStateMetricName, ElectLeadersCallback, FencedBrokerCountMetricName, 
GlobalPartitionCountMetricName, GlobalTopicCountMetricName, 
ListReassignmentsCallback, OfflinePartitionsCountMetricName, 
PreferredReplicaImbalanceCountMetricName, 
ReplicasIneligibleToDeleteCountMetricName, ReplicasToDeleteCountMetricName, 
TopicsIneligibleToDeleteCountMetricName, Top [...]
+import kafka.controller.KafkaController.{ActiveBrokerCountMetricName, 
ActiveControllerCountMetricName, AlterReassignmentsCallback, 
ControllerStateMetricName, ElectLeadersCallback, FencedBrokerCountMetricName, 
GlobalPartitionCountMetricName, GlobalTopicCountMetricName, 
ListReassignmentsCallback, OfflinePartitionsCountMetricName, 
PreferredReplicaImbalanceCountMetricName, 
ReplicasIneligibleToDeleteCountMetricName, ReplicasToDeleteCountMetricName, 
TopicsIneligibleToDeleteCountMetricName, Top [...]
 import kafka.coordinator.transaction.ZkProducerIdManager
 import kafka.server._
 import kafka.server.metadata.ZkFinalizedFeatureCache
@@ -44,6 +44,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{AbstractControlRequest, ApiError, 
LeaderAndIsrResponse, UpdateFeaturesRequest, UpdateMetadataResponse}
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.metadata.LeaderRecoveryState
+import org.apache.kafka.metadata.migration.ZkMigrationState
 import org.apache.kafka.server.common.{AdminOperationException, 
ProducerIdsBlock}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.KafkaScheduler
@@ -81,9 +82,11 @@ object KafkaController extends Logging {
   private val ReplicasIneligibleToDeleteCountMetricName = 
"ReplicasIneligibleToDeleteCount"
   private val ActiveBrokerCountMetricName = "ActiveBrokerCount"
   private val FencedBrokerCountMetricName = "FencedBrokerCount"
+  private val ZkMigrationStateMetricName = "ZkMigrationState"
 
   // package private for testing
   private[controller] val MetricNames = Set(
+    ZkMigrationStateMetricName,
     ActiveControllerCountMetricName,
     OfflinePartitionsCountMetricName,
     PreferredReplicaImbalanceCountMetricName,
@@ -172,6 +175,7 @@ class KafkaController(val config: KafkaConfig,
   /* single-thread scheduler to clean expired tokens */
   private val tokenCleanScheduler = new KafkaScheduler(1, true, 
"delegation-token-cleaner")
 
+  metricsGroup.newGauge(ZkMigrationStateMetricName, () => ZkMigrationState.ZK.value().toInt)
   metricsGroup.newGauge(ActiveControllerCountMetricName, () => if (isActive) 1 
else 0)
   metricsGroup.newGauge(OfflinePartitionsCountMetricName, () => 
offlinePartitionCount)
   metricsGroup.newGauge(PreferredReplicaImbalanceCountMetricName, () => 
preferredReplicaImbalanceCount)
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 199c679cb35..6a608d1dd79 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -1273,6 +1273,8 @@ public final class QuorumController implements Controller 
{
                                 "has been completed.");
                         }
                         break;
+                    default:
+                        throw new IllegalStateException("Unsupported 
ZkMigrationState " + featureControl.zkMigrationState());
                 }
             } else {
                 if (zkMigrationEnabled) {
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java
 
b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java
index 84f2bc1f9bb..37906b34b8b 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java
@@ -39,6 +39,8 @@ public final class ControllerMetadataMetrics implements 
AutoCloseable {
         "KafkaController", "FencedBrokerCount");
     private final static MetricName ACTIVE_BROKER_COUNT = getMetricName(
         "KafkaController", "ActiveBrokerCount");
+    private final static MetricName MIGRATING_ZK_BROKER_COUNT = getMetricName(
+        "KafkaController", "MigratingZkBrokerCount");
     private final static MetricName GLOBAL_TOPIC_COUNT = getMetricName(
         "KafkaController", "GlobalTopicCount");
     private final static MetricName GLOBAL_PARTITION_COUNT = getMetricName(
@@ -55,6 +57,7 @@ public final class ControllerMetadataMetrics implements 
AutoCloseable {
     private final Optional<MetricsRegistry> registry;
     private final AtomicInteger fencedBrokerCount = new AtomicInteger(0);
     private final AtomicInteger activeBrokerCount = new AtomicInteger(0);
+    private final AtomicInteger migratingZkBrokerCount = new AtomicInteger(0);
     private final AtomicInteger globalTopicCount = new AtomicInteger(0);
     private final AtomicInteger globalPartitionCount = new AtomicInteger(0);
     private final AtomicInteger offlinePartitionCount = new AtomicInteger(0);
@@ -65,7 +68,7 @@ public final class ControllerMetadataMetrics implements 
AutoCloseable {
     /**
      * Create a new ControllerMetadataMetrics object.
      *
-     * @param registry  The metrics registry, or Optional.empty if this is a 
test and we don't have one.
+     * @param registry The metrics registry, or Optional.empty if this is a 
test and we don't have one.
      */
     public ControllerMetadataMetrics(Optional<MetricsRegistry> registry) {
         this.registry = registry;
@@ -117,6 +120,14 @@ public final class ControllerMetadataMetrics implements 
AutoCloseable {
                 return (int) zkMigrationState();
             }
         }));
+
+        registry.ifPresent(r -> r.newGauge(MIGRATING_ZK_BROKER_COUNT, new 
Gauge<Integer>() {
+            @Override
+            public Integer value() {
+                return migratingZkBrokerCount();
+            }
+        }));
+
     }
 
     public void setFencedBrokerCount(int brokerCount) {
@@ -143,6 +154,18 @@ public final class ControllerMetadataMetrics implements 
AutoCloseable {
         return this.activeBrokerCount.get();
     }
 
+    public void setMigratingZkBrokerCount(int brokerCount) {
+        this.migratingZkBrokerCount.set(brokerCount);
+    }
+
+    public void addToMigratingZkBrokerCount(int brokerCountDelta) {
+        this.migratingZkBrokerCount.addAndGet(brokerCountDelta);
+    }
+
+    public int migratingZkBrokerCount() {
+        return this.migratingZkBrokerCount.get();
+    }
+
     public void setGlobalTopicCount(int topicCount) {
         this.globalTopicCount.set(topicCount);
     }
@@ -212,6 +235,7 @@ public final class ControllerMetadataMetrics implements 
AutoCloseable {
         registry.ifPresent(r -> Arrays.asList(
             FENCED_BROKER_COUNT,
             ACTIVE_BROKER_COUNT,
+            MIGRATING_ZK_BROKER_COUNT,
             GLOBAL_TOPIC_COUNT,
             GLOBAL_PARTITION_COUNT,
             OFFLINE_PARTITION_COUNT,
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java
 
b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java
index c72bdd9818f..7459fe657af 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java
@@ -124,15 +124,21 @@ public class ControllerMetadataMetricsPublisher 
implements MetadataPublisher {
         metrics.setGlobalTopicCount(newImage.topics().topicsById().size());
         int fencedBrokers = 0;
         int activeBrokers = 0;
+        int zkBrokers = 0;
         for (BrokerRegistration broker : 
newImage.cluster().brokers().values()) {
             if (broker.fenced()) {
                 fencedBrokers++;
             } else {
                 activeBrokers++;
             }
+            if (broker.isMigratingZkBroker()) {
+                zkBrokers++;
+            }
         }
         metrics.setFencedBrokerCount(fencedBrokers);
         metrics.setActiveBrokerCount(activeBrokers);
+        metrics.setMigratingZkBrokerCount(zkBrokers);
+
         int totalPartitions = 0;
         int offlinePartitions = 0;
         int partitionsWithoutPreferredLeader = 0;
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java
 
b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java
index b3bfb44076c..12956b3d610 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java
@@ -43,6 +43,7 @@ class ControllerMetricsChanges {
 
     private int fencedBrokersChange = 0;
     private int activeBrokersChange = 0;
+    private int migratingZkBrokersChange = 0;
     private int globalTopicsChange = 0;
     private int globalPartitionsChange = 0;
     private int offlinePartitionsChange = 0;
@@ -56,6 +57,10 @@ class ControllerMetricsChanges {
         return activeBrokersChange;
     }
 
+    public int migratingZkBrokersChange() {
+        return migratingZkBrokersChange;
+    }
+
     public int globalTopicsChange() {
         return globalTopicsChange;
     }
@@ -75,18 +80,23 @@ class ControllerMetricsChanges {
     void handleBrokerChange(BrokerRegistration prev, BrokerRegistration next) {
         boolean wasFenced = false;
         boolean wasActive = false;
+        boolean wasZk = false;
         if (prev != null) {
             wasFenced = prev.fenced();
             wasActive = !prev.fenced();
+            wasZk = prev.isMigratingZkBroker();
         }
         boolean isFenced = false;
         boolean isActive = false;
+        boolean isZk = false;
         if (next != null) {
             isFenced = next.fenced();
             isActive = !next.fenced();
+            isZk = next.isMigratingZkBroker();
         }
         fencedBrokersChange += delta(wasFenced, isFenced);
         activeBrokersChange += delta(wasActive, isActive);
+        migratingZkBrokersChange += delta(wasZk, isZk);
     }
 
     void handleDeletedTopic(TopicImage deletedTopic) {
@@ -141,6 +151,9 @@ class ControllerMetricsChanges {
         if (activeBrokersChange != 0) {
             metrics.addToActiveBrokerCount(activeBrokersChange);
         }
+        if (migratingZkBrokersChange != 0) {
+            metrics.addToMigratingZkBrokerCount(migratingZkBrokersChange);
+        }
         if (globalTopicsChange != 0) {
             metrics.addToGlobalTopicCount(globalTopicsChange);
         }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
 
b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
index 6c7aa581a79..225d6d0fb8a 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
@@ -45,7 +45,11 @@ public class QuorumControllerMetrics implements 
AutoCloseable {
     private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS = 
getMetricName(
         "ControllerEventManager", "EventQueueProcessingTimeMs");
     private final static MetricName ZK_WRITE_BEHIND_LAG = getMetricName(
-        "KafkaController", "ZKWriteBehindLag");
+        "KafkaController", "ZkWriteBehindLag");
+    private final static MetricName ZK_WRITE_SNAPSHOT_TIME_MS = getMetricName(
+        "KafkaController", "ZkWriteSnapshotTimeMs");
+    private final static MetricName ZK_WRITE_DELTA_TIME_MS = getMetricName(
+        "KafkaController", "ZkWriteDeltaTimeMs");
     private final static MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName(
         "KafkaController", "LastAppliedRecordOffset");
     private final static MetricName LAST_COMMITTED_RECORD_OFFSET = 
getMetricName(
@@ -71,6 +75,9 @@ public class QuorumControllerMetrics implements AutoCloseable 
{
     private final AtomicLong dualWriteOffset = new AtomicLong(0);
     private final Consumer<Long> eventQueueTimeUpdater;
     private final Consumer<Long> eventQueueProcessingTimeUpdater;
+    private final Consumer<Long> zkWriteSnapshotTimeHandler;
+    private final Consumer<Long> zkWriteDeltaTimeHandler;
+
     private final AtomicLong timedOutHeartbeats = new AtomicLong(0);
     private final AtomicLong operationsStarted = new AtomicLong(0);
     private final AtomicLong operationsTimedOut = new AtomicLong(0);
@@ -88,7 +95,7 @@ public class QuorumControllerMetrics implements AutoCloseable 
{
     public QuorumControllerMetrics(
         Optional<MetricsRegistry> registry,
         Time time,
-        boolean zkMigrationState
+        boolean zkMigrationEnabled
     ) {
         this.registry = registry;
         this.active = false;
@@ -148,7 +155,8 @@ public class QuorumControllerMetrics implements 
AutoCloseable {
                 return newActiveControllers();
             }
         }));
-        if (zkMigrationState) {
+
+        if (zkMigrationEnabled) {
             registry.ifPresent(r -> r.newGauge(ZK_WRITE_BEHIND_LAG, new 
Gauge<Long>() {
                 @Override
                 public Long value() {
@@ -158,6 +166,11 @@ public class QuorumControllerMetrics implements 
AutoCloseable {
                     else return lastCommittedRecordOffset() - 
dualWriteOffset();
                 }
             }));
+            this.zkWriteSnapshotTimeHandler = 
newHistogram(ZK_WRITE_SNAPSHOT_TIME_MS, true);
+            this.zkWriteDeltaTimeHandler = 
newHistogram(ZK_WRITE_DELTA_TIME_MS, true);
+        } else {
+            this.zkWriteSnapshotTimeHandler = __ -> { };
+            this.zkWriteDeltaTimeHandler = __ -> { };
         }
     }
 
@@ -177,6 +190,14 @@ public class QuorumControllerMetrics implements 
AutoCloseable {
         eventQueueProcessingTimeUpdater.accept(durationMs);
     }
 
+    public void updateZkWriteSnapshotTimeMs(long durationMs) {
+        zkWriteSnapshotTimeHandler.accept(durationMs);
+    }
+
+    public void updateZkWriteDeltaTimeMs(long durationMs) {
+        zkWriteDeltaTimeHandler.accept(durationMs);
+    }
+
     public void setLastAppliedRecordOffset(long offset) {
         lastAppliedRecordOffset.set(offset);
     }
@@ -255,7 +276,9 @@ public class QuorumControllerMetrics implements 
AutoCloseable {
             EVENT_QUEUE_OPERATIONS_STARTED_COUNT,
             EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT,
             NEW_ACTIVE_CONTROLLERS_COUNT,
-            ZK_WRITE_BEHIND_LAG
+            ZK_WRITE_BEHIND_LAG,
+            ZK_WRITE_SNAPSHOT_TIME_MS,
+            ZK_WRITE_DELTA_TIME_MS
         ).forEach(r::removeMetric));
     }
 
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index aed201f9005..89278c0d78b 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -475,12 +475,17 @@ public class KRaftMigrationDriver implements 
MetadataPublisher {
             }
 
             Map<String, Integer> dualWriteCounts = new TreeMap<>();
+            long startTime = time.nanoseconds();
             if (isSnapshot) {
                 zkMetadataWriter.handleSnapshot(image, 
countingOperationConsumer(
-                        dualWriteCounts, 
KRaftMigrationDriver.this::applyMigrationOperation));
+                    dualWriteCounts, 
KRaftMigrationDriver.this::applyMigrationOperation));
+                
controllerMetrics.updateZkWriteSnapshotTimeMs(NANOSECONDS.toMillis(time.nanoseconds()
 - startTime));
             } else {
-                zkMetadataWriter.handleDelta(prevImage, image, delta, 
countingOperationConsumer(
-                        dualWriteCounts, 
KRaftMigrationDriver.this::applyMigrationOperation));
+                if (zkMetadataWriter.handleDelta(prevImage, image, delta, 
countingOperationConsumer(
+                      dualWriteCounts, 
KRaftMigrationDriver.this::applyMigrationOperation))) {
+                    // Only record delta write time if we changed something. 
Otherwise, no-op records will skew timings.
+                    
controllerMetrics.updateZkWriteDeltaTimeMs(NANOSECONDS.toMillis(time.nanoseconds()
 - startTime));
+                }
             }
             if (dualWriteCounts.isEmpty()) {
                 log.trace("Did not make any ZK writes when handling KRaft {}", 
isSnapshot ? "snapshot" : "delta");
@@ -556,6 +561,8 @@ public class KRaftMigrationDriver implements 
MetadataPublisher {
                         log.error("KRaft controller indicates a completed 
migration, but the migration driver is somehow active.");
                         transitionTo(MigrationDriverState.INACTIVE);
                         break;
+                    default:
+                        throw new IllegalStateException("Unsupported 
ZkMigrationState " + zkMigrationState);
                 }
             }
         }
@@ -658,8 +665,11 @@ public class KRaftMigrationDriver implements 
MetadataPublisher {
             if (migrationState == MigrationDriverState.SYNC_KRAFT_TO_ZK) {
                 log.info("Performing a full metadata sync from KRaft to ZK.");
                 Map<String, Integer> dualWriteCounts = new TreeMap<>();
+                long startTime = time.nanoseconds();
                 zkMetadataWriter.handleSnapshot(image, 
countingOperationConsumer(
-                        dualWriteCounts, 
KRaftMigrationDriver.this::applyMigrationOperation));
+                    dualWriteCounts, 
KRaftMigrationDriver.this::applyMigrationOperation));
+                long endTime = time.nanoseconds();
+                
controllerMetrics.updateZkWriteSnapshotTimeMs(NANOSECONDS.toMillis(endTime - 
startTime));
                 log.info("Made the following ZK writes when reconciling with 
KRaft state: {}", dualWriteCounts);
                 
transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM);
             }
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
index 8dd5d6741b7..3cb95ec5bea 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
@@ -93,27 +93,34 @@ public class KRaftMigrationZkWriter {
         handleAclsSnapshot(image.acls(), operationConsumer);
     }
 
-    public void handleDelta(
+    public boolean handleDelta(
         MetadataImage previousImage,
         MetadataImage image,
         MetadataDelta delta,
         KRaftMigrationOperationConsumer operationConsumer
     ) {
+        boolean updated = false;
         if (delta.topicsDelta() != null) {
             handleTopicsDelta(previousImage.topics().topicIdToNameView()::get, 
image.topics(), delta.topicsDelta(), operationConsumer);
+            updated = true;
         }
         if (delta.configsDelta() != null) {
             handleConfigsDelta(image.configs(), delta.configsDelta(), 
operationConsumer);
+            updated = true;
         }
         if ((delta.clientQuotasDelta() != null) || (delta.scramDelta() != 
null)) {
             handleClientQuotasDelta(image, delta, operationConsumer);
+            updated = true;
         }
         if (delta.producerIdsDelta() != null) {
             handleProducerIdDelta(delta.producerIdsDelta(), operationConsumer);
+            updated = true;
         }
         if (delta.aclsDelta() != null) {
             handleAclsDelta(image.acls(), delta.aclsDelta(), 
operationConsumer);
+            updated = true;
         }
+        return updated;
     }
 
     /**
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationState.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationState.java
index 8d5e584831c..ff8ebd08b38 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationState.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationState.java
@@ -62,7 +62,14 @@ public enum ZkMigrationState {
      * will persist indefinitely after the migration. In operational terms, 
this is the same as the NONE
      * state.
      */
-    POST_MIGRATION((byte) 3);
+    POST_MIGRATION((byte) 3),
+
+    /**
+     * The controller is a ZK controller. No migration has been performed. 
This state is never persisted
+     * and is only used by KafkaController in order to have a unified metric 
that indicates what kind of
+     * metadata state the controller is in.
+     */
+    ZK((byte) 4);
 
     private final byte value;
 
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java
index 47ffcc3589f..9bec14d0c69 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java
@@ -41,6 +41,7 @@ public class ControllerMetadataMetricsTest {
                     new HashSet<>(Arrays.asList(
                         
"kafka.controller:type=KafkaController,name=ActiveBrokerCount",
                         
"kafka.controller:type=KafkaController,name=FencedBrokerCount",
+                        
"kafka.controller:type=KafkaController,name=MigratingZkBrokerCount",
                         
"kafka.controller:type=KafkaController,name=GlobalPartitionCount",
                         
"kafka.controller:type=KafkaController,name=GlobalTopicCount",
                         
"kafka.controller:type=KafkaController,name=MetadataErrorCount",
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java
index ef75dc61170..2362629ae4a 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java
@@ -51,13 +51,27 @@ public class ControllerMetricsChangesTest {
         boolean fenced
     ) {
         return new BrokerRegistration(brokerId,
-                100L,
-                Uuid.fromString("Pxi6QwS2RFuN8VSKjqJZyQ"),
-                Collections.emptyList(),
-                Collections.emptyMap(),
-                Optional.empty(),
-                fenced,
-                false);
+            100L,
+            Uuid.fromString("Pxi6QwS2RFuN8VSKjqJZyQ"),
+            Collections.emptyList(),
+            Collections.emptyMap(),
+            Optional.empty(),
+            fenced,
+            false);
+    }
+
+    private static BrokerRegistration zkBrokerRegistration(
+        int brokerId
+    ) {
+        return new BrokerRegistration(brokerId,
+            100L,
+            Uuid.fromString("Pxi6QwS2RFuN8VSKjqJZyQ"),
+            Collections.emptyList(),
+            Collections.emptyMap(),
+            Optional.empty(),
+            false,
+            false,
+            true);
     }
 
     @Test
@@ -103,6 +117,20 @@ public class ControllerMetricsChangesTest {
         assertEquals(1, changes.activeBrokersChange());
     }
 
+    @Test
+    public void testHandleZkBroker() {
+        ControllerMetricsChanges changes = new ControllerMetricsChanges();
+        changes.handleBrokerChange(null, zkBrokerRegistration(1));
+        assertEquals(1, changes.migratingZkBrokersChange());
+        changes.handleBrokerChange(null, zkBrokerRegistration(2));
+        changes.handleBrokerChange(null, zkBrokerRegistration(3));
+        assertEquals(3, changes.migratingZkBrokersChange());
+
+        changes.handleBrokerChange(zkBrokerRegistration(3), 
brokerRegistration(3, true));
+        changes.handleBrokerChange(brokerRegistration(3, true), 
brokerRegistration(3, false));
+        assertEquals(2, changes.migratingZkBrokersChange());
+    }
+
     @Test
     public void testHandleDeletedTopic() {
         ControllerMetricsChanges changes = new ControllerMetricsChanges();
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
index 9258577a0dd..936009e47d9 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
@@ -59,7 +59,9 @@ public class QuorumControllerMetricsTest {
                     
"kafka.controller:type=KafkaController,name=TimedOutBrokerHeartbeatCount"
                 ));
                 if (inMigration) {
-                    
expected.add("kafka.controller:type=KafkaController,name=ZKWriteBehindLag");
+                    
expected.add("kafka.controller:type=KafkaController,name=ZkWriteBehindLag");
+                    
expected.add("kafka.controller:type=KafkaController,name=ZkWriteSnapshotTimeMs");
+                    
expected.add("kafka.controller:type=KafkaController,name=ZkWriteDeltaTimeMs");
                 }
                 ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, 
"kafka.controller", expected);
             }
@@ -144,7 +146,7 @@ public class QuorumControllerMetricsTest {
             @SuppressWarnings("unchecked")
             Gauge<Long> zkWriteBehindLag = (Gauge<Long>) registry
                     .allMetrics()
-                    .get(metricName("KafkaController", "ZKWriteBehindLag"));
+                    .get(metricName("KafkaController", "ZkWriteBehindLag"));
             assertEquals(10L, zkWriteBehindLag.value());
 
             @SuppressWarnings("unchecked")
@@ -184,8 +186,8 @@ public class QuorumControllerMetricsTest {
             metrics.updateDualWriteOffset(0);
             @SuppressWarnings("unchecked")
             Gauge<Long> zkWriteBehindLag = (Gauge<Long>) registry
-                    .allMetrics()
-                    .get(metricName("KafkaController", "ZKWriteBehindLag"));
+                .allMetrics()
+                .get(metricName("KafkaController", "ZkWriteBehindLag"));
             assertEquals(0, zkWriteBehindLag.value());
         } finally {
             registry.shutdown();
@@ -197,8 +199,8 @@ public class QuorumControllerMetricsTest {
             metrics.setLastCommittedRecordOffset(100);
             @SuppressWarnings("unchecked")
             Gauge<Long> zkWriteBehindLag = (Gauge<Long>) registry
-                    .allMetrics()
-                    .get(metricName("KafkaController", "ZKWriteBehindLag"));
+                .allMetrics()
+                .get(metricName("KafkaController", "ZkWriteBehindLag"));
             assertEquals(10, zkWriteBehindLag.value());
         } finally {
             registry.shutdown();

Reply via email to