[kafka] branch trunk updated: HOTFIX: null check keys of ProducerRecord when computing sizeInBytes (#12288)

2022-06-13 Thread ableegoldman
This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 3189a8648f HOTFIX: null check keys of ProducerRecord when computing sizeInBytes (#12288)
3189a8648f is described below

commit 3189a8648f3abf3e16c5932dfa37e0d28d97a016
Author: A. Sophie Blee-Goldman 
AuthorDate: Mon Jun 13 22:27:06 2022 -0700

HOTFIX: null check keys of ProducerRecord when computing sizeInBytes (#12288)

Minor followup to #12235 that adds a null check on the record key in the new ClientUtils#producerRecordSizeInBytes utility method, as there are valid cases in which we might be sending records with null keys to the Producer, such as a simple builder.stream("non-keyed-input-topic").filter(...).to("output-topic")

Reviewers: Matthias J. Sax, Bill Bejeck

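A note on how a null key reaches the producer here: below is a hedged Java sketch of the non-keyed pipeline cited above (generic types and the filter predicate are illustrative, not part of the commit):

    import org.apache.kafka.streams.StreamsBuilder;

    StreamsBuilder builder = new StreamsBuilder();
    // Records consumed from "non-keyed-input-topic" carry key == null, and the
    // records forwarded to "output-topic" keep that null key -- exactly the
    // input that previously NPE'd in ClientUtils#producerRecordSizeInBytes.
    builder.<String, String>stream("non-keyed-input-topic")
           .filter((key, value) -> value != null)
           .to("output-topic");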
---
 .../streams/processor/internals/ClientUtils.java   | 10 ++---
 .../processor/internals/ClientUtilsTest.java   | 44 +-
 .../org/apache/kafka/streams/TestTopicsTest.java   | 12 ++
 3 files changed, 50 insertions(+), 16 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
index ea48c64617..1177e29d82 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
@@ -174,7 +174,7 @@ public class ClientUtils {
 
     public static long producerRecordSizeInBytes(final ProducerRecord<byte[], byte[]> record) {
         return recordSizeInBytes(
-            record.key().length,
+            record.key() == null ? 0 : record.key().length,
             record.value() == null ? 0 : record.value().length,
             record.topic(),
             record.headers()
@@ -190,10 +190,10 @@ public class ClientUtils {
         );
     }
 
-    public static long recordSizeInBytes(final long keyBytes,
-                                         final long valueBytes,
-                                         final String topic,
-                                         final Headers headers) {
+    private static long recordSizeInBytes(final long keyBytes,
+                                          final long valueBytes,
+                                          final String topic,
+                                          final Headers headers) {
         long headerSizeInBytes = 0L;
 
         if (headers != null) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java
index 775268496e..d715a3f975 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java
@@ -69,10 +69,11 @@ public class ClientUtilsTest {
 
     private static final Headers HEADERS = new RecordHeaders(asList(
         new RecordHeader("h1", "headerVal1".getBytes()),   // 2 + 10 --> 12 bytes
-        new RecordHeader("h2", "headerVal2".getBytes())
-    ));// 2 + 10 --> 12 bytes
+        new RecordHeader("h2", "headerVal2".getBytes())    // 2 + 10 --> 12 bytes
+    ));
     private static final int HEADERS_BYTES = 24;
 
+    // 20 bytes
     private static final int RECORD_METADATA_BYTES =
         8 + // timestamp
         8 + // offset
@@ -86,6 +87,14 @@ public class ClientUtilsTest {
         HEADERS_BYTES +
         RECORD_METADATA_BYTES;
 
+    // 54 bytes
+    private static final long NULL_KEY_SIZE_IN_BYTES =
+        VALUE_BYTES +
+        TOPIC_BYTES +
+        HEADERS_BYTES +
+        RECORD_METADATA_BYTES;
+
+    // 52 bytes
     private static final long TOMBSTONE_SIZE_IN_BYTES =
         KEY_BYTES +
         TOPIC_BYTES +
@@ -202,6 +211,37 @@ public class ClientUtilsTest {
         assertThat(producerRecordSizeInBytes(record), equalTo(SIZE_IN_BYTES));
     }
 
+    @Test
+    public void shouldComputeSizeInBytesForConsumerRecordWithNullKey() {
+        final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
+            TOPIC,
+            1,
+            0,
+            0L,
+            TimestampType.CREATE_TIME,
+            0,
+            5,
+            null,
+            VALUE,
+            HEADERS,
+            Optional.empty()
+        );
+        assertThat(consumerRecordSizeInBytes(record), equalTo(NULL_KEY_SIZE_IN_BYTES));
+    }
+
+    @Test
+    public void shouldComputeSizeInBytesForProducerRecordWithNullKey() {
+        final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+            TOPIC,
+            1,
+            0L,
+            null,
+            VALUE,
+            HEADERS
+        );
+

[kafka] branch trunk updated: KAFKA-13935 Fix static usages of IBP in KRaft mode (#12250)

2022-06-13 Thread davidarthur
This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new cc384054c6 KAFKA-13935 Fix static usages of IBP in KRaft mode (#12250)
cc384054c6 is described below

commit cc384054c6e63abd011f7687523b1292062b049d
Author: David Arthur 
AuthorDate: Mon Jun 13 14:23:28 2022 -0400

KAFKA-13935 Fix static usages of IBP in KRaft mode (#12250)

* Set the minimum supported MetadataVersion to 3.0-IV1
* Remove MetadataVersion.UNINITIALIZED
* Relocate RPC version mapping for fetch protocols into MetadataVersion
* Replace static IBP calls with dynamic calls to MetadataCache

A side effect of removing the UNINITIALIZED metadata version is that the FeatureControlManager and FeatureImage will initialize themselves with the minimum KRaft version (3.0-IV1).

The rationale for setting the minimum version to 3.0-IV1 is so that we can avoid any cases of KRaft mode running with an old log message format (KIP-724 was introduced in 3.0-IV1). As a side-effect of increasing this minimum version, the feature level values decreased by one.
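For reference, a hedged sketch of the new floor at the BrokerFeatures call site changed below (the constant names come from the diff; the surrounding code is illustrative):

    import org.apache.kafka.common.feature.SupportedVersionRange;
    import org.apache.kafka.server.common.MetadataVersion;

    // The broker now advertises metadata.version support from the KRaft floor
    // (3.0-IV1) up to the latest known version; levels below it are unsupported.
    SupportedVersionRange metadataRange = new SupportedVersionRange(
        MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
        MetadataVersion.latest().featureLevel());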

Reviewers: Jason Gustafson, Jun Rao
---
 .../main/scala/kafka/server/BrokerFeatures.scala   |  2 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala | 51 +---
 .../main/scala/kafka/server/KafkaRaftServer.scala  | 16 ++--
 .../scala/kafka/server/RemoteLeaderEndPoint.scala  | 22 --
 .../scala/kafka/server/ReplicaFetcherManager.scala |  9 ++-
 .../scala/kafka/server/ReplicaFetcherThread.scala  |  8 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |  6 +-
 .../server/metadata/BrokerMetadataPublisher.scala  |  6 +-
 core/src/main/scala/kafka/tools/StorageTool.scala  |  4 +-
 core/src/test/java/kafka/test/ClusterConfig.java   | 16 ++--
 .../java/kafka/test/ClusterTestExtensionsTest.java |  6 ++
 .../java/kafka/test/annotation/ClusterTest.java|  2 +-
 .../kafka/test/junit/ClusterTestExtensions.java|  8 +-
 .../test/junit/RaftClusterInvocationContext.java   |  3 +-
 .../test/junit/ZkClusterInvocationContext.java |  2 +-
 .../java/kafka/testkit/KafkaClusterTestKit.java|  2 +-
 .../transaction/ProducerIdsIntegrationTest.scala   |  2 +-
 .../server/MetadataVersionIntegrationTest.scala|  9 ++-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  | 28 +++
 .../unit/kafka/server/KafkaRaftServerTest.scala| 55 -
 .../kafka/server/ReplicaFetcherThreadTest.scala| 24 +++---
 .../unit/kafka/server/ReplicaManagerTest.scala | 13 +++-
 .../metadata/BrokerMetadataListenerTest.scala  |  2 +-
 .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java |  6 +-
 .../apache/kafka/controller/BootstrapMetadata.java | 27 ---
 .../kafka/controller/ClusterControlManager.java|  6 +-
 .../kafka/controller/FeatureControlManager.java| 24 --
 .../apache/kafka/controller/QuorumController.java  | 17 ++--
 .../apache/kafka/controller/QuorumFeatures.java|  4 +-
 .../java/org/apache/kafka/image/FeaturesImage.java | 11 ++-
 .../java/org/apache/kafka/image/MetadataImage.java |  5 --
 .../kafka/controller/BootstrapMetadataTest.java| 16 ++--
 .../controller/ClusterControlManagerTest.java  |  2 +-
 .../controller/FeatureControlManagerTest.java  | 19 +++--
 .../kafka/controller/QuorumControllerTest.java | 24 +-
 .../kafka/controller/QuorumControllerTestEnv.java  |  6 +-
 .../kafka/server/common/MetadataVersion.java   | 90 ++
 .../kafka/server/common/MetadataVersionTest.java   |  8 +-
 38 files changed, 379 insertions(+), 182 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerFeatures.scala b/core/src/main/scala/kafka/server/BrokerFeatures.scala
index d385f1eb07..70ef7c71cb 100644
--- a/core/src/main/scala/kafka/server/BrokerFeatures.scala
+++ b/core/src/main/scala/kafka/server/BrokerFeatures.scala
@@ -73,7 +73,7 @@ object BrokerFeatures extends Logging {
   def createDefault(): BrokerFeatures = {
     new BrokerFeatures(Features.supportedFeatures(
       java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME,
-        new SupportedVersionRange(MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.latest().featureLevel()))))
+        new SupportedVersionRange(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), MetadataVersion.latest().featureLevel()))))
   }
 
   def createEmpty(): BrokerFeatures = {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 3d7df18cbb..b4e0b9449c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1790,38 +1790,25 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   // We keep the user-provided String as `MetadataVersion.fromVersionString`
[kafka] branch trunk updated: KAFKA-13846: Adding overloaded metricOrElseCreate method (#12121)

2022-06-13 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 5cab11cf52 KAFKA-13846: Adding overloaded metricOrElseCreate method (#12121)
5cab11cf52 is described below

commit 5cab11cf525f6c06fcf9eb43f7f95ef33fe1cdbb
Author: vamossagar12 
AuthorDate: Mon Jun 13 23:06:39 2022 +0530

KAFKA-13846: Adding overloaded metricOrElseCreate method (#12121)

Reviewers: David Jacot, Justine Olshan, Guozhang Wang
---
 .../org/apache/kafka/common/metrics/Metrics.java   | 40 +--
 .../org/apache/kafka/common/metrics/Sensor.java| 10 -
 .../kafka/connect/runtime/ConnectMetrics.java  |  8 +---
 .../internals/metrics/StreamsMetricsImplTest.java  | 46 ++
 4 files changed, 92 insertions(+), 12 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 52b7794a4c..398819016c 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -509,7 +509,10 @@ public class Metrics implements Closeable {
             Objects.requireNonNull(metricValueProvider),
             config == null ? this.config : config,
             time);
-        registerMetric(m);
+        KafkaMetric existingMetric = registerMetric(m);
+        if (existingMetric != null) {
+            throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one.");
+        }
     }
 
     /**
@@ -524,6 +527,26 @@ public class Metrics implements Closeable {
         addMetric(metricName, null, metricValueProvider);
     }
 
+    /**
+     * Create or get an existing metric to monitor an object that implements MetricValueProvider.
+     * This metric won't be associated with any sensor. This is a way to expose existing values as metrics.
+     * This method takes care of synchronisation while updating/accessing metrics by concurrent threads.
+     *
+     * @param metricName The name of the metric
+     * @param metricValueProvider The metric value provider associated with this metric
+     * @return Existing KafkaMetric if already registered or else a newly created one
+     */
+    public KafkaMetric addMetricIfAbsent(MetricName metricName, MetricConfig config, MetricValueProvider<?> metricValueProvider) {
+        KafkaMetric metric = new KafkaMetric(new Object(),
+            Objects.requireNonNull(metricName),
+            Objects.requireNonNull(metricValueProvider),
+            config == null ? this.config : config,
+            time);
+
+        KafkaMetric existingMetric = registerMetric(metric);
+        return existingMetric == null ? metric : existingMetric;
+    }
+
     /**
      * Remove a metric if it exists and return it. Return null otherwise. If a metric is removed, `metricRemoval`
      * will be invoked for each reporter.
@@ -563,10 +586,18 @@ public class Metrics implements Closeable {
         }
     }
 
-    synchronized void registerMetric(KafkaMetric metric) {
+    /**
+     * Register a metric if not present or return an already existing metric otherwise.
+     * When a metric is newly registered, this method returns null.
+     *
+     * @param metric The KafkaMetric to register
+     * @return KafkaMetric if the metric already exists, null otherwise
+     */
+    synchronized KafkaMetric registerMetric(KafkaMetric metric) {
         MetricName metricName = metric.metricName();
-        if (this.metrics.containsKey(metricName))
-            throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one.");
+        if (this.metrics.containsKey(metricName)) {
+            return this.metrics.get(metricName);
+        }
         this.metrics.put(metricName, metric);
         for (MetricsReporter reporter : reporters) {
             try {
@@ -576,6 +607,7 @@ public class Metrics implements Closeable {
             }
         }
         log.trace("Registered metric named {}", metricName);
+        return null;
     }
 
     /**
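For context, a hedged usage sketch of the get-or-create call added above (the metric name, group, and Value stat are illustrative, not from the commit):

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.KafkaMetric;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.stats.Value;

    Metrics metrics = new Metrics();
    MetricName name = metrics.metricName("records-in-flight", "example-group");

    // The first call registers the metric; a second call with the same name
    // now returns the existing KafkaMetric instead of throwing
    // IllegalArgumentException, as addMetric(...) still does.
    KafkaMetric created = metrics.addMetricIfAbsent(name, null, new Value());
    KafkaMetric same = metrics.addMetricIfAbsent(name, null, new Value());
    // created == same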
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index 5ae3b8d997..25f3c21a31 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -297,7 +297,10 @@ public final class Sensor {
         for (NamedMeasurable m : stat.stats()) {
             final KafkaMetric metric = new KafkaMetric(lock, m.name(), m.stat(), statConfig, time);
             if

[kafka] branch trunk updated: MINOR: Use Exit.addShutdownHook instead of directly adding hooks to Runtime (#12283)

2022-06-13 Thread mimaison
This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 4426b05e54 MINOR: Use Exit.addShutdownHook instead of directly adding hooks to Runtime (#12283)
4426b05e54 is described below

commit 4426b05e549e97c9d1f544083f4dcb71e109b819
Author: Divij Vaidya 
AuthorDate: Mon Jun 13 17:25:40 2022 +0200

MINOR: Use Exit.addShutdownHook instead of directly adding hooks to Runtime (#12283)


Reviewers: Mickael Maison, Igor Soarez, Kvicii
---
 .../main/java/org/apache/kafka/server/metrics/KafkaYammerMetrics.java  | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaYammerMetrics.java b/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaYammerMetrics.java
index e4b7afc3ea..329083350c 100644
--- a/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaYammerMetrics.java
+++ b/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaYammerMetrics.java
@@ -22,6 +22,7 @@ import com.yammer.metrics.core.MetricsRegistry;
 import org.apache.kafka.common.Reconfigurable;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Sanitizer;
 
 import java.util.Iterator;
@@ -57,7 +58,7 @@ public class KafkaYammerMetrics implements Reconfigurable {
 
     private KafkaYammerMetrics() {
         jmxReporter.start();
-        Runtime.getRuntime().addShutdownHook(new Thread(jmxReporter::shutdown));
+        Exit.addShutdownHook("kafka-jmx-shutdown-hook", jmxReporter::shutdown);
     }
 
     @Override
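The point of routing the hook through Exit is that Exit's exit, halt, and shutdown-hook behaviour can be stubbed out in tests, unlike a hook added directly to Runtime. A hedged sketch of the pattern (hook name and body are illustrative):

    import org.apache.kafka.common.utils.Exit;

    // Registers a named JVM shutdown hook via Kafka's Exit wrapper rather than
    // Runtime.getRuntime().addShutdownHook, so tests can intercept it.
    Exit.addShutdownHook("example-shutdown-hook",
        () -> System.out.println("cleaning up before JVM exit"));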



[kafka] branch trunk updated: KAFKA-13436: Omitted BrokerTopicMetrics metrics in the documentation (#11473)

2022-06-13 Thread mimaison
This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 89c057397e KAFKA-13436: Omitted BrokerTopicMetrics metrics in the documentation (#11473)
89c057397e is described below

commit 89c057397ec5ae320aee7e12e1a984b2c14a7fa5
Author: Lee Dongjin 
AuthorDate: Mon Jun 13 23:46:19 2022 +0900

KAFKA-13436: Omitted BrokerTopicMetrics metrics in the documentation (#11473)


Reviewers: Mickael Maison 
---
 docs/ops.html | 61 +--
 1 file changed, 43 insertions(+), 18 deletions(-)

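Each row in the diff below documents a per-topic MBean pattern; here is a hedged sketch of reading one of them over JMX (the host, port, topic name, and the OneMinuteRate attribute of the underlying Yammer meter are assumptions, not from the commit):

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class TopicRateProbe {
        public static void main(String[] args) throws Exception {
            JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
                MBeanServerConnection conn = connector.getMBeanServerConnection();
                // Per-topic meter from the table below; drop ",topic=..." for
                // the all-topic aggregate.
                ObjectName name = new ObjectName(
                    "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=my-topic");
                System.out.println(conn.getAttribute(name, "OneMinuteRate"));
            }
        }
    }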
diff --git a/docs/ops.html b/docs/ops.html
index 5ace510c4c..76fee9cc86 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -1401,18 +1401,18 @@ $ bin/kafka-acls.sh \
      </tr>
      <tr>
        <td>Message in rate</td>
-        <td>kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec</td>
-        <td></td>
+        <td>kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=([-.\w]+)</td>
+        <td>Incoming message rate per topic. Omitting 'topic=(...)' will yield the all-topic rate.</td>
      </tr>
      <tr>
        <td>Byte in rate from clients</td>
-        <td>kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec</td>
-        <td></td>
+        <td>kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=([-.\w]+)</td>
+        <td>Byte in (from the clients) rate per topic. Omitting 'topic=(...)' will yield the all-topic rate.</td>
      </tr>
      <tr>
        <td>Byte in rate from other brokers</td>
-        <td>kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec</td>
-        <td></td>
+        <td>kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec,topic=([-.\w]+)</td>
+        <td>Byte in (from the other brokers) rate per topic. Omitting 'topic=(...)' will yield the all-topic rate.</td>
      </tr>
      <tr>
        <td>Controller Request rate from Broker</td>
@@ -1441,7 +1441,27 @@ $ bin/kafka-acls.sh \
        <td>Error rate</td>
        <td>kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=([-.\w]+),error=([-.\w]+)</td>
        <td>Number of errors in responses counted per-request-type, per-error-code. If a response contains
-            multiple errors, all are counted. error=NONE indicates successful responses.</td>
+          multiple errors, all are counted. error=NONE indicates successful responses.</td>
+      </tr>
+      <tr>
+        <td>Produce request rate</td>
+        <td>kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec,topic=([-.\w]+)</td>
+        <td>Produce request rate per topic. Omitting 'topic=(...)' will yield the all-topic rate.</td>
+      </tr>
+      <tr>
+        <td>Fetch request rate</td>
+        <td>kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec,topic=([-.\w]+)</td>
+        <td>Fetch request (from clients or followers) rate per topic. Omitting 'topic=(...)' will yield the all-topic rate.</td>
+      </tr>
+      <tr>
+        <td>Failed produce request rate</td>
+        <td>kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec,topic=([-.\w]+)</td>
+        <td>Failed Produce request rate per topic. Omitting 'topic=(...)' will yield the all-topic rate.</td>
+      </tr>
+      <tr>
+        <td>Failed fetch request rate</td>
+        <td>kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec,topic=([-.\w]+)</td>
+        <td>Failed Fetch request (from clients or followers) rate per topic. Omitting 'topic=(...)' will yield the all-topic rate.</td>
      </tr>
      <tr>
        <td>Request size in bytes</td>
@@ -1461,7 +1481,7 @@ $ bin/kafka-acls.sh \
      <tr>
        <td>Message conversion rate</td>
        <td>kafka.server:type=BrokerTopicMetrics,name={Produce|Fetch}MessageConversionsPerSec,topic=([-.\w]+)</td>
-        <td>Number of records which required message format conversion.</td>
+        <td>Message format conversion rate, for Produce or Fetch requests, per topic. Omitting 'topic=(...)' will yield the all-topic rate.</td>
      </tr>
      <tr>
        <td>Request Queue Size</td>
@@ -1470,33 +1490,38 @@ $ bin/kafka-acls.sh \
      </tr>
      <tr>
        <td>Byte out rate to clients</td>
-        <td>kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec</td>
-        <td></td>
+        <td>kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=([-.\w]+)</td>
+        <td>Byte out (to the clients) rate per topic. Omitting 'topic=(...)' will yield the all-topic rate.</td>
      </tr>
      <tr>
        <td>Byte out rate to other brokers</td>
-        <td>kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesOutPerSec</td>
-        <td></td>
+        <td>kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesOutPerSec,topic=([-.\w]+)</td>
+        <td>Byte out (to the other brokers) rate per topic. Omitting 'topic=(...)' will yield the all-topic rate.</td>
+      </tr>
+      <tr>
+        <td>Rejected byte rate</td>
+        <td>kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec,topic=([-.\w]+)</td>
+        <td>Rejected byte rate per topic, due to the record batch size being greater than max.message.bytes configuration. Omitting 'topic=(...)' will yield the all-topic rate.</td>
      </tr>
      <tr>
        <td>Message validation failure rate due to no key specified for