FrankYang0529 commented on code in PR #17516: URL: https://github.com/apache/kafka/pull/17516#discussion_r1836296265
########## core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala: ########## @@ -124,12 +128,56 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest { () => producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get()).getCause assertTrue(produceException.isInstanceOf[TopicAuthorizationException]) assertEquals(Set(topic), produceException.asInstanceOf[TopicAuthorizationException].unauthorizedTopics.asScala) + } - val consumer = createConsumer(configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG)) + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testUnauthorizedConsumeUnsubscribe(quorum: String, groupProtocol: String): Unit = { + val topic = "topic" + val topicPartition = new TopicPartition(topic, 0) + + createTopic(topic, listenerName = interBrokerListenerName) + + val consumer = createConsumer() + consumer.assign(List(topicPartition).asJava) + val consumeException = assertThrows(classOf[TopicAuthorizationException], + () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)) + assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala) + + assertThrows(classOf[GroupAuthorizationException], + () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)) + + // TODO: use background-event-queue-size metric to check there is background event + Thread.sleep(3000) + + assertDoesNotThrow(new Executable { + override def execute(): Unit = consumer.unsubscribe() + }) Review Comment: Hi @lianetm, thanks for sharing the details in `ClassicKafkaConsumer`. For `ClassicKafkaConsumer#unsubscribe`, it doesn't check future object from `maybeLeaveGroup`, so `unsubscribe` doesn't throw exceptions. I remain `unsubscribe` function test for `AsyncKafkaConsumer` only, and keep `close` function test for both consumers. I have another question here. IIUC, we don't need to swallow `TopicAuthorizationException`, because we don't check topic permission when doing consumer group heartbeat request. So I think we only need to swallow `GroupAuthorizationException` here. WDYT? Thanks. https://github.com/apache/kafka/blob/9db5ed00a8369d5c696e836661230110ea2ea44d/core/src/main/scala/kafka/server/KafkaApis.scala#L3812-L3815 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org