[ https://issues.apache.org/jira/browse/KAFKA-5846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653219#comment-16653219 ]
ASF GitHub Bot commented on KAFKA-5846: --------------------------------------- kamalcph closed pull request #3815: KAFKA-5846; Used Singleton NoOpConsumerRebalanceListener in the Kafka… URL: https://github.com/apache/kafka/pull/3815 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java index 74e8b060c73..2f341dc3d84 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java @@ -79,6 +79,14 @@ */ public interface ConsumerRebalanceListener { + ConsumerRebalanceListener NO_OP = new ConsumerRebalanceListener() { + @Override + public void onPartitionsRevoked(Collection<TopicPartition> partitions) {} + + @Override + public void onPartitionsAssigned(Collection<TopicPartition> partitions) {} + }; + /** * A callback method the user can implement to provide handling of offset commits to a customized store on the start * of a rebalance operation. 
This method will be called before a rebalance operation starts and after the consumer diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 33e037c3ced..0348817cec8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -26,7 +26,6 @@ import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; import org.apache.kafka.clients.consumer.internals.Fetcher; import org.apache.kafka.clients.consumer.internals.Heartbeat; -import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener; import org.apache.kafka.clients.consumer.internals.PartitionAssignor; import org.apache.kafka.clients.consumer.internals.SubscriptionState; import org.apache.kafka.common.Cluster; @@ -951,7 +950,7 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener liste */ @Override public void subscribe(Collection<String> topics) { - subscribe(topics, new NoOpConsumerRebalanceListener()); + subscribe(topics, ConsumerRebalanceListener.NO_OP); } /** @@ -1009,7 +1008,7 @@ public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { */ @Override public void subscribe(Pattern pattern) { - subscribe(pattern, new NoOpConsumerRebalanceListener()); + subscribe(pattern, ConsumerRebalanceListener.NO_OP); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index cf1b07fabe2..12c388234a7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.clients.consumer; -import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener; import 
org.apache.kafka.clients.consumer.internals.SubscriptionState; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; @@ -95,7 +94,7 @@ public synchronized void rebalance(Collection<TopicPartition> newAssignment) { @Override public synchronized void subscribe(Collection<String> topics) { - subscribe(topics, new NoOpConsumerRebalanceListener()); + subscribe(topics, ConsumerRebalanceListener.NO_OP); } @Override @@ -123,7 +122,7 @@ public synchronized void subscribe(Pattern pattern, final ConsumerRebalanceListe @Override public synchronized void subscribe(Pattern pattern) { - subscribe(pattern, new NoOpConsumerRebalanceListener()); + subscribe(pattern, ConsumerRebalanceListener.NO_OP); } @Override diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 1a82faa0a3d..fc39dcce46b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -113,7 +113,7 @@ @SuppressWarnings("deprecation") public class FetcherTest { - private ConsumerRebalanceListener listener = new NoOpConsumerRebalanceListener(); + private ConsumerRebalanceListener listener = ConsumerRebalanceListener.NO_OP; private String topicName = "test"; private String groupId = "test-group"; private final String metricGroup = "consumer" + groupId + "-fetch-manager-metrics"; diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 7c341e6fb9b..f79d514455b 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -27,7 +27,6 @@ import kafka.server.{BaseRequestTest, KafkaConfig} import kafka.utils.TestUtils import 
org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewPartitions} import org.apache.kafka.clients.consumer._ -import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener import org.apache.kafka.clients.producer._ import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType} import org.apache.kafka.common.config.ConfigResource @@ -591,7 +590,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val producer = createProducer() sendRecords(producer, 1, tp) removeAllAcls() - + val consumer = createConsumer() consumer.assign(List(tp).asJava) consumeRecords(consumer) @@ -713,7 +712,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource) val consumer = createConsumer() - consumer.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener) + consumer.subscribe(Pattern.compile(topicPattern), ConsumerRebalanceListener.NO_OP) consumer.poll(50) assertTrue(consumer.subscription.isEmpty) } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Use singleton NoOpConsumerRebalanceListener in subscribe() call where listener is not specified
> -----------------------------------------------------------------------------------------------
>
> Key: KAFKA-5846
> URL: https://issues.apache.org/jira/browse/KAFKA-5846
> Project: Kafka
> Issue Type: Task
> Components: clients
> Reporter: Ted Yu
> Priority: Minor
>
> Currently KafkaConsumer creates a new instance of NoOpConsumerRebalanceListener for
> each subscribe() call where ConsumerRebalanceListener is not specified:
> {code}
> public void subscribe(Pattern pattern) {
> subscribe(pattern, new NoOpConsumerRebalanceListener());
> {code}
> We can create a singleton NoOpConsumerRebalanceListener to be used in such
> scenarios.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)