lianetm commented on code in PR #17516:
URL: https://github.com/apache/kafka/pull/17516#discussion_r1831783325


##########
core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala:
##########
@@ -124,12 +128,56 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
       () => producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get()).getCause
     assertTrue(produceException.isInstanceOf[TopicAuthorizationException])
     assertEquals(Set(topic), produceException.asInstanceOf[TopicAuthorizationException].unauthorizedTopics.asScala)
+  }
 
-    val consumer = createConsumer(configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testUnauthorizedConsumeUnsubscribe(quorum: String, groupProtocol: String): Unit = {
+    val topic = "topic"
+    val topicPartition = new TopicPartition(topic, 0)
+
+    createTopic(topic, listenerName = interBrokerListenerName)
+
+    val consumer = createConsumer()
+    consumer.assign(List(topicPartition).asJava)
+    val consumeException = assertThrows(classOf[TopicAuthorizationException],
+      () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1))
+    assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala)
+
+    assertThrows(classOf[GroupAuthorizationException],
+      () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1))
+
+    // TODO: use background-event-queue-size metric to check there is background event
+    Thread.sleep(3000)
+
+    assertDoesNotThrow(new Executable {
+      override def execute(): Unit = consumer.unsubscribe()
+    })

Review Comment:
   Sharing more details on what I expect the classic consumer to do (and so the contract we should keep):
   
   1. It does not throw topic or group auth exceptions on unsubscribe, simply because it does not wait for the leave group response after sending the request in `maybeLeaveGroup`:
   
https://github.com/apache/kafka/blob/8cbd2edfe782997380683cbfa7451a4f2de893f0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java#L567
 
   2. It could throw topic/group auth exceptions on close, because it does wait for responses (in-flight requests) after `maybeLeaveGroup`:
   
https://github.com/apache/kafka/blob/8cbd2edfe782997380683cbfa7451a4f2de893f0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1140
   
   So we need to throw those on close if they exist, but not on unsubscribe. I would say let's align the test expectations with this understanding, and we'll validate it. Let me know what you think/find.
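
   To make the expected contract concrete, here is a toy model in plain Java. It is only a sketch of the behavior described above, not the Kafka implementation: `ToyConsumer`, `AuthFailure`, and `throwsAuthFailure` are hypothetical names, and the leave-group response is simulated with an already-failed `CompletableFuture` rather than a real broker round trip. The only name borrowed from the linked code is `maybeLeaveGroup`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Toy model only: none of the types below are Kafka classes.
public class LeaveGroupContractSketch {

    /** Hypothetical stand-in for a topic/group authorization failure. */
    static class AuthFailure extends RuntimeException {
        AuthFailure(String message) {
            super(message);
        }
    }

    /** Hypothetical consumer that tracks requests still awaiting a response. */
    static class ToyConsumer {
        private final List<CompletableFuture<Void>> inflight = new ArrayList<>();

        /** Simulates sending a leave-group request whose response is an auth error. */
        private void maybeLeaveGroup() {
            CompletableFuture<Void> response = new CompletableFuture<>();
            response.completeExceptionally(new AuthFailure("group not authorized"));
            inflight.add(response);
        }

        /** Sends the request but never inspects the response, so nothing is thrown. */
        void unsubscribe() {
            maybeLeaveGroup();
        }

        /** Waits for in-flight responses, so a failed response surfaces to the caller. */
        void close() {
            maybeLeaveGroup();
            for (CompletableFuture<Void> response : inflight) {
                try {
                    response.join();
                } catch (CompletionException e) {
                    // join() wraps the original failure; unwrap it before rethrowing.
                    if (e.getCause() instanceof RuntimeException) {
                        throw (RuntimeException) e.getCause();
                    }
                    throw e;
                }
            }
        }
    }

    /** Runs the action and reports whether it threw an AuthFailure. */
    static boolean throwsAuthFailure(Runnable action) {
        try {
            action.run();
            return false;
        } catch (AuthFailure e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("unsubscribe throws: "
                + throwsAuthFailure(() -> new ToyConsumer().unsubscribe()));
        System.out.println("close throws: "
                + throwsAuthFailure(() -> new ToyConsumer().close()));
    }
}
```

   In this model the failed response exists in both cases; the only difference is whether the caller waits for it, which is the distinction the test expectations should capture.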



