dajac commented on code in PR #18513:
URL: https://github.com/apache/kafka/pull/18513#discussion_r1937231738


##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -2204,270 +2172,238 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
         Utils.closeQuietly(producer, "producer")
       }
 
-      val EMPTY_GROUP_INSTANCE_ID = ""
       val testGroupId = "test_group_id"
       val testClientId = "test_client_id"
       val testInstanceId1 = "test_instance_id_1"
       val testInstanceId2 = "test_instance_id_2"
       val fakeGroupId = "fake_group_id"
 
-      def createProperties(groupInstanceId: String): Properties = {
-        val newConsumerConfig = new Properties(consumerConfig)
-        // We need to disable the auto commit because after the members got 
removed from group, the offset commit
-        // will cause the member rejoining and the test will be flaky (check 
ConsumerCoordinator#OffsetCommitResponseHandler)
-        
newConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
-        newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)
-        newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
testClientId)
-        if (groupInstanceId != EMPTY_GROUP_INSTANCE_ID) {
-          
newConsumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, 
groupInstanceId)
-        }
-        newConsumerConfig
-      }
-
       // contains two static members and one dynamic member
-      val groupInstanceSet = Set(testInstanceId1, testInstanceId2, 
EMPTY_GROUP_INSTANCE_ID)
-      val consumerSet = groupInstanceSet.map { groupInstanceId => 
createConsumer(configOverrides = createProperties(groupInstanceId))}
+      val groupInstanceSet = Set(testInstanceId1, testInstanceId2, "")
       val topicSet = Set(testTopicName, testTopicName1, testTopicName2)
 
-      val latch = new CountDownLatch(consumerSet.size)
-      try {
-        def createConsumerThread[K,V](consumer: Consumer[K,V], topic: String): 
Thread = {
-          new Thread {
-            override def run : Unit = {
-              consumer.subscribe(Collections.singleton(topic))
-              try {
-                while (true) {
-                  consumer.poll(JDuration.ofSeconds(5))
-                  if (!consumer.assignment.isEmpty && latch.getCount > 0L)
-                    latch.countDown()
-                  try {
-                    consumer.commitSync()
-                  } catch {
-                    case _: CommitFailedException => // Ignore and retry on 
next iteration.
-                  }
-                }
-              } catch {
-                case _: InterruptException => // Suppress the output to stderr
-              }
-            }
-          }
+      // We need to disable the auto commit because after the members got 
removed from group, the offset commit
+      // will cause the member rejoining and the test will be flaky (check 
ConsumerCoordinator#OffsetCommitResponseHandler)
+      val defaultConsumerConfig = new Properties(consumerConfig)
+      
defaultConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false")
+      defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
testClientId)
+
+      val backgroundConsumerSet = new BackgroundConsumerSet(testGroupId, 
defaultConsumerConfig)
+      groupInstanceSet.zip(topicSet).foreach { case (groupInstanceId, topic) =>
+        val configOverrides = new Properties()
+        if(groupInstanceId != "") {

Review Comment:
   nit: A space misses after the `if`.



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1838,250 +1838,218 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
         Utils.closeQuietly(producer, "producer")
       }
 
-      val EMPTY_GROUP_INSTANCE_ID = ""
       val testGroupId = "test_group_id"
       val testClientId = "test_client_id"
       val testInstanceId1 = "test_instance_id_1"
       val testInstanceId2 = "test_instance_id_2"
       val fakeGroupId = "fake_group_id"
 
-      def createProperties(groupInstanceId: String): Properties = {
-        val newConsumerConfig = new Properties(consumerConfig)
-        // We need to disable the auto commit because after the members got 
removed from group, the offset commit
-        // will cause the member rejoining and the test will be flaky (check 
ConsumerCoordinator#OffsetCommitResponseHandler)
-        
newConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
-        newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)
-        newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
testClientId)
-        if (groupInstanceId != EMPTY_GROUP_INSTANCE_ID) {
-          
newConsumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, 
groupInstanceId)
-        }
-        newConsumerConfig
-      }
-
       // contains two static members and one dynamic member
