This is an automated email from the ASF dual-hosted git repository. davidarthur pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new a900794ace4 KAFKA-15196 Additional ZK migration metrics (#14028) a900794ace4 is described below commit a900794ace4dcf1f9dadee27fbd8b63979532a18 Author: David Arthur <mum...@gmail.com> AuthorDate: Wed Jul 26 12:54:59 2023 -0400 KAFKA-15196 Additional ZK migration metrics (#14028) This patch adds several metrics defined in KIP-866: * MigratingZkBrokerCount: the number of zk brokers registered with KRaft * ZkWriteDeltaTimeMs: time spent writing MetadataDelta to ZK * ZkWriteSnapshotTimeMs: time spent writing MetadataImage to ZK * Adds value 4 for "ZK" to ZkMigrationState Also fixes a typo in the metric name introduced in #14009 (ZKWriteBehindLag -> ZkWriteBehindLag) Reviewers: Luke Chen <show...@gmail.com>, Colin P. McCabe <cmcc...@apache.org> --- .../scala/kafka/controller/KafkaController.scala | 6 +++- .../apache/kafka/controller/QuorumController.java | 2 ++ .../metrics/ControllerMetadataMetrics.java | 26 +++++++++++++- .../ControllerMetadataMetricsPublisher.java | 6 ++++ .../metrics/ControllerMetricsChanges.java | 13 +++++++ .../metrics/QuorumControllerMetrics.java | 31 +++++++++++++--- .../metadata/migration/KRaftMigrationDriver.java | 18 +++++++--- .../metadata/migration/KRaftMigrationZkWriter.java | 9 ++++- .../kafka/metadata/migration/ZkMigrationState.java | 9 ++++- .../metrics/ControllerMetadataMetricsTest.java | 1 + .../metrics/ControllerMetricsChangesTest.java | 42 ++++++++++++++++++---- .../metrics/QuorumControllerMetricsTest.java | 14 ++++---- 12 files changed, 152 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index fa2575d9d8b..baba44f943b 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit import kafka.api._ import kafka.common._ import kafka.cluster.Broker -import kafka.controller.KafkaController.{ActiveBrokerCountMetricName, ActiveControllerCountMetricName, AlterReassignmentsCallback, ControllerStateMetricName, ElectLeadersCallback, FencedBrokerCountMetricName, GlobalPartitionCountMetricName, GlobalTopicCountMetricName, ListReassignmentsCallback, OfflinePartitionsCountMetricName, PreferredReplicaImbalanceCountMetricName, ReplicasIneligibleToDeleteCountMetricName, ReplicasToDeleteCountMetricName, TopicsIneligibleToDeleteCountMetricName, Top [...] +import kafka.controller.KafkaController.{ActiveBrokerCountMetricName, ActiveControllerCountMetricName, AlterReassignmentsCallback, ControllerStateMetricName, ElectLeadersCallback, FencedBrokerCountMetricName, GlobalPartitionCountMetricName, GlobalTopicCountMetricName, ListReassignmentsCallback, OfflinePartitionsCountMetricName, PreferredReplicaImbalanceCountMetricName, ReplicasIneligibleToDeleteCountMetricName, ReplicasToDeleteCountMetricName, TopicsIneligibleToDeleteCountMetricName, Top [...] import kafka.coordinator.transaction.ZkProducerIdManager import kafka.server._ import kafka.server.metadata.ZkFinalizedFeatureCache @@ -44,6 +44,7 @@ import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{AbstractControlRequest, ApiError, LeaderAndIsrResponse, UpdateFeaturesRequest, UpdateMetadataResponse} import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.metadata.LeaderRecoveryState +import org.apache.kafka.metadata.migration.ZkMigrationState import org.apache.kafka.server.common.{AdminOperationException, ProducerIdsBlock} import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.util.KafkaScheduler @@ -81,9 +82,11 @@ object KafkaController extends Logging { private val ReplicasIneligibleToDeleteCountMetricName = "ReplicasIneligibleToDeleteCount" private val ActiveBrokerCountMetricName = "ActiveBrokerCount" private val FencedBrokerCountMetricName = "FencedBrokerCount" + private val ZkMigrationStateMetricName = "ZkMigrationState" // package private for testing private[controller] val MetricNames = Set( + ZkMigrationStateMetricName, ActiveControllerCountMetricName, OfflinePartitionsCountMetricName, PreferredReplicaImbalanceCountMetricName, @@ -172,6 +175,7 @@ class KafkaController(val config: KafkaConfig, /* single-thread scheduler to clean expired tokens */ private val tokenCleanScheduler = new KafkaScheduler(1, true, "delegation-token-cleaner") + metricsGroup.newGauge(ZkMigrationStateMetricName, () => ZkMigrationState.ZK) metricsGroup.newGauge(ActiveControllerCountMetricName, () => if (isActive) 1 else 0) metricsGroup.newGauge(OfflinePartitionsCountMetricName, () => offlinePartitionCount) metricsGroup.newGauge(PreferredReplicaImbalanceCountMetricName, () => preferredReplicaImbalanceCount) diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 199c679cb35..6a608d1dd79 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -1273,6 +1273,8 @@ public final class QuorumController implements Controller { "has been completed."); } break; + default: + throw new IllegalStateException("Unsupported ZkMigrationState " + featureControl.zkMigrationState()); } } else { if (zkMigrationEnabled) { diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java index 84f2bc1f9bb..37906b34b8b 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java +++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java @@ -39,6 +39,8 @@ public final class ControllerMetadataMetrics implements AutoCloseable { "KafkaController", "FencedBrokerCount"); private final static MetricName ACTIVE_BROKER_COUNT = getMetricName( "KafkaController", "ActiveBrokerCount"); + private final static MetricName MIGRATING_ZK_BROKER_COUNT = getMetricName( + "KafkaController", "MigratingZkBrokerCount"); private final static MetricName GLOBAL_TOPIC_COUNT = getMetricName( "KafkaController", "GlobalTopicCount"); private final static MetricName GLOBAL_PARTITION_COUNT = getMetricName( @@ -55,6 +57,7 @@ public final class ControllerMetadataMetrics implements AutoCloseable { private final Optional<MetricsRegistry> registry; private final AtomicInteger fencedBrokerCount = new AtomicInteger(0); private final AtomicInteger activeBrokerCount = new AtomicInteger(0); + private final AtomicInteger migratingZkBrokerCount = new AtomicInteger(0); private final AtomicInteger globalTopicCount = new AtomicInteger(0); private final AtomicInteger globalPartitionCount = new AtomicInteger(0); private final AtomicInteger offlinePartitionCount = new AtomicInteger(0); @@ -65,7 +68,7 @@ public final class ControllerMetadataMetrics implements AutoCloseable { /** * Create a new ControllerMetadataMetrics object. * - * @param registry The metrics registry, or Optional.empty if this is a test and we don't have one. + * @param registry The metrics registry, or Optional.empty if this is a test and we don't have one. */ public ControllerMetadataMetrics(Optional<MetricsRegistry> registry) { this.registry = registry; @@ -117,6 +120,14 @@ public final class ControllerMetadataMetrics implements AutoCloseable { return (int) zkMigrationState(); } })); + + registry.ifPresent(r -> r.newGauge(MIGRATING_ZK_BROKER_COUNT, new Gauge<Integer>() { + @Override + public Integer value() { + return migratingZkBrokerCount(); + } + })); + } public void setFencedBrokerCount(int brokerCount) { @@ -143,6 +154,18 @@ public final class ControllerMetadataMetrics implements AutoCloseable { return this.activeBrokerCount.get(); } + public void setMigratingZkBrokerCount(int brokerCount) { + this.migratingZkBrokerCount.set(brokerCount); + } + + public void addToMigratingZkBrokerCount(int brokerCountDelta) { + this.migratingZkBrokerCount.addAndGet(brokerCountDelta); + } + + public int migratingZkBrokerCount() { + return this.migratingZkBrokerCount.get(); + } + public void setGlobalTopicCount(int topicCount) { this.globalTopicCount.set(topicCount); } @@ -212,6 +235,7 @@ public final class ControllerMetadataMetrics implements AutoCloseable { registry.ifPresent(r -> Arrays.asList( FENCED_BROKER_COUNT, ACTIVE_BROKER_COUNT, + MIGRATING_ZK_BROKER_COUNT, GLOBAL_TOPIC_COUNT, GLOBAL_PARTITION_COUNT, OFFLINE_PARTITION_COUNT, diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java index c72bdd9818f..7459fe657af 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java +++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java @@ -124,15 +124,21 @@ public class ControllerMetadataMetricsPublisher implements MetadataPublisher { metrics.setGlobalTopicCount(newImage.topics().topicsById().size()); int fencedBrokers = 0; int activeBrokers = 0; + int zkBrokers = 0; for (BrokerRegistration broker : newImage.cluster().brokers().values()) { if (broker.fenced()) { fencedBrokers++; } else { activeBrokers++; } + if (broker.isMigratingZkBroker()) { + zkBrokers++; + } } metrics.setFencedBrokerCount(fencedBrokers); metrics.setActiveBrokerCount(activeBrokers); + metrics.setMigratingZkBrokerCount(zkBrokers); + int totalPartitions = 0; int offlinePartitions = 0; int partitionsWithoutPreferredLeader = 0; diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java index b3bfb44076c..12956b3d610 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java +++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java @@ -43,6 +43,7 @@ class ControllerMetricsChanges { private int fencedBrokersChange = 0; private int activeBrokersChange = 0; + private int migratingZkBrokersChange = 0; private int globalTopicsChange = 0; private int globalPartitionsChange = 0; private int offlinePartitionsChange = 0; @@ -56,6 +57,10 @@ class ControllerMetricsChanges { return activeBrokersChange; } + public int migratingZkBrokersChange() { + return migratingZkBrokersChange; + } + public int globalTopicsChange() { return globalTopicsChange; } @@ -75,18 +80,23 @@ class ControllerMetricsChanges { void handleBrokerChange(BrokerRegistration prev, BrokerRegistration next) { boolean wasFenced = false; boolean wasActive = false; + boolean wasZk = false; if (prev != null) { wasFenced = prev.fenced(); wasActive = !prev.fenced(); + wasZk = prev.isMigratingZkBroker(); } boolean isFenced = false; boolean isActive = false; + boolean isZk = false; if (next != null) { isFenced = next.fenced(); isActive = !next.fenced(); + isZk = next.isMigratingZkBroker(); } fencedBrokersChange += delta(wasFenced, isFenced); activeBrokersChange += delta(wasActive, isActive); + migratingZkBrokersChange += delta(wasZk, isZk); } void handleDeletedTopic(TopicImage deletedTopic) { @@ -141,6 +151,9 @@ class ControllerMetricsChanges { if (activeBrokersChange != 0) { metrics.addToActiveBrokerCount(activeBrokersChange); } + if (migratingZkBrokersChange != 0) { + metrics.addToMigratingZkBrokerCount(migratingZkBrokersChange); + } if (globalTopicsChange != 0) { metrics.addToGlobalTopicCount(globalTopicsChange); } diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java index 6c7aa581a79..225d6d0fb8a 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java +++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java @@ -45,7 +45,11 @@ public class QuorumControllerMetrics implements AutoCloseable { private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS = getMetricName( "ControllerEventManager", "EventQueueProcessingTimeMs"); private final static MetricName ZK_WRITE_BEHIND_LAG = getMetricName( - "KafkaController", "ZKWriteBehindLag"); + "KafkaController", "ZkWriteBehindLag"); + private final static MetricName ZK_WRITE_SNAPSHOT_TIME_MS = getMetricName( + "KafkaController", "ZkWriteSnapshotTimeMs"); + private final static MetricName ZK_WRITE_DELTA_TIME_MS = getMetricName( + "KafkaController", "ZkWriteDeltaTimeMs"); private final static MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName( "KafkaController", "LastAppliedRecordOffset"); private final static MetricName LAST_COMMITTED_RECORD_OFFSET = getMetricName( @@ -71,6 +75,9 @@ public class QuorumControllerMetrics implements AutoCloseable { private final AtomicLong dualWriteOffset = new AtomicLong(0); private final Consumer<Long> eventQueueTimeUpdater; private final Consumer<Long> eventQueueProcessingTimeUpdater; + private final Consumer<Long> zkWriteSnapshotTimeHandler; + private final Consumer<Long> zkWriteDeltaTimeHandler; + private final AtomicLong timedOutHeartbeats = new AtomicLong(0); private final AtomicLong operationsStarted = new AtomicLong(0); private final AtomicLong operationsTimedOut = new AtomicLong(0); @@ -88,7 +95,7 @@ public class QuorumControllerMetrics implements AutoCloseable { public QuorumControllerMetrics( Optional<MetricsRegistry> registry, Time time, - boolean zkMigrationState + boolean zkMigrationEnabled ) { this.registry = registry; this.active = false; @@ -148,7 +155,8 @@ public class QuorumControllerMetrics implements AutoCloseable { return newActiveControllers(); } })); - if (zkMigrationState) { + + if (zkMigrationEnabled) { registry.ifPresent(r -> r.newGauge(ZK_WRITE_BEHIND_LAG, new Gauge<Long>() { @Override public Long value() { @@ -158,6 +166,11 @@ public class QuorumControllerMetrics implements AutoCloseable { else return lastCommittedRecordOffset() - dualWriteOffset(); } })); + this.zkWriteSnapshotTimeHandler = newHistogram(ZK_WRITE_SNAPSHOT_TIME_MS, true); + this.zkWriteDeltaTimeHandler = newHistogram(ZK_WRITE_DELTA_TIME_MS, true); + } else { + this.zkWriteSnapshotTimeHandler = __ -> { }; + this.zkWriteDeltaTimeHandler = __ -> { }; } } @@ -177,6 +190,14 @@ public class QuorumControllerMetrics implements AutoCloseable { eventQueueProcessingTimeUpdater.accept(durationMs); } + public void updateZkWriteSnapshotTimeMs(long durationMs) { + zkWriteSnapshotTimeHandler.accept(durationMs); + } + + public void updateZkWriteDeltaTimeMs(long durationMs) { + zkWriteDeltaTimeHandler.accept(durationMs); + } + public void setLastAppliedRecordOffset(long offset) { lastAppliedRecordOffset.set(offset); } @@ -255,7 +276,9 @@ public class QuorumControllerMetrics implements AutoCloseable { EVENT_QUEUE_OPERATIONS_STARTED_COUNT, EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT, NEW_ACTIVE_CONTROLLERS_COUNT, - ZK_WRITE_BEHIND_LAG + ZK_WRITE_BEHIND_LAG, + ZK_WRITE_SNAPSHOT_TIME_MS, + ZK_WRITE_DELTA_TIME_MS ).forEach(r::removeMetric)); } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java index aed201f9005..89278c0d78b 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java @@ -475,12 +475,17 @@ public class KRaftMigrationDriver implements MetadataPublisher { } Map<String, Integer> dualWriteCounts = new TreeMap<>(); + long startTime = time.nanoseconds(); if (isSnapshot) { zkMetadataWriter.handleSnapshot(image, countingOperationConsumer( - dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation)); + dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation)); + controllerMetrics.updateZkWriteSnapshotTimeMs(NANOSECONDS.toMillis(time.nanoseconds() - startTime)); } else { - zkMetadataWriter.handleDelta(prevImage, image, delta, countingOperationConsumer( - dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation)); + if (zkMetadataWriter.handleDelta(prevImage, image, delta, countingOperationConsumer( + dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation))) { + // Only record delta write time if we changed something. Otherwise, no-op records will skew timings. + controllerMetrics.updateZkWriteDeltaTimeMs(NANOSECONDS.toMillis(time.nanoseconds() - startTime)); + } } if (dualWriteCounts.isEmpty()) { log.trace("Did not make any ZK writes when handling KRaft {}", isSnapshot ? "snapshot" : "delta"); @@ -556,6 +561,8 @@ public class KRaftMigrationDriver implements MetadataPublisher { log.error("KRaft controller indicates a completed migration, but the migration driver is somehow active."); transitionTo(MigrationDriverState.INACTIVE); break; + default: + throw new IllegalStateException("Unsupported ZkMigrationState " + zkMigrationState); } } } @@ -658,8 +665,11 @@ public class KRaftMigrationDriver implements MetadataPublisher { if (migrationState == MigrationDriverState.SYNC_KRAFT_TO_ZK) { log.info("Performing a full metadata sync from KRaft to ZK."); Map<String, Integer> dualWriteCounts = new TreeMap<>(); + long startTime = time.nanoseconds(); zkMetadataWriter.handleSnapshot(image, countingOperationConsumer( - dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation)); + dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation)); + long endTime = time.nanoseconds(); + controllerMetrics.updateZkWriteSnapshotTimeMs(NANOSECONDS.toMillis(startTime - endTime)); log.info("Made the following ZK writes when reconciling with KRaft state: {}", dualWriteCounts); transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM); } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java index 8dd5d6741b7..3cb95ec5bea 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java @@ -93,27 +93,34 @@ public class KRaftMigrationZkWriter { handleAclsSnapshot(image.acls(), operationConsumer); } - public void handleDelta( + public boolean handleDelta( MetadataImage previousImage, MetadataImage image, MetadataDelta delta, KRaftMigrationOperationConsumer operationConsumer ) { + boolean updated = false; if (delta.topicsDelta() != null) { handleTopicsDelta(previousImage.topics().topicIdToNameView()::get, image.topics(), delta.topicsDelta(), operationConsumer); + updated = true; } if (delta.configsDelta() != null) { handleConfigsDelta(image.configs(), delta.configsDelta(), operationConsumer); + updated = true; } if ((delta.clientQuotasDelta() != null) || (delta.scramDelta() != null)) { handleClientQuotasDelta(image, delta, operationConsumer); + updated = true; } if (delta.producerIdsDelta() != null) { handleProducerIdDelta(delta.producerIdsDelta(), operationConsumer); + updated = true; } if (delta.aclsDelta() != null) { handleAclsDelta(image.acls(), delta.aclsDelta(), operationConsumer); + updated = true; } + return updated; } /** diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationState.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationState.java index 8d5e584831c..ff8ebd08b38 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationState.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationState.java @@ -62,7 +62,14 @@ public enum ZkMigrationState { * will persist indefinitely after the migration. In operational terms, this is the same as the NONE * state. */ - POST_MIGRATION((byte) 3); + POST_MIGRATION((byte) 3), + + /** + * The controller is a ZK controller. No migration has been performed. This state is never persisted + * and is only used by KafkaController in order to have a unified metric that indicates what kind of + * metadata state the controller is in. + */ + ZK((byte) 4); private final byte value; diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java index 47ffcc3589f..9bec14d0c69 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java @@ -41,6 +41,7 @@ public class ControllerMetadataMetricsTest { new HashSet<>(Arrays.asList( "kafka.controller:type=KafkaController,name=ActiveBrokerCount", "kafka.controller:type=KafkaController,name=FencedBrokerCount", + "kafka.controller:type=KafkaController,name=MigratingZkBrokerCount", "kafka.controller:type=KafkaController,name=GlobalPartitionCount", "kafka.controller:type=KafkaController,name=GlobalTopicCount", "kafka.controller:type=KafkaController,name=MetadataErrorCount", diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java index ef75dc61170..2362629ae4a 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java @@ -51,13 +51,27 @@ public class ControllerMetricsChangesTest { boolean fenced ) { return new BrokerRegistration(brokerId, - 100L, - Uuid.fromString("Pxi6QwS2RFuN8VSKjqJZyQ"), - Collections.emptyList(), - Collections.emptyMap(), - Optional.empty(), - fenced, - false); + 100L, + Uuid.fromString("Pxi6QwS2RFuN8VSKjqJZyQ"), + Collections.emptyList(), + Collections.emptyMap(), + Optional.empty(), + fenced, + false); + } + + private static BrokerRegistration zkBrokerRegistration( + int brokerId + ) { + return new BrokerRegistration(brokerId, + 100L, + Uuid.fromString("Pxi6QwS2RFuN8VSKjqJZyQ"), + Collections.emptyList(), + Collections.emptyMap(), + Optional.empty(), + false, + false, + true); } @Test @@ -103,6 +117,20 @@ public class ControllerMetricsChangesTest { assertEquals(1, changes.activeBrokersChange()); } + @Test + public void testHandleZkBroker() { + ControllerMetricsChanges changes = new ControllerMetricsChanges(); + changes.handleBrokerChange(null, zkBrokerRegistration(1)); + assertEquals(1, changes.migratingZkBrokersChange()); + changes.handleBrokerChange(null, zkBrokerRegistration(2)); + changes.handleBrokerChange(null, zkBrokerRegistration(3)); + assertEquals(3, changes.migratingZkBrokersChange()); + + changes.handleBrokerChange(zkBrokerRegistration(3), brokerRegistration(3, true)); + changes.handleBrokerChange(brokerRegistration(3, true), brokerRegistration(3, false)); + assertEquals(2, changes.migratingZkBrokersChange()); + } + @Test public void testHandleDeletedTopic() { ControllerMetricsChanges changes = new ControllerMetricsChanges(); diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java index 9258577a0dd..936009e47d9 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java @@ -59,7 +59,9 @@ public class QuorumControllerMetricsTest { "kafka.controller:type=KafkaController,name=TimedOutBrokerHeartbeatCount" )); if (inMigration) { - expected.add("kafka.controller:type=KafkaController,name=ZKWriteBehindLag"); + expected.add("kafka.controller:type=KafkaController,name=ZkWriteBehindLag"); + expected.add("kafka.controller:type=KafkaController,name=ZkWriteSnapshotTimeMs"); + expected.add("kafka.controller:type=KafkaController,name=ZkWriteDeltaTimeMs"); } ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller", expected); } @@ -144,7 +146,7 @@ public class QuorumControllerMetricsTest { @SuppressWarnings("unchecked") Gauge<Long> zkWriteBehindLag = (Gauge<Long>) registry .allMetrics() - .get(metricName("KafkaController", "ZKWriteBehindLag")); + .get(metricName("KafkaController", "ZkWriteBehindLag")); assertEquals(10L, zkWriteBehindLag.value()); @SuppressWarnings("unchecked") @@ -184,8 +186,8 @@ public class QuorumControllerMetricsTest { metrics.updateDualWriteOffset(0); @SuppressWarnings("unchecked") Gauge<Long> zkWriteBehindLag = (Gauge<Long>) registry - .allMetrics() - .get(metricName("KafkaController", "ZKWriteBehindLag")); + .allMetrics() + .get(metricName("KafkaController", "ZkWriteBehindLag")); assertEquals(0, zkWriteBehindLag.value()); } finally { registry.shutdown(); @@ -197,8 +199,8 @@ public class QuorumControllerMetricsTest { metrics.setLastCommittedRecordOffset(100); @SuppressWarnings("unchecked") Gauge<Long> zkWriteBehindLag = (Gauge<Long>) registry - .allMetrics() - .get(metricName("KafkaController", "ZKWriteBehindLag")); + .allMetrics() + .get(metricName("KafkaController", "ZkWriteBehindLag")); assertEquals(10, zkWriteBehindLag.value()); } finally { registry.shutdown();