[kafka] branch trunk updated: HOTFIX: null check keys of ProducerRecord when computing sizeInBytes (#12288)
This is an automated email from the ASF dual-hosted git repository. ableegoldman pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 3189a8648f HOTFIX: null check keys of ProducerRecord when computing sizeInBytes (#12288) 3189a8648f is described below commit 3189a8648f3abf3e16c5932dfa37e0d28d97a016 Author: A. Sophie Blee-Goldman AuthorDate: Mon Jun 13 22:27:06 2022 -0700 HOTFIX: null check keys of ProducerRecord when computing sizeInBytes (#12288) Minor followup to #12235 that adds a null check on the record key in the new ClientUtils#producerRecordSizeInBytes utility method, as there are valid cases in which we might be sending records with null keys to the Producer, such as a simple builder.stream("non-keyed-input-topic").filter(...).to("output-topic") Reviewers: Matthias J. Sax , Bill Bejeck --- .../streams/processor/internals/ClientUtils.java | 10 ++--- .../processor/internals/ClientUtilsTest.java | 44 +- .../org/apache/kafka/streams/TestTopicsTest.java | 12 ++ 3 files changed, 50 insertions(+), 16 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java index ea48c64617..1177e29d82 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java @@ -174,7 +174,7 @@ public class ClientUtils { public static long producerRecordSizeInBytes(final ProducerRecord record) { return recordSizeInBytes( -record.key().length, +record.key() == null ? 0 : record.key().length, record.value() == null ? 0 : record.value().length, record.topic(), record.headers() @@ -190,10 +190,10 @@ public class ClientUtils { ); } -public static long recordSizeInBytes(final long keyBytes, - final long valueBytes, - final String topic, - final Headers headers) { +private static long recordSizeInBytes(final long keyBytes, + final long valueBytes, + final String topic, + final Headers headers) { long headerSizeInBytes = 0L; if (headers != null) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java index 775268496e..d715a3f975 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java @@ -69,10 +69,11 @@ public class ClientUtilsTest { private static final Headers HEADERS = new RecordHeaders(asList( new RecordHeader("h1", "headerVal1".getBytes()), // 2 + 10 --> 12 bytes -new RecordHeader("h2", "headerVal2".getBytes()) -));// 2 + 10 --> 12 bytes +new RecordHeader("h2", "headerVal2".getBytes())// 2 + 10 --> 12 bytes +)); private static final int HEADERS_BYTES = 24; +// 20 bytes private static final int RECORD_METADATA_BYTES = 8 + // timestamp 8 + // offset @@ -86,6 +87,14 @@ public class ClientUtilsTest { HEADERS_BYTES + RECORD_METADATA_BYTES; +// 54 bytes +private static final long NULL_KEY_SIZE_IN_BYTES = +VALUE_BYTES + +TOPIC_BYTES + +HEADERS_BYTES + +RECORD_METADATA_BYTES; + +// 52 bytes private static final long TOMBSTONE_SIZE_IN_BYTES = KEY_BYTES + TOPIC_BYTES + @@ -202,6 +211,37 @@ public class ClientUtilsTest { assertThat(producerRecordSizeInBytes(record), equalTo(SIZE_IN_BYTES)); } +@Test +public void shouldComputeSizeInBytesForConsumerRecordWithNullKey() { +final ConsumerRecord record = new ConsumerRecord<>( +TOPIC, +1, +0, +0L, +TimestampType.CREATE_TIME, +0, +5, +null, +VALUE, +HEADERS, +Optional.empty() +); +assertThat(consumerRecordSizeInBytes(record), equalTo(NULL_KEY_SIZE_IN_BYTES)); +} + +@Test +public void shouldComputeSizeInBytesForProducerRecordWithNullKey() { +final ProducerRecord record = new ProducerRecord<>( +TOPIC, +1, +0L, +null, +VALUE, +HEADERS +); +
[kafka] branch trunk updated: KAFKA-13935 Fix static usages of IBP in KRaft mode (#12250)
This is an automated email from the ASF dual-hosted git repository. davidarthur pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new cc384054c6 KAFKA-13935 Fix static usages of IBP in KRaft mode (#12250) cc384054c6 is described below commit cc384054c6e63abd011f7687523b1292062b049d Author: David Arthur AuthorDate: Mon Jun 13 14:23:28 2022 -0400 KAFKA-13935 Fix static usages of IBP in KRaft mode (#12250) * Set the minimum supported MetadataVersion to 3.0-IV1 * Remove MetadataVersion.UNINITIALIZED * Relocate RPC version mapping for fetch protocols into MetadataVersion * Replace static IBP calls with dynamic calls to MetadataCache A side effect of removing the UNINITIALIZED metadata version is that the FeatureControlManager and FeatureImage will initialize themselves with the minimum KRaft version (3.0-IV1). The rationale for setting the minimum version to 3.0-IV1 is so that we can avoid any cases of KRaft mode running with an old log message format (KIP-724 was introduced in 3.0-IV1). As a side-effect of increasing this minimum version, the feature level values decreased by one. Reviewers: Jason Gustafson , Jun Rao --- .../main/scala/kafka/server/BrokerFeatures.scala | 2 +- core/src/main/scala/kafka/server/KafkaConfig.scala | 51 +--- .../main/scala/kafka/server/KafkaRaftServer.scala | 16 ++-- .../scala/kafka/server/RemoteLeaderEndPoint.scala | 22 -- .../scala/kafka/server/ReplicaFetcherManager.scala | 9 ++- .../scala/kafka/server/ReplicaFetcherThread.scala | 8 +- .../main/scala/kafka/server/ReplicaManager.scala | 6 +- .../server/metadata/BrokerMetadataPublisher.scala | 6 +- core/src/main/scala/kafka/tools/StorageTool.scala | 4 +- core/src/test/java/kafka/test/ClusterConfig.java | 16 ++-- .../java/kafka/test/ClusterTestExtensionsTest.java | 6 ++ .../java/kafka/test/annotation/ClusterTest.java| 2 +- .../kafka/test/junit/ClusterTestExtensions.java| 8 +- .../test/junit/RaftClusterInvocationContext.java | 3 +- .../test/junit/ZkClusterInvocationContext.java | 2 +- .../java/kafka/testkit/KafkaClusterTestKit.java| 2 +- .../transaction/ProducerIdsIntegrationTest.scala | 2 +- .../server/MetadataVersionIntegrationTest.scala| 9 ++- .../scala/unit/kafka/server/KafkaConfigTest.scala | 28 +++ .../unit/kafka/server/KafkaRaftServerTest.scala| 55 - .../kafka/server/ReplicaFetcherThreadTest.scala| 24 +++--- .../unit/kafka/server/ReplicaManagerTest.scala | 13 +++- .../metadata/BrokerMetadataListenerTest.scala | 2 +- .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java | 6 +- .../apache/kafka/controller/BootstrapMetadata.java | 27 --- .../kafka/controller/ClusterControlManager.java| 6 +- .../kafka/controller/FeatureControlManager.java| 24 -- .../apache/kafka/controller/QuorumController.java | 17 ++-- .../apache/kafka/controller/QuorumFeatures.java| 4 +- .../java/org/apache/kafka/image/FeaturesImage.java | 11 ++- .../java/org/apache/kafka/image/MetadataImage.java | 5 -- .../kafka/controller/BootstrapMetadataTest.java| 16 ++-- .../controller/ClusterControlManagerTest.java | 2 +- .../controller/FeatureControlManagerTest.java | 19 +++-- .../kafka/controller/QuorumControllerTest.java | 24 +- .../kafka/controller/QuorumControllerTestEnv.java | 6 +- .../kafka/server/common/MetadataVersion.java | 90 ++ .../kafka/server/common/MetadataVersionTest.java | 8 +- 38 files changed, 379 insertions(+), 182 deletions(-) diff --git a/core/src/main/scala/kafka/server/BrokerFeatures.scala b/core/src/main/scala/kafka/server/BrokerFeatures.scala index d385f1eb07..70ef7c71cb 100644 --- a/core/src/main/scala/kafka/server/BrokerFeatures.scala +++ b/core/src/main/scala/kafka/server/BrokerFeatures.scala @@ -73,7 +73,7 @@ object BrokerFeatures extends Logging { def createDefault(): BrokerFeatures = { new BrokerFeatures(Features.supportedFeatures( java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME, -new SupportedVersionRange(MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.latest().featureLevel() +new SupportedVersionRange(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), MetadataVersion.latest().featureLevel() } def createEmpty(): BrokerFeatures = { diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 3d7df18cbb..b4e0b9449c 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -1790,38 +1790,25 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami // We keep the user-provided String as `MetadataVersion.fromVersionString`
[kafka] branch trunk updated: KAFKA-13846: Adding overloaded metricOrElseCreate method (#12121)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 5cab11cf52 KAFKA-13846: Adding overloaded metricOrElseCreate method (#12121) 5cab11cf52 is described below commit 5cab11cf525f6c06fcf9eb43f7f95ef33fe1cdbb Author: vamossagar12 AuthorDate: Mon Jun 13 23:06:39 2022 +0530 KAFKA-13846: Adding overloaded metricOrElseCreate method (#12121) Reviewers: David Jacot , Justine Olshan , Guozhang Wang --- .../org/apache/kafka/common/metrics/Metrics.java | 40 +-- .../org/apache/kafka/common/metrics/Sensor.java| 10 - .../kafka/connect/runtime/ConnectMetrics.java | 8 +--- .../internals/metrics/StreamsMetricsImplTest.java | 46 ++ 4 files changed, 92 insertions(+), 12 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java index 52b7794a4c..398819016c 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java @@ -509,7 +509,10 @@ public class Metrics implements Closeable { Objects.requireNonNull(metricValueProvider), config == null ? this.config : config, time); -registerMetric(m); +KafkaMetric existingMetric = registerMetric(m); +if (existingMetric != null) { +throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one."); +} } /** @@ -524,6 +527,26 @@ public class Metrics implements Closeable { addMetric(metricName, null, metricValueProvider); } +/** + * Create or get an existing metric to monitor an object that implements MetricValueProvider. + * This metric won't be associated with any sensor. This is a way to expose existing values as metrics. + * This method takes care of synchronisation while updating/accessing metrics by concurrent threads. + * + * @param metricName The name of the metric + * @param metricValueProvider The metric value provider associated with this metric + * @return Existing KafkaMetric if already registered or else a newly created one + */ +public KafkaMetric addMetricIfAbsent(MetricName metricName, MetricConfig config, MetricValueProvider metricValueProvider) { +KafkaMetric metric = new KafkaMetric(new Object(), +Objects.requireNonNull(metricName), +Objects.requireNonNull(metricValueProvider), +config == null ? this.config : config, +time); + +KafkaMetric existingMetric = registerMetric(metric); +return existingMetric == null ? metric : existingMetric; +} + /** * Remove a metric if it exists and return it. Return null otherwise. If a metric is removed, `metricRemoval` * will be invoked for each reporter. @@ -563,10 +586,18 @@ public class Metrics implements Closeable { } } -synchronized void registerMetric(KafkaMetric metric) { +/** + * Register a metric if not present or return an already existing metric otherwise. + * When a metric is newly registered, this method returns null + * + * @param metric The KafkaMetric to register + * @return KafkaMetric if the metric already exists, null otherwise + */ +synchronized KafkaMetric registerMetric(KafkaMetric metric) { MetricName metricName = metric.metricName(); -if (this.metrics.containsKey(metricName)) -throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one."); +if (this.metrics.containsKey(metricName)) { +return this.metrics.get(metricName); +} this.metrics.put(metricName, metric); for (MetricsReporter reporter : reporters) { try { @@ -576,6 +607,7 @@ public class Metrics implements Closeable { } } log.trace("Registered metric named {}", metricName); +return null; } /** diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java index 5ae3b8d997..25f3c21a31 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java @@ -297,7 +297,10 @@ public final class Sensor { for (NamedMeasurable m : stat.stats()) { final KafkaMetric metric = new KafkaMetric(lock, m.name(), m.stat(), statConfig, time); if
[kafka] branch trunk updated: MINOR: Use Exit.addShutdownHook instead of directly adding hooks to Runtime (#12283)
This is an automated email from the ASF dual-hosted git repository. mimaison pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 4426b05e54 MINOR: Use Exit.addShutdownHook instead of directly adding hooks to Runtime (#12283) 4426b05e54 is described below commit 4426b05e549e97c9d1f544083f4dcb71e109b819 Author: Divij Vaidya AuthorDate: Mon Jun 13 17:25:40 2022 +0200 MINOR: Use Exit.addShutdownHook instead of directly adding hooks to Runtime (#12283) Reviewers: Mickael Maison , Igor Soarez , Kvicii --- .../main/java/org/apache/kafka/server/metrics/KafkaYammerMetrics.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaYammerMetrics.java b/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaYammerMetrics.java index e4b7afc3ea..329083350c 100644 --- a/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaYammerMetrics.java +++ b/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaYammerMetrics.java @@ -22,6 +22,7 @@ import com.yammer.metrics.core.MetricsRegistry; import org.apache.kafka.common.Reconfigurable; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.metrics.JmxReporter; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Sanitizer; import java.util.Iterator; @@ -57,7 +58,7 @@ public class KafkaYammerMetrics implements Reconfigurable { private KafkaYammerMetrics() { jmxReporter.start(); -Runtime.getRuntime().addShutdownHook(new Thread(jmxReporter::shutdown)); +Exit.addShutdownHook("kafka-jmx-shutdown-hook", jmxReporter::shutdown); } @Override
[kafka] branch trunk updated: KAFKA-13436: Omitted BrokerTopicMetrics metrics in the documentation (#11473)
This is an automated email from the ASF dual-hosted git repository. mimaison pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 89c057397e KAFKA-13436: Omitted BrokerTopicMetrics metrics in the documentation (#11473) 89c057397e is described below commit 89c057397ec5ae320aee7e12e1a984b2c14a7fa5 Author: Lee Dongjin AuthorDate: Mon Jun 13 23:46:19 2022 +0900 KAFKA-13436: Omitted BrokerTopicMetrics metrics in the documentation (#11473) Reviewers: Mickael Maison --- docs/ops.html | 61 +-- 1 file changed, 43 insertions(+), 18 deletions(-) diff --git a/docs/ops.html b/docs/ops.html index 5ace510c4c..76fee9cc86 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -1401,18 +1401,18 @@ $ bin/kafka-acls.sh \ Message in rate -kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec - + kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=([-.\w]+) +Incoming message rate per topic. Omitting 'topic=(...)' will yield the all-topic rate. Byte in rate from clients -kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec - + kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=([-.\w]+) +Byte in (from the clients) rate per topic. Omitting 'topic=(...)' will yield the all-topic rate. Byte in rate from other brokers - kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec - + kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec,topic=([-.\w]+) +Byte in (from the other brokers) rate per topic. Omitting 'topic=(...)' will yield the all-topic rate. Controller Request rate from Broker @@ -1441,7 +1441,27 @@ $ bin/kafka-acls.sh \ Error rate kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=([-.\w]+),error=([-.\w]+) Number of errors in responses counted per-request-type, per-error-code. If a response contains -multiple errors, all are counted. error=NONE indicates successful responses. + multiple errors, all are counted. error=NONE indicates successful responses. + + +Produce request rate + kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec,topic=([-.\w]+) +Produce request rate per topic. Omitting 'topic=(...)' will yield the all-topic rate. + + +Fetch request rate + kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec,topic=([-.\w]+) +Fetch request (from clients or followers) rate per topic. Omitting 'topic=(...)' will yield the all-topic rate. + + +Failed produce request rate + kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec,topic=([-.\w]+) +Failed Produce request rate per topic. Omitting 'topic=(...)' will yield the all-topic rate. + + +Failed fetch request rate + kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec,topic=([-.\w]+) +Failed Fetch request (from clients or followers) rate per topic. Omitting 'topic=(...)' will yield the all-topic rate. Request size in bytes @@ -1461,7 +1481,7 @@ $ bin/kafka-acls.sh \ Message conversion rate kafka.server:type=BrokerTopicMetrics,name={Produce|Fetch}MessageConversionsPerSec,topic=([-.\w]+) -Number of records which required message format conversion. +Message format conversion rate, for Produce or Fetch requests, per topic. Omitting 'topic=(...)' will yield the all-topic rate. Request Queue Size @@ -1470,33 +1490,38 @@ $ bin/kafka-acls.sh \ Byte out rate to clients -kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec - + kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=([-.\w]+) +Byte out (to the clients) rate per topic. Omitting 'topic=(...)' will yield the all-topic rate. Byte out rate to other brokers - kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesOutPerSec - + kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesOutPerSec,topic=([-.\w]+) +Byte out (to the other brokers) rate per topic. Omitting 'topic=(...)' will yield the all-topic rate. + + +Rejected byte rate + kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec,topic=([-.\w]+) +Rejected byte rate per topic, due to the record batch size being greater than max.message.bytes configuration. Omitting 'topic=(...)' will yield the all-topic rate. Message validation failure rate due to no key specified for