-      val groupInstanceSet = Set(testInstanceId1, testInstanceId2, 
EMPTY_GROUP_INSTANCE_ID)
-      val consumerSet = groupInstanceSet.map { groupInstanceId => 
createConsumer(configOverrides = createProperties(groupInstanceId))}
+      val groupInstanceSet = Set(testInstanceId1, testInstanceId2, "")
       val topicSet = Set(testTopicName, testTopicName1, testTopicName2)
 
-      val latch = new CountDownLatch(consumerSet.size)
-      try {
-        def createConsumerThread[K,V](consumer: Consumer[K,V], topic: String): 
Thread = {
-          new Thread {
-            override def run : Unit = {
-              consumer.subscribe(Collections.singleton(topic))
-              try {
-                while (true) {
-                  consumer.poll(JDuration.ofSeconds(5))
-                  if (!consumer.assignment.isEmpty && latch.getCount > 0L)
-                    latch.countDown()
-                  try {
-                    consumer.commitSync()
-                  } catch {
-                    case _: CommitFailedException => // Ignore and retry on 
next iteration.
-                  }
-                }
-              } catch {
-                case _: InterruptException => // Suppress the output to stderr
-              }
-            }
-          }
+      // We need to disable the auto commit because after the members got 
removed from group, the offset commit
+      // will cause the member rejoining and the test will be flaky (check 
ConsumerCoordinator#OffsetCommitResponseHandler)
+      val defaultConsumerConfig = new Properties(consumerConfig)
+      
defaultConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false")
+      defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
testClientId)
+
+      val backgroundConsumerSet = new BackgroundConsumerSet(testGroupId, 
defaultConsumerConfig)
+      groupInstanceSet.zip(topicSet).foreach { case (groupInstanceId, topic) =>
+        val configOverrides = new Properties()
+        if(groupInstanceId != "") {

Review Comment:
   nit: A space misses after the `if`.



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4004,6 +3922,82 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false, 
Collections.emptyList(), null, null),
       topicConfigs.get(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG))
   }
+
+  class BackgroundConsumerSet(testGroupId: String, testClientId: String, 
defaultConsumerConfig: Properties) {
+    private val consumerSet: 
scala.collection.mutable.Set[Consumer[Array[Byte], Array[Byte]]] = 
scala.collection.mutable.Set.empty
+    private val consumerThreads: scala.collection.mutable.Set[Thread] = 
scala.collection.mutable.Set.empty
+    private var startLatch: CountDownLatch = new CountDownLatch(0)
+    private var stopLatch: CountDownLatch = new CountDownLatch(0)
+    private var consumerThreadRunning = new AtomicBoolean(false)
+
+    defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)
+    defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
testClientId)
+
+    def addConsumer(groupInstanceId: String, topic: String, configOverrides: 
Properties = new Properties()): Unit = {
+      val newConsumerConfig = 
defaultConsumerConfig.clone().asInstanceOf[Properties]
+      if (groupInstanceId != "") {
+        // static member
+        newConsumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, 
groupInstanceId)
+      }
+      newConsumerConfig.putAll(configOverrides)
+
+      val consumer = createConsumer(configOverrides = newConsumerConfig)
+      val consumerThread = createConsumerThread(consumer, topic)
+      consumerSet.add(consumer)

Review Comment:
   Hum... Does it actually work? The issue is that the KafkaConsumer does not 
support being called from multiple threads. Hence calling close from the "app" 
thread may just raise an exception. I would suggest to ensure that the thread 
is correctly stopped in close. I would also suggest to use `consumer.wakeup` 
instead of interrupting the thread.



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4003,6 +3939,77 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false, 
Collections.emptyList(), null, null),
       topicConfigs.get(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG))
   }
