This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6e8d0d9850b KAFKA-14853 the serializer/deserializer which extends ClusterResourceListener is not added to Metadata (#13460)
6e8d0d9850b is described below

commit 6e8d0d9850b05fc1de0ceaf77834e68939f782c1
Author: Chia-Ping Tsai <chia7...@gmail.com>
AuthorDate: Wed Mar 29 16:02:04 2023 +0800

    KAFKA-14853 the serializer/deserializer which extends ClusterResourceListener is not added to Metadata (#13460)
    
    Reviewers: dengziming <dengziming1...@gmail.com>
---
 .../kafka/clients/consumer/KafkaConsumer.java      |  4 +-
 .../kafka/clients/producer/KafkaProducer.java      |  4 +-
 .../integration/kafka/api/BaseConsumerTest.scala   | 47 +++++++++++++++++++++-
 3 files changed, 49 insertions(+), 6 deletions(-)
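
For context (illustrative, not part of the commit): the consumer and producer constructors previously passed the serializer/deserializer constructor parameters to configureClusterResourceListeners(); when the (de)serializers are instantiated from the client configuration those parameters are null, so instances implementing ClusterResourceListener were never registered and their onUpdate() callback never fired. The patch registers the configured fields (this.keyDeserializer / this.keySerializer, etc.) instead, and the new test exercises the configuration-based path. A minimal Java sketch of a serializer that relies on this behaviour, assuming a hypothetical class name ClusterAwareSerializer:

    import org.apache.kafka.common.ClusterResource;
    import org.apache.kafka.common.ClusterResourceListener;
    import org.apache.kafka.common.serialization.Serializer;

    public class ClusterAwareSerializer implements Serializer<byte[]>, ClusterResourceListener {

        // Hypothetical field, used only for illustration.
        private volatile String clusterId;

        @Override
        public void onUpdate(ClusterResource clusterResource) {
            // Invoked through Metadata once the client receives cluster metadata,
            // provided the instance was registered with ClusterResourceListeners.
            this.clusterId = clusterResource.clusterId();
        }

        @Override
        public byte[] serialize(String topic, byte[] data) {
            return data;
        }
    }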

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 0088fbb5417..8576c6e052a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -727,8 +727,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             }
             OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
             this.subscriptions = new SubscriptionState(logContext, offsetResetStrategy);
-            ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer,
-                    valueDeserializer, metrics.reporters(), interceptorList);
+            ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(this.keyDeserializer,
+                    this.valueDeserializer, metrics.reporters(), interceptorList);
             this.metadata = new ConsumerMetadata(retryBackoffMs,
                     config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
                     !config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 56ec6b0df52..9eb252b7086 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -401,8 +401,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 this.interceptors = interceptors;
             else
                 this.interceptors = new ProducerInterceptors<>(interceptorList);
-            ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer,
-                    valueSerializer, interceptorList, reporters);
+            ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(this.keySerializer,
+                    this.valueSerializer, interceptorList, reporters);
             this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
             this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
             this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index fe560405033..08a3f32fe1a 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -16,12 +16,16 @@
  */
 package kafka.api
 
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.common.PartitionInfo
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
+import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, PartitionInfo}
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.serialization.{Deserializer, Serializer}
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.Assertions._
 
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicInteger
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
 
@@ -49,6 +53,27 @@ abstract class BaseConsumerTest extends AbstractConsumerTest {
     sendAndAwaitAsyncCommit(consumer)
   }
 
+  @Test
+  def testClusterResourceListener(): Unit = {
+    val numRecords = 100
+    val producerProps = new Properties()
+    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[BaseConsumerTest.TestClusterResourceListenerSerializer])
+    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[BaseConsumerTest.TestClusterResourceListenerSerializer])
+
+    val producer: KafkaProducer[Array[Byte], Array[Byte]] = createProducer(keySerializer = null, valueSerializer = null, producerProps)
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
+
+    val consumerProps = new Properties()
+    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[BaseConsumerTest.TestClusterResourceListenerDeserializer])
+    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[BaseConsumerTest.TestClusterResourceListenerDeserializer])
+    val consumer: KafkaConsumer[Array[Byte], Array[Byte]] = createConsumer(keyDeserializer = null, valueDeserializer = null, consumerProps)
+    consumer.subscribe(List(tp.topic()).asJava)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, startingOffset = 0, startingTimestamp = startingTimestamp)
+    assertNotEquals(0, BaseConsumerTest.updateProducerCount.get())
+    assertNotEquals(0, BaseConsumerTest.updateConsumerCount.get())
+  }
+
   @Test
   def testCoordinatorFailover(): Unit = {
     val listener = new TestConsumerReassignmentListener()
@@ -79,3 +104,21 @@ abstract class BaseConsumerTest extends AbstractConsumerTest {
     ensureNoRebalance(consumer, listener)
   }
 }
+
+object BaseConsumerTest {
+  val updateProducerCount = new AtomicInteger()
+  val updateConsumerCount = new AtomicInteger()
+
+  class TestClusterResourceListenerSerializer extends Serializer[Array[Byte]] with ClusterResourceListener {
+
+    override def onUpdate(clusterResource: ClusterResource): Unit = updateProducerCount.incrementAndGet();
+
+    override def serialize(topic: String, data: Array[Byte]): Array[Byte] = data
+  }
+
+  class TestClusterResourceListenerDeserializer extends Deserializer[Array[Byte]] with ClusterResourceListener {
+
+    override def onUpdate(clusterResource: ClusterResource): Unit = updateConsumerCount.incrementAndGet();
+    override def deserialize(topic: String, data: Array[Byte]): Array[Byte] = data
+  }
+}
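
A usage sketch for the configuration-based path exercised by the new test (illustrative only; the broker address, topic name and the hypothetical ClusterAwareSerializer above are assumptions, not part of the commit). The serializer is supplied via configuration rather than passed to the constructor, which is exactly the case that previously skipped the ClusterResourceListener registration:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ClusterListenerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            // "localhost:9092" and "test-topic" are assumed for illustration.
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ClusterAwareSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ClusterAwareSerializer.class);
            try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
                // With this fix, onUpdate() is called on both configured serializer
                // instances once the producer fetches cluster metadata.
                producer.send(new ProducerRecord<>("test-topic", new byte[0], new byte[0]));
                producer.flush();
            }
        }
    }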
