FrankYang0529 commented on code in PR #17516: URL: https://github.com/apache/kafka/pull/17516#discussion_r1839785185
########## core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala: ########## @@ -132,6 +133,84 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest { assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testConsumeUnsubscribeWithoutGroupPermission(quorum: String, groupProtocol: String): Unit = { + val topic = "topic" + + createTopic(topic, listenerName = interBrokerListenerName) + + // allow topic read/write permission to poll/send record + addAndVerifyAcls( + Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW), createAcl(AclOperation.READ, AclPermissionType.ALLOW)), + new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL) + ) + val producer = createProducer() + producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get() + producer.close() + + // allow group read permission to join group + val group = "group" + addAndVerifyAcls( + Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)), + new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL) + ) + + val props = new Properties() + props.put(ConsumerConfig.GROUP_ID_CONFIG, group) + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + val consumer = createConsumer(configOverrides = props) + consumer.subscribe(List(topic).asJava) + TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1) + + removeAndVerifyAcls( + Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)), + new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL) + ) + + assertDoesNotThrow(new Executable { + override def execute(): Unit = consumer.unsubscribe() + }) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + 
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testConsumeCloseWithoutGroupPermission(quorum: String, groupProtocol: String): Unit = { Review Comment: Hi @lianetm, I would like to confirm once more: do you mean that we should disable the `close` test cases for now, revert the change in the `AsyncKafkaConsumer#releaseAssignmentAndLeaveGroup` function, and then make that change after #16686 is merged? Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org