[ 
https://issues.apache.org/jira/browse/KAFKA-4095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16535966#comment-16535966
 ] 

ASF GitHub Bot commented on KAFKA-4095:
---------------------------------------

vahidhashemian closed pull request #2160: KAFKA-4095: Remove topic offsets and 
owners from ZK consumer groups upon topic deletion
URL: https://github.com/apache/kafka/pull/2160
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one made
from a fork) once it has been merged, the diff is reproduced below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala 
b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index e483ac23317..2805a89fc6f 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -17,6 +17,7 @@
 package kafka.controller
 
 
+import kafka.admin.AdminUtils
 import kafka.common.TopicAndPartition
 import kafka.server.ConfigType
 import kafka.utils.Logging
@@ -243,6 +244,11 @@ class TopicDeletionManager(controller: KafkaController, 
eventManager: Controller
     // move respective partition to OfflinePartition and NonExistentPartition 
state
     partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, 
OfflinePartition)
     partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, 
NonExistentPartition)
+
+    // remove topic offsets from all groups that consumed from it
+    // - old consumer based groups
+    
AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(controllerContext.zkUtils, 
topic)
+
     topicsToBeDeleted -= topic
     partitionsToBeDeleted.retain(_.topic != topic)
     val zkUtils = controllerContext.zkUtils
diff --git 
a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala 
b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 700030801d4..51b743929c0 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -40,6 +40,8 @@ import scala.collection.mutable.ArrayBuffer
 class DescribeConsumerGroupTest extends KafkaServerTestHarness {
   private val topic = "foo"
   private val group = "test.group"
+  private val overridingProps = new Properties()
+  overridingProps.setProperty("delete.topic.enable", "true")
 
   @deprecated("This field will be removed in a future release", "0.11.0.0")
   private val oldConsumers = new ArrayBuffer[OldConsumer]
@@ -49,7 +51,7 @@ class DescribeConsumerGroupTest extends 
KafkaServerTestHarness {
   // configure the servers and clients
   override def generateConfigs = {
     TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = 
false).map { props =>
-      KafkaConfig.fromProps(props)
+      KafkaConfig.fromProps(props, overridingProps)
     }
   }
 
@@ -135,6 +137,47 @@ class DescribeConsumerGroupTest extends 
KafkaServerTestHarness {
     }, "Expected rows for consumers with no assigned partitions in describe 
group results.")
   }
 
+  @Test
+  @deprecated("This test has been deprecated and will be removed in a future 
release.", "0.11.0.1")
+  def testDescribeConsumersAfterDeletingTopic() {
+    val topic2 = "foo2"
+    AdminUtils.createTopic(zkUtils, topic2, 1, 1)
+
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+    createOldConsumer()
+    createOldConsumer(Some(topic2))
+    val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, 
"--describe", "--group", group))
+    consumerGroupService = new ZkConsumerGroupService(opts)
+    TestUtils.waitUntilTrue(() => {
+        val (_, assignments) = consumerGroupService.describeGroup()
+        assignments.isDefined && assignments.get.count(_.group == group) == 2
+      }, "Expected two rows in describe group results.")
+
+    oldConsumers.foreach(_.stop())
+    oldConsumers.clear()
+
+    // deleting the topic should not remove the group (since there is still an 
existing topic the group consumes from)
+    AdminUtils.deleteTopic(zkUtils, topic2)
+    TestUtils.waitUntilTrue(() => {
+      !AdminUtils.topicExists(zkUtils, topic2)
+      }, "The first topic deletion did not succeed.")
+
+    TestUtils.waitUntilTrue(() => {
+        val (_, assignments) = consumerGroupService.describeGroup()
+        assignments.isDefined && assignments.get.count(_.group == group) == 1
+      }, "Expected one row in describe group results.")
+
+    // deleting the topic should cause the group to be removed (since the 
group now consumes from no existing topic)
+    AdminUtils.deleteTopic(zkUtils, topic)
+    TestUtils.waitUntilTrue(() => {
+      !AdminUtils.topicExists(zkUtils, topic)
+      }, "The second topic deletion did not succeed.")
+
+    TestUtils.waitUntilTrue(() => {
+        !consumerGroupService.describeGroup()._2.isDefined
+      }, "Expected no rows in describe group results after deleting all topics 
the group has consumed from.")
+  }
+
   @Test
   def testDescribeNonExistingGroupWithNewConsumer() {
     TestUtils.createOffsetsTopic(zkUtils, servers)
@@ -275,11 +318,13 @@ class DescribeConsumerGroupTest extends 
KafkaServerTestHarness {
   }
 
   @deprecated("This test has been deprecated and will be removed in a future 
release.", "0.11.1.0")
-  private def createOldConsumer(): Unit = {
+  private def createOldConsumer(subscribedTopic: Option[String] = None): 
OldConsumer = {
     val consumerProps = new Properties
     consumerProps.setProperty("group.id", group)
     consumerProps.setProperty("zookeeper.connect", zkConnect)
-    oldConsumers += new OldConsumer(Whitelist(topic), consumerProps)
+    val consumer = new 
OldConsumer(Whitelist(subscribedTopic.getOrElse(topic)), consumerProps)
+    oldConsumers += consumer
+    consumer
   }
 }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> When a topic is deleted and then created with the same name, 'committed' 
> offsets are not reset
> ----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4095
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4095
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.9.0.1, 0.10.0.0
>            Reporter: Alex Glikson
>            Assignee: Vahid Hashemian
>            Priority: Major
>
> I encountered a very strange behavior of Kafka, which seems to be a bug.
> After deleting a topic and re-creating it with the same name, I produced 
> a certain number of new messages, and then opened a consumer with the same ID 
> that I used before re-creating the topic (with auto.commit=false, 
> auto.offset.reset=earliest). While the latest offsets seemed up to date, the 
> *committed* offset (returned by the committed() method) was an *old* offset, 
> from the time before the topic had been deleted and re-created.
> I would have assumed that when a topic is deleted, all the associated 
> topic-partitions and consumer groups are recycled too.
> I am using the Java client version 0.9, with Kafka server 0.10.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to