[ https://issues.apache.org/jira/browse/KAFKA-4095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16535966#comment-16535966 ]
ASF GitHub Bot commented on KAFKA-4095: --------------------------------------- vahidhashemian closed pull request #2160: KAFKA-4095: Remove topic offsets and owners from ZK consumer groups upon topic deletion URL: https://github.com/apache/kafka/pull/2160 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala index e483ac23317..2805a89fc6f 100755 --- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala +++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala @@ -17,6 +17,7 @@ package kafka.controller +import kafka.admin.AdminUtils import kafka.common.TopicAndPartition import kafka.server.ConfigType import kafka.utils.Logging @@ -243,6 +244,11 @@ class TopicDeletionManager(controller: KafkaController, eventManager: Controller // move respective partition to OfflinePartition and NonExistentPartition state partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition) partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition) + + // remove topic offsets from all groups that consumed from it + // - old consumer based groups + AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(controllerContext.zkUtils, topic) + topicsToBeDeleted -= topic partitionsToBeDeleted.retain(_.topic != topic) val zkUtils = controllerContext.zkUtils diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala index 700030801d4..51b743929c0 100644 --- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala @@ -40,6 +40,8 @@ import scala.collection.mutable.ArrayBuffer class DescribeConsumerGroupTest extends KafkaServerTestHarness { private val topic = "foo" private val group = "test.group" + private val overridingProps = new Properties() + overridingProps.setProperty("delete.topic.enable", "true") @deprecated("This field will be removed in a future release", "0.11.0.0") private val oldConsumers = new ArrayBuffer[OldConsumer] @@ -49,7 +51,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness { // configure the servers and clients override def generateConfigs = { TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map { props => - KafkaConfig.fromProps(props) + KafkaConfig.fromProps(props, overridingProps) } } @@ -135,6 +137,47 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness { }, "Expected rows for consumers with no assigned partitions in describe group results.") } + @Test + @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.1") + def testDescribeConsumersAfterDeletingTopic() { + val topic2 = "foo2" + AdminUtils.createTopic(zkUtils, topic2, 1, 1) + + TestUtils.createOffsetsTopic(zkUtils, servers) + createOldConsumer() + createOldConsumer(Some(topic2)) + val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", group)) + consumerGroupService = new ZkConsumerGroupService(opts) + TestUtils.waitUntilTrue(() => { + val (_, assignments) = consumerGroupService.describeGroup() + assignments.isDefined && assignments.get.count(_.group == group) == 2 + }, "Expected two rows in describe group results.") + + oldConsumers.foreach(_.stop()) + oldConsumers.clear() + + // deleting the topic should not remove the group (since there is still an existing topic the group consumes from) + AdminUtils.deleteTopic(zkUtils, topic2) + TestUtils.waitUntilTrue(() => { + !AdminUtils.topicExists(zkUtils, topic2) + }, "The first topic deletion did not succeed.") + + TestUtils.waitUntilTrue(() => { + val (_, assignments) = consumerGroupService.describeGroup() + assignments.isDefined && assignments.get.count(_.group == group) == 1 + }, "Expected one row in describe group results.") + + // deleting the topic should cause the group to be removed (since the group now consumes from no existing topic) + AdminUtils.deleteTopic(zkUtils, topic) + TestUtils.waitUntilTrue(() => { + !AdminUtils.topicExists(zkUtils, topic) + }, "The second topic deletion did not succeed.") + + TestUtils.waitUntilTrue(() => { + !consumerGroupService.describeGroup()._2.isDefined + }, "Expected no rows in describe group results after deleting all topics the group has consumed from.") + } + @Test def testDescribeNonExistingGroupWithNewConsumer() { TestUtils.createOffsetsTopic(zkUtils, servers) @@ -275,11 +318,13 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness { } @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.1.0") - private def createOldConsumer(): Unit = { + private def createOldConsumer(subscribedTopic: Option[String] = None): OldConsumer = { val consumerProps = new Properties consumerProps.setProperty("group.id", group) consumerProps.setProperty("zookeeper.connect", zkConnect) - oldConsumers += new OldConsumer(Whitelist(topic), consumerProps) + val consumer = new OldConsumer(Whitelist(subscribedTopic.getOrElse(topic)), consumerProps) + oldConsumers += consumer + consumer } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > When a topic is deleted and then created with the same name, 'committed' > offsets are not reset > ---------------------------------------------------------------------------------------------- > > Key: KAFKA-4095 > URL: https://issues.apache.org/jira/browse/KAFKA-4095 > Project: Kafka > Issue Type: Bug > Components: consumer > Affects Versions: 0.9.0.1, 0.10.0.0 > Reporter: Alex Glikson > Assignee: Vahid Hashemian > Priority: Major > > I encountered a very strange behavior of Kafka, which seems to be a bug. > After deleting a topic and re-creating it with the same name, I produced > certain amount of new messages, and then opened a consumer with the same ID > that I used before re-creating the topic (with auto.commit=false, > auto.offset.reset=earliest). While the latest offsets seemed up to date, the > *committed* offset (returned by committed() method) was an *old* offset, from > the time before the topic has been deleted and created. > I would have assumed that when a topic is deleted, all the associated > topic-partitions and consumer groups are recycled too. > I am using the Java client version 0.9, with Kafka server 0.10. -- This message was sent by Atlassian JIRA (v7.6.3#76005)