[ 
https://issues.apache.org/jira/browse/KAFKA-5846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653219#comment-16653219
 ] 

ASF GitHub Bot commented on KAFKA-5846:
---------------------------------------

kamalcph closed pull request #3815: KAFKA-5846; Used Singleton 
NoOpConsumerRebalanceListener in the Kafka…
URL: https://github.com/apache/kafka/pull/3815
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
index 74e8b060c73..2f341dc3d84 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
@@ -79,6 +79,14 @@
  */
 public interface ConsumerRebalanceListener {
 
+    ConsumerRebalanceListener NO_OP = new ConsumerRebalanceListener() {
+        @Override
+        public void onPartitionsRevoked(Collection<TopicPartition> partitions) 
{}
+
+        @Override
+        public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {}
+    };
+
     /**
      * A callback method the user can implement to provide handling of offset 
commits to a customized store on the start
      * of a rebalance operation. This method will be called before a rebalance 
operation starts and after the consumer
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 33e037c3ced..0348817cec8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -26,7 +26,6 @@
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
 import org.apache.kafka.clients.consumer.internals.Heartbeat;
-import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
@@ -951,7 +950,7 @@ public void subscribe(Collection<String> topics, 
ConsumerRebalanceListener liste
      */
     @Override
     public void subscribe(Collection<String> topics) {
-        subscribe(topics, new NoOpConsumerRebalanceListener());
+        subscribe(topics, ConsumerRebalanceListener.NO_OP);
     }
 
     /**
@@ -1009,7 +1008,7 @@ public void subscribe(Pattern pattern, 
ConsumerRebalanceListener listener) {
      */
     @Override
     public void subscribe(Pattern pattern) {
-        subscribe(pattern, new NoOpConsumerRebalanceListener());
+        subscribe(pattern, ConsumerRebalanceListener.NO_OP);
     }
 
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index cf1b07fabe2..12c388234a7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.clients.consumer;
 
-import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
@@ -95,7 +94,7 @@ public synchronized void rebalance(Collection<TopicPartition> 
newAssignment) {
 
     @Override
     public synchronized void subscribe(Collection<String> topics) {
-        subscribe(topics, new NoOpConsumerRebalanceListener());
+        subscribe(topics, ConsumerRebalanceListener.NO_OP);
     }
 
     @Override
@@ -123,7 +122,7 @@ public synchronized void subscribe(Pattern pattern, final 
ConsumerRebalanceListe
 
     @Override
     public synchronized void subscribe(Pattern pattern) {
-        subscribe(pattern, new NoOpConsumerRebalanceListener());
+        subscribe(pattern, ConsumerRebalanceListener.NO_OP);
     }
 
     @Override
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 1a82faa0a3d..fc39dcce46b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -113,7 +113,7 @@
 
 @SuppressWarnings("deprecation")
 public class FetcherTest {
-    private ConsumerRebalanceListener listener = new 
NoOpConsumerRebalanceListener();
+    private ConsumerRebalanceListener listener = 
ConsumerRebalanceListener.NO_OP;
     private String topicName = "test";
     private String groupId = "test-group";
     private final String metricGroup = "consumer" + groupId + 
"-fetch-manager-metrics";
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 7c341e6fb9b..f79d514455b 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -27,7 +27,6 @@ import kafka.server.{BaseRequestTest, KafkaConfig}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, 
NewPartitions}
 import org.apache.kafka.clients.consumer._
-import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.acl.{AccessControlEntry, 
AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, 
AclPermissionType}
 import org.apache.kafka.common.config.ConfigResource
@@ -591,7 +590,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val producer = createProducer()
     sendRecords(producer, 1, tp)
     removeAllAcls()
-    
+
     val consumer = createConsumer()
     consumer.assign(List(tp).asJava)
     consumeRecords(consumer)
@@ -713,7 +712,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, 
Read)), groupResource)
 
     val consumer = createConsumer()
-    consumer.subscribe(Pattern.compile(topicPattern), new 
NoOpConsumerRebalanceListener)
+    consumer.subscribe(Pattern.compile(topicPattern), 
ConsumerRebalanceListener.NO_OP)
     consumer.poll(50)
     assertTrue(consumer.subscription.isEmpty)
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Use singleton NoOpConsumerRebalanceListener in subscribe() call where 
> listener is not specified
> -----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5846
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5846
>             Project: Kafka
>          Issue Type: Task
>          Components: clients
>            Reporter: Ted Yu
>            Priority: Minor
>
> Currently KafkaConsumer creates instance of NoOpConsumerRebalanceListener for 
> each subscribe() call where ConsumerRebalanceListener is not specified:
> {code}
>     public void subscribe(Pattern pattern) {
>         subscribe(pattern, new NoOpConsumerRebalanceListener());
> {code}
> We can create a singleton NoOpConsumerRebalanceListener to be used in such 
> scenarios.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to