FrankYang0529 commented on code in PR #17516:
URL: https://github.com/apache/kafka/pull/17516#discussion_r1836296265


##########
core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala:
##########
@@ -124,12 +128,56 @@ class GroupAuthorizerIntegrationTest extends 
BaseRequestTest {
       () => producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, 
"message".getBytes)).get()).getCause
     assertTrue(produceException.isInstanceOf[TopicAuthorizationException])
     assertEquals(Set(topic), 
produceException.asInstanceOf[TopicAuthorizationException].unauthorizedTopics.asScala)
+  }
 
-    val consumer = createConsumer(configsToRemove = 
List(ConsumerConfig.GROUP_ID_CONFIG))
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testUnauthorizedConsumeUnsubscribe(quorum: String, groupProtocol: 
String): Unit = {
+    val topic = "topic"
+    val topicPartition = new TopicPartition(topic, 0)
+
+    createTopic(topic, listenerName = interBrokerListenerName)
+
+    val consumer = createConsumer()
+    consumer.assign(List(topicPartition).asJava)
+    val consumeException = assertThrows(classOf[TopicAuthorizationException],
+      () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1))
+    assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala)
+
+    assertThrows(classOf[GroupAuthorizationException],
+      () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1))
+
+    // TODO: use background-event-queue-size metric to check there is 
background event
+    Thread.sleep(3000)
+
+    assertDoesNotThrow(new Executable {
+      override def execute(): Unit = consumer.unsubscribe()
+    })

Review Comment:
   Hi @lianetm, thanks for sharing the details in `ClassicKafkaConsumer`. For 
`ClassicKafkaConsumer#unsubscribe`, it doesn't check future object from 
`maybeLeaveGroup`, so `unsubscribe` doesn't throw exceptions. I remain 
`unsubscribe` function test for `AsyncKafkaConsumer` only, and keep `close` 
function test for both consumers.
   
   I have another question here. IIUC, we don't need to swallow 
`TopicAuthorizationException`, because we don't check topic permission when 
doing consumer group heartbeat request. So I think we only need to swallow 
`GroupAuthorizationException` here.  WDYT? Thanks.
   
   
https://github.com/apache/kafka/blob/9db5ed00a8369d5c696e836661230110ea2ea44d/core/src/main/scala/kafka/server/KafkaApis.scala#L3812-L3815
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to