+
+  class BackgroundConsumerSet(testGroupId: String, defaultConsumerConfig: 
Properties) {
+    private val consumerSet: 
scala.collection.mutable.Set[Consumer[Array[Byte], Array[Byte]]] = 
scala.collection.mutable.Set.empty
+    private val consumerThreads: scala.collection.mutable.Set[Thread] = 
scala.collection.mutable.Set.empty
+    private var startLatch: CountDownLatch = new CountDownLatch(0)
+    private var stopLatch: CountDownLatch = new CountDownLatch(0)
+    private var consumerThreadRunning = new AtomicBoolean(false)
+
+    defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)
+
+    def addConsumer(topic: String, configOverrides: Properties = new 
Properties()): Unit = {
+      val newConsumerConfig = 
defaultConsumerConfig.clone().asInstanceOf[Properties]
+      newConsumerConfig.putAll(configOverrides)
+
+      val consumer = createConsumer(configOverrides = newConsumerConfig)
+      val consumerThread = createConsumerThread(consumer, topic)
+      consumerSet.add(consumer)
+      consumerThreads.add(consumerThread)
+    }
+
+    def start(): Unit = {
+      startLatch = new CountDownLatch(consumerSet.size)
+      stopLatch = new CountDownLatch(consumerSet.size)
+      consumerThreadRunning = new AtomicBoolean(true)
+      consumerThreads.foreach(_.start())
+      assertTrue(startLatch.await(30000, TimeUnit.MILLISECONDS), "Failed to 
start consumer threads in time")
+    }
+
+    def stop(): Unit = {
+      consumerThreadRunning.set(false)

Review Comment:
   nit: Should we also wakeup the consumers after setting the flag?



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4003,6 +3939,77 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false, 
Collections.emptyList(), null, null),
       topicConfigs.get(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG))
   }
+
+  class BackgroundConsumerSet(testGroupId: String, defaultConsumerConfig: 
Properties) {
+    private val consumerSet: 
scala.collection.mutable.Set[Consumer[Array[Byte], Array[Byte]]] = 
scala.collection.mutable.Set.empty
+    private val consumerThreads: scala.collection.mutable.Set[Thread] = 
scala.collection.mutable.Set.empty
+    private var startLatch: CountDownLatch = new CountDownLatch(0)
+    private var stopLatch: CountDownLatch = new CountDownLatch(0)
+    private var consumerThreadRunning = new AtomicBoolean(false)
+
+    defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)
+
+    def addConsumer(topic: String, configOverrides: Properties = new 
Properties()): Unit = {
+      val newConsumerConfig = 
defaultConsumerConfig.clone().asInstanceOf[Properties]
+      newConsumerConfig.putAll(configOverrides)
+
+      val consumer = createConsumer(configOverrides = newConsumerConfig)
+      val consumerThread = createConsumerThread(consumer, topic)
+      consumerSet.add(consumer)
+      consumerThreads.add(consumerThread)
+    }
+
+    def start(): Unit = {
+      startLatch = new CountDownLatch(consumerSet.size)
+      stopLatch = new CountDownLatch(consumerSet.size)
+      consumerThreadRunning = new AtomicBoolean(true)
+      consumerThreads.foreach(_.start())
+      assertTrue(startLatch.await(30000, TimeUnit.MILLISECONDS), "Failed to 
start consumer threads in time")
+    }
+
+    def stop(): Unit = {
+      consumerThreadRunning.set(false)
+      assertTrue(stopLatch.await(30000, TimeUnit.MILLISECONDS), "Failed to 
stop consumer threads in time")
+    }
+
+    def close(): Unit = {
+      try {
+        consumerThreads.foreach {
+          consumerThread =>
+            consumerThread.interrupt()
+            consumerThread.join()
+        }
+      }
+      finally{

Review Comment:
   nit: `} finally {`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to