kirktrue commented on code in PR #16686:
URL: https://github.com/apache/kafka/pull/16686#discussion_r1702416223
##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -911,4 +911,42 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertThrows(classOf[WakeupException], () => consumer.position(topicPartition, Duration.ofSeconds(100)))
   }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCloseLeavesGroupOnInterrupt(quorum: String, groupProtocol: String): Unit = {
+    val adminClient = createAdminClient()
+    val consumer = createConsumer()
+    val groupId = consumerConfig.getProperty("group.id")
+
+    def hasMembers: Boolean = {
+      try {
+        val groupDescription = adminClient.describeConsumerGroups (Collections.singletonList (groupId) ).describedGroups.get (groupId).get
+        groupDescription.members.size() > 0
+      } catch {
+        case _: ExecutionException | _: InterruptedException =>
+          false
+      }
+    }
+
+    val listener = new TestConsumerReassignmentListener()
+    consumer.subscribe(List(topic).asJava, listener)
+    awaitRebalance(consumer, listener)
+
+    assertEquals(1, listener.callsToAssigned)
+    assertEquals(0, listener.callsToRevoked)
+    TestUtils.waitUntilTrue(() => hasMembers, s"Consumer did not join the consumer group within ${JTestUtils.DEFAULT_MAX_WAIT_MS} of subscribe")
+
+    try {
+      Thread.currentThread().interrupt()
+      assertThrows(classOf[InterruptException], () => consumer.close())
+    } finally {
+      // Clear the interrupted flag so we don't create problems for subsequent tests.
+      Thread.interrupted()
+    }
+
+    assertEquals(1, listener.callsToAssigned)
+    assertEquals(1, listener.callsToRevoked)
+    TestUtils.waitUntilTrue(() => !hasMembers, s"Consumer did not leave the consumer group within ${JTestUtils.DEFAULT_MAX_WAIT_MS} of interrupt/close")

Review Comment:
I explored the alternate approach you referred to with changing the configuration.
It seemed to work, inasmuch as the tests pass locally for both `GroupProtocol` types. However, per the other comment about reproducibility locally vs. on CI, I'm not sure it made any real difference.

I later updated the test to simply wait half of the configured session timeout, which I believe achieves the same effect. In this test, the session timeout is never explicitly set anywhere, so it defaults to 45000 ms per the `ConsumerConfig` default. Waiting only half of the session timeout means that, if the consumer does not leave the group on close, the call to `waitUntilTrue` will fail before the broker gets around to kicking the consumer out of the group.

Correct me if I'm wrong, as I probably am because my AC is broken and my brain is sweating 😄
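
The reasoning above about the wait budget can be sketched as follows. This is only an illustration under stated assumptions, not the code in the PR: `HalfSessionTimeoutWait`, `leaveGroupWaitMs`, and `waitUntil` are hypothetical names, `waitUntil` is a simplified stand-in for a `waitUntilTrue`-style polling helper, and the 45000 ms value is the default quoted in the comment. The group membership check is simulated with a timer rather than a real admin client.

```scala
object HalfSessionTimeoutWait {
  // Assumption: 45000 ms, the default session timeout cited in the comment above.
  val DefaultSessionTimeoutMs: Long = 45000L

  // Wait budget: half the session timeout, so a consumer that fails to leave
  // on close() is detected before the broker would expire the member itself.
  def leaveGroupWaitMs(sessionTimeoutMs: Long): Long = sessionTimeoutMs / 2

  // Hypothetical stand-in for a waitUntilTrue-style helper: polls `condition`
  // until it holds or `maxWaitMs` has elapsed, and reports whether it held.
  def waitUntil(condition: () => Boolean, maxWaitMs: Long, pauseMs: Long = 100L): Boolean = {
    val deadlineNs = System.nanoTime() + maxWaitMs * 1000000L
    var satisfied = condition()
    while (!satisfied && System.nanoTime() < deadlineNs) {
      Thread.sleep(pauseMs)
      satisfied = condition()
    }
    satisfied
  }

  def main(args: Array[String]): Unit = {
    val budgetMs = leaveGroupWaitMs(DefaultSessionTimeoutMs)
    println(s"Wait budget: $budgetMs ms") // 45000 / 2 = 22500

    // Simulated group state: the member "leaves" 200 ms after close().
    val leftAtNs = System.nanoTime() + 200L * 1000000L
    def hasMembers: Boolean = System.nanoTime() < leftAtNs

    val left = waitUntil(() => !hasMembers, budgetMs)
    println(s"Consumer left within budget: $left")
  }
}
```

With a budget shorter than the session timeout, a passing wait can only be explained by the consumer leaving on its own during close, not by the broker timing the member out.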