lianetm commented on code in PR #17516:
URL: https://github.com/apache/kafka/pull/17516#discussion_r1837124491


##########
core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala:
##########
@@ -132,6 +133,84 @@ class GroupAuthorizerIntegrationTest extends 
BaseRequestTest {
     assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testConsumeUnsubscribeWithoutGroupPermission(quorum: String, 
groupProtocol: String): Unit = {
+    val topic = "topic"
+
+    createTopic(topic, listenerName = interBrokerListenerName)
+
+    // allow topic read/write permission to poll/send record
+    addAndVerifyAcls(
+      Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW), 
createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
+      new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
+    )
+    val producer = createProducer()
+    producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, 
"message".getBytes)).get()
+    producer.close()
+
+    // allow group read permission to join group
+    val group = "group"
+    addAndVerifyAcls(
+      Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
+      new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
+    )
+
+    val props = new Properties()
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, group)
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+    val consumer = createConsumer(configOverrides = props)
+    consumer.subscribe(List(topic).asJava)
+    TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)
+
+    removeAndVerifyAcls(
+      Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
+      new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
+    )
+
+    assertDoesNotThrow(new Executable {
+      override def execute(): Unit = consumer.unsubscribe()
+    })
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testConsumeCloseWithoutGroupPermission(quorum: String, groupProtocol: 
String): Unit = {

Review Comment:
   This test is valuable, but if we agree on leaving this PR for the 
unsubscribe only, so that close can leverage the close/callbacks changes in 
https://github.com/apache/kafka/pull/16686, we should probably bring this test 
back in a separate PR, after that one goes in. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to