lianetm commented on code in PR #17516:
URL: https://github.com/apache/kafka/pull/17516#discussion_r1837107494


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1276,10 +1277,12 @@ private void releaseAssignmentAndLeaveGroup(final Timer timer) {
         UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent(calculateDeadlineMs(timer));
         applicationEventHandler.add(unsubscribeEvent);
         try {
-            // If users subscribe to an invalid topic name, they will get InvalidTopicException in error events,
-            // because network thread keeps trying to send MetadataRequest in the background.
-            // Ignore it to avoid unsubscribe failed.
-            processBackgroundEvents(unsubscribeEvent.future(), timer, e -> e instanceof InvalidTopicException);
+            // If users subscribe to a topic with invalid name or without permission, they will get some exceptions.
+            // Because network thread keeps trying to send MetadataRequest or ConsumerGroupHeartbeatRequest in the background,
+            // there will be some error events in the background queue.
+            // When running close, these exceptions should be ignored, or users can't close successfully.
+            processBackgroundEvents(unsubscribeEvent.future(), timer,
+                e -> e instanceof InvalidTopicException || e instanceof GroupAuthorizationException);

Review Comment:
   What about reverting these changes so that this PR addresses unsubscribe only (aligned with the PR name)? I think that would be convenient because another ongoing PR, https://github.com/apache/kafka/pull/16686, is changing close, and there "releaseAssignmentAndLeaveGroup" no longer calls processBackgroundEvents, so the issue you're trying to solve here disappears. We might be left with the other issue of events being unaware of metadata errors, but that is already being addressed in https://github.com/apache/kafka/pull/17440, so let's address those concerns there. Makes sense?



##########
core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala:
##########
@@ -132,6 +133,84 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
     assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))

Review Comment:
   I expect this test passes for both consumers, right? We should enable it for both.



##########
core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala:
##########
@@ -124,12 +128,56 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
       () => producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get()).getCause
     assertTrue(produceException.isInstanceOf[TopicAuthorizationException])
     assertEquals(Set(topic), produceException.asInstanceOf[TopicAuthorizationException].unauthorizedTopics.asScala)
+  }
 
-    val consumer = createConsumer(configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testUnauthorizedConsumeUnsubscribe(quorum: String, groupProtocol: String): Unit = {
+    val topic = "topic"
+    val topicPartition = new TopicPartition(topic, 0)
+
+    createTopic(topic, listenerName = interBrokerListenerName)
+
+    val consumer = createConsumer()
+    consumer.assign(List(topicPartition).asJava)
+    val consumeException = assertThrows(classOf[TopicAuthorizationException],
+      () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1))
+    assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala)
+
+    assertThrows(classOf[GroupAuthorizationException],
+      () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1))
+
+    // TODO: use background-event-queue-size metric to check there is background event
+    Thread.sleep(3000)
+
+    assertDoesNotThrow(new Executable {
+      override def execute(): Unit = consumer.unsubscribe()
+    })

Review Comment:
   > For ClassicKafkaConsumer#unsubscribe, it doesn't check future object from 
maybeLeaveGroup, so unsubscribe doesn't throw exceptions
   
   Agreed, the unused return value is why the error doesn't bubble up. Still, I think we should keep the test for both consumers (I expect it does pass for the classic one; this is just the reason why it passes).
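   
   To illustrate the point about the unused return value, here is a minimal, self-contained sketch. It does not use the Kafka classes: `leaveGroup`, `unsubscribeIgnoringResult` and `unsubscribeCheckingResult` are hypothetical names, and `IllegalStateException` is only a placeholder for an authorization failure.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class IgnoredFutureSketch {

    // Hypothetical stand-in for a leave-group call whose result fails asynchronously.
    static CompletableFuture<Void> leaveGroup() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("group authorization failed"));
        return future;
    }

    // Drops the returned future, so the caller never observes the failure.
    static void unsubscribeIgnoringResult() {
        leaveGroup();
    }

    // Joins the future, so the failure surfaces as a CompletionException.
    static void unsubscribeCheckingResult() {
        leaveGroup().join();
    }

    public static void main(String[] args) {
        unsubscribeIgnoringResult(); // returns normally even though the future failed
        try {
            unsubscribeCheckingResult();
        } catch (CompletionException e) {
            System.out.println("surfaced: " + e.getCause().getMessage());
        }
    }
}
```

   The first call returns normally because nothing ever inspects the failed future; only the variant that joins it sees the exception.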
   
   > I have another question here. IIUC, we don't need to swallow 
TopicAuthorizationException, because we don't check topic permission when doing 
consumer group heartbeat request. So I think we only need to swallow 
GroupAuthorizationException here. WDYT
   
   The trick is that the client internally sends metadata requests on poll, and it is in those responses that we could get the topic auth error (which would bubble up in the consumer on processBackgroundEvents). Because of that, I think we need to consider that we could receive both (group auth received in a HB response, topic auth received in a metadata response).
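   
   As a rough sketch of what "consider both" could look like, the predicate below treats both error types as ignorable while draining queued background errors. Everything here is an assumption for illustration: the two exception classes are local stand-ins so the snippet compiles without kafka-clients, and `firstUnexpectedError` is a hypothetical helper, not the actual processBackgroundEvents logic.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Predicate;

public class IgnorableErrorsSketch {

    // Local stand-ins so the sketch compiles on its own; not the real Kafka classes.
    static class GroupAuthorizationException extends RuntimeException {
        GroupAuthorizationException(String message) { super(message); }
    }

    static class TopicAuthorizationException extends RuntimeException {
        TopicAuthorizationException(String message) { super(message); }
    }

    // Group auth errors may arrive in a heartbeat response, topic auth errors in a metadata response.
    static final Predicate<Exception> IGNORABLE =
        e -> e instanceof GroupAuthorizationException || e instanceof TopicAuthorizationException;

    // Hypothetical drain loop: empties the queue and returns the first error
    // that the predicate does not mark as ignorable, or null if there is none.
    static Exception firstUnexpectedError(Queue<Exception> backgroundErrors, Predicate<Exception> ignorable) {
        Exception first = null;
        while (!backgroundErrors.isEmpty()) {
            Exception error = backgroundErrors.poll();
            if (first == null && !ignorable.test(error)) {
                first = error;
            }
        }
        return first;
    }

    public static void main(String[] args) {
        Queue<Exception> errors = new ArrayDeque<>();
        errors.add(new GroupAuthorizationException("no group permission"));
        errors.add(new TopicAuthorizationException("no topic permission"));
        System.out.println(firstUnexpectedError(errors, IGNORABLE) == null);
    }
}
```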



##########
core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala:
##########
@@ -132,6 +133,84 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
     assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testConsumeUnsubscribeWithoutGroupPermission(quorum: String, groupProtocol: String): Unit = {
+    val topic = "topic"
+
+    createTopic(topic, listenerName = interBrokerListenerName)
+
+    // allow topic read/write permission to poll/send record
+    addAndVerifyAcls(
+      Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW), createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
+      new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
+    )
+    val producer = createProducer()
+    producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get()
+    producer.close()
+
+    // allow group read permission to join group
+    val group = "group"
+    addAndVerifyAcls(
+      Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
+      new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
+    )
+
+    val props = new Properties()
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, group)
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+    val consumer = createConsumer(configOverrides = props)
+    consumer.subscribe(List(topic).asJava)
+    TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)
+
+    removeAndVerifyAcls(
+      Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
+      new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
+    )
+
+    assertDoesNotThrow(new Executable {
+      override def execute(): Unit = consumer.unsubscribe()
+    })
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testConsumeCloseWithoutGroupPermission(quorum: String, groupProtocol: String): Unit = {

Review Comment:
   This test is valuable, but if we agree to keep this PR for unsubscribe only and leverage the close/callbacks changes in https://github.com/apache/kafka/pull/16686, we should probably bring this test back in a separate PR once that one goes in. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
