This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push: new 697f156 MINOR: Fix unnecessary metadata fetch before group assignment (#8095) 697f156 is described below commit 697f1567bef5709028c24ab14fdf85f0e26551d4 Author: Jason Gustafson <ja...@confluent.io> AuthorDate: Wed Feb 12 11:45:06 2020 -0800 MINOR: Fix unnecessary metadata fetch before group assignment (#8095) The recent increase in the flakiness of one of the offset reset tests (KAFKA-9538) traces back to https://github.com/apache/kafka/pull/7941. After investigation, we found that following this patch, the consumer was sending an additional metadata request prior to performing the group assignment. This slight timing difference was enough to trigger the test failures. The problem turned out to be due to a bug in `SubscriptionState.groupSubscribe`, which no longer counted the local subscri [...] Without the fix, we saw 30-50% test failures locally. With it, I could no longer reproduce the failure. However, #6561 is probably still needed to improve the resilience of this test. Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com> --- .../consumer/internals/SubscriptionState.java | 12 ++++++----- .../consumer/internals/SubscriptionStateTest.java | 20 ++++++++++++++++-- .../kafka/admin/ConsumerGroupCommandTest.scala | 16 ++++++++++----- .../kafka/admin/ResetConsumerGroupOffsetTest.scala | 24 ++++++++++++++++------ 4 files changed, 54 insertions(+), 18 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index f4f4d08..89712d9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -160,15 +160,17 @@ public class SubscriptionState { } /** - * Add topics to the current group subscription. This is used by the group leader to ensure + * Set the current group subscription. This is used by the group leader to ensure * that it receives metadata updates for all topics that the group is interested in. - * @param topics The topics to add to the group subscription + * + * @param topics All topics from the group subscription + * @return true if the group subscription contains topics which are not part of the local subscription */ synchronized boolean groupSubscribe(Collection<String> topics) { if (!partitionsAutoAssigned()) throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE); - groupSubscription = new HashSet<>(groupSubscription); - return groupSubscription.addAll(topics); + groupSubscription = new HashSet<>(topics); + return !subscription.containsAll(groupSubscription); } /** @@ -293,7 +295,7 @@ public class SubscriptionState { } /** - * Get the subcription topics for which metadata is required . For the leader, this will include + * Get the subscription topics for which metadata is required. For the leader, this will include * the union of the subscriptions of all group members. For followers, it is just that member's * subscription. This is used when querying topic metadata to detect the metadata changes which would * require rebalancing. The leader fetches metadata for all topics in the group so that it diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index 217b3ce..35ef154 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -105,13 +105,29 @@ public class SubscriptionStateTest { } @Test + public void testGroupSubscribe() { + state.subscribe(singleton(topic1), rebalanceListener); + assertEquals(singleton(topic1), state.metadataTopics()); + + assertFalse(state.groupSubscribe(singleton(topic1))); + assertEquals(singleton(topic1), state.metadataTopics()); + + assertTrue(state.groupSubscribe(Utils.mkSet(topic, topic1))); + assertEquals(Utils.mkSet(topic, topic1), state.metadataTopics()); + + // `groupSubscribe` does not accumulate + assertFalse(state.groupSubscribe(singleton(topic1))); + assertEquals(singleton(topic1), state.metadataTopics()); + } + + @Test public void partitionAssignmentChangeOnPatternSubscription() { state.subscribe(Pattern.compile(".*"), rebalanceListener); // assigned partitions should remain unchanged assertTrue(state.assignedPartitions().isEmpty()); assertEquals(0, state.numAssignedPartitions()); - state.subscribeFromPattern(new HashSet<>(Collections.singletonList(topic))); + state.subscribeFromPattern(Collections.singleton(topic)); // assigned partitions should remain unchanged assertTrue(state.assignedPartitions().isEmpty()); assertEquals(0, state.numAssignedPartitions()); @@ -230,7 +246,7 @@ public class SubscriptionStateTest { @Test public void cantAssignPartitionForUnmatchedPattern() { state.subscribe(Pattern.compile(".*t"), rebalanceListener); - state.subscribeFromPattern(new HashSet<>(Collections.singletonList(topic))); + state.subscribeFromPattern(Collections.singleton(topic)); assertFalse(state.assignFromSubscribed(Collections.singletonList(t1p0))); } diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala index d5eea98..c830ccd 100644 --- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala @@ -64,12 +64,10 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness { } def committedOffsets(topic: String = topic, group: String = group): Map[TopicPartition, Long] = { - val props = new Properties - props.put("bootstrap.servers", brokerList) - props.put("group.id", group) - val consumer = new KafkaConsumer(props, new StringDeserializer, new StringDeserializer) + val consumer = createNoAutoCommitConsumer(group) try { - consumer.partitionsFor(topic).asScala.flatMap { partitionInfo => + val partitions = consumer.partitionsFor(topic).asScala.toSet + partitions.flatMap { partitionInfo => val tp = new TopicPartition(partitionInfo.topic, partitionInfo.partition) val committed = consumer.committed(tp) if (committed == null) @@ -82,6 +80,14 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness { } } + def createNoAutoCommitConsumer(group: String): KafkaConsumer[String, String] = { + val props = new Properties + props.put("bootstrap.servers", brokerList) + props.put("group.id", group) + props.put("enable.auto.commit", "false") + new KafkaConsumer(props, new StringDeserializer, new StringDeserializer) + } + def getConsumerGroupService(args: Array[String]): ConsumerGroupService = { val opts = new ConsumerGroupCommandOptions(args) val service = new ConsumerGroupService(opts) diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala index baf1d05..d69ec9d 100644 --- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala @@ -25,6 +25,8 @@ import org.apache.kafka.common.TopicPartition import org.junit.Assert._ import org.junit.Test +import scala.collection.Seq + class TimeConversionTests { @Test @@ -466,12 +468,22 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest { executor.shutdown() } - private def awaitConsumerProgress(topic: String = topic, group: String = group, count: Long): Unit = { - TestUtils.waitUntilTrue(() => { - val offsets = committedOffsets(topic = topic, group = group).values - count == offsets.sum - }, "Expected that consumer group has consumed all messages from topic/partition. " + - s"Expected offset: $count. Actual offset: ${committedOffsets(topic, group).values.sum}") + private def awaitConsumerProgress(topic: String = topic, + group: String = group, + count: Long): Unit = { + val consumer = createNoAutoCommitConsumer(group) + try { + TestUtils.waitUntilTrue(() => { + val committed = committedOffsets(topic, group) + val total = committed.values.sum + total == count + }, "Expected that consumer group has consumed all messages from topic/partition. " + + s"Expected offset: $count. Actual offset: ${committedOffsets(topic, group).values.sum}") + + } finally { + consumer.close() + } + } private def resetAndAssertOffsets(args: Array[String],