kirktrue commented on code in PR #16686:
URL: https://github.com/apache/kafka/pull/16686#discussion_r1702416223


##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -911,4 +911,42 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     assertThrows(classOf[WakeupException], () => 
consumer.position(topicPartition, Duration.ofSeconds(100)))
   }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCloseLeavesGroupOnInterrupt(quorum: String, groupProtocol: String): 
Unit = {
+    val adminClient = createAdminClient()
+    val consumer = createConsumer()
+    val groupId = consumerConfig.getProperty("group.id")
+
+    def hasMembers: Boolean = {
+      try {
+        val groupDescription = adminClient.describeConsumerGroups 
(Collections.singletonList (groupId) ).describedGroups.get (groupId).get
+        groupDescription.members.size() > 0
+      } catch {
+        case _: ExecutionException | _: InterruptedException =>
+          false
+      }
+    }
+
+    val listener = new TestConsumerReassignmentListener()
+    consumer.subscribe(List(topic).asJava, listener)
+    awaitRebalance(consumer, listener)
+
+    assertEquals(1, listener.callsToAssigned)
+    assertEquals(0, listener.callsToRevoked)
+    TestUtils.waitUntilTrue(() => hasMembers, s"Consumer did not join the 
consumer group within ${JTestUtils.DEFAULT_MAX_WAIT_MS} of subscribe")
+
+    try {
+      Thread.currentThread().interrupt()
+      assertThrows(classOf[InterruptException], () => consumer.close())
+    } finally {
+      // Clear the interrupted flag so we don't create problems for subsequent 
tests.
+      Thread.interrupted()
+    }
+
+    assertEquals(1, listener.callsToAssigned)
+    assertEquals(1, listener.callsToRevoked)
+    TestUtils.waitUntilTrue(() => !hasMembers, s"Consumer did not leave the 
consumer group within ${JTestUtils.DEFAULT_MAX_WAIT_MS} of interrupt/close")

Review Comment:
   I explored the alternate approach you referred to with changing the 
configuration. It seemed to work, inasmuch as the tests pass locally for both 
`GroupProtocol` types. However, per the other comment about reproducibility 
locally vs. on CI, I'm not sure if I made any real difference.
   
   However, I later updated the test to simply wait half of the configured 
session timeout, which I believe achieves the same effect. In this test, the 
session timeout is never explicitly set anywhere; it defaults to 45000 ms per 
the `ConsumerConfig` default. Waiting half of the session timeout means that 
the call to `waitUntilTrue` will fail before the broker gets around to kicking 
the consumer out of the group.
   
   Correct me if I'm wrong, as I probably am because my AC is broken and my 
brain is sweating 😄



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to