FrankYang0529 commented on code in PR #17516:
URL: https://github.com/apache/kafka/pull/17516#discussion_r1839785185


##########
core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala:
##########
@@ -132,6 +133,84 @@ class GroupAuthorizerIntegrationTest extends 
BaseRequestTest {
     assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testConsumeUnsubscribeWithoutGroupPermission(quorum: String, 
groupProtocol: String): Unit = {
+    val topic = "topic"
+
+    createTopic(topic, listenerName = interBrokerListenerName)
+
+    // allow topic read/write permission to poll/send record
+    addAndVerifyAcls(
+      Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW), 
createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
+      new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
+    )
+    val producer = createProducer()
+    producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, 
"message".getBytes)).get()
+    producer.close()
+
+    // allow group read permission to join group
+    val group = "group"
+    addAndVerifyAcls(
+      Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
+      new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
+    )
+
+    val props = new Properties()
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, group)
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+    val consumer = createConsumer(configOverrides = props)
+    consumer.subscribe(List(topic).asJava)
+    TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)
+
+    removeAndVerifyAcls(
+      Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
+      new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
+    )
+
+    assertDoesNotThrow(new Executable {
+      override def execute(): Unit = consumer.unsubscribe()
+    })
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testConsumeCloseWithoutGroupPermission(quorum: String, groupProtocol: 
String): Unit = {

Review Comment:
   Hi @lianetm, I would like to confirm again: do you mean that we should 
disable the `close` test cases for now, revert the change in the 
`AsyncKafkaConsumer#releaseAssignmentAndLeaveGroup` function, and then make 
that change after #16686 is merged? Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to