lianetm commented on code in PR #17516: URL: https://github.com/apache/kafka/pull/17516#discussion_r1837107494
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -1276,10 +1277,12 @@ private void releaseAssignmentAndLeaveGroup(final Timer timer) { UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent(calculateDeadlineMs(timer)); applicationEventHandler.add(unsubscribeEvent); try { - // If users subscribe to an invalid topic name, they will get InvalidTopicException in error events, - // because network thread keeps trying to send MetadataRequest in the background. - // Ignore it to avoid unsubscribe failed. - processBackgroundEvents(unsubscribeEvent.future(), timer, e -> e instanceof InvalidTopicException); + // If users subscribe to a topic with invalid name or without permission, they will get some exceptions. + // Because network thread keeps trying to send MetadataRequest or ConsumerGroupHeartbeatRequest in the background, + // there will be some error events in the background queue. + // When running close, these exceptions should be ignored, or users can't close successfully. + processBackgroundEvents(unsubscribeEvent.future(), timer, + e -> e instanceof InvalidTopicException || e instanceof GroupAuthorizationException); Review Comment: what about reverting these changes to keep this PR addressing the unsubscribe only? (aligned with the PR name). I think it will be convenient because there is another ongoing PR https://github.com/apache/kafka/pull/16686 changing the close, and this "releaseAssignmentAndLeaveGroup" does not processBackgroundEvents anymore, so the issue you're trying to solve here disappears (we might remain with the other issue of events unaware of metadata errors, but that is already addressed in another PR https://github.com/apache/kafka/pull/17440 too so let's address those concerns there). Makes sense? ########## core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala: ########## @@ -132,6 +133,84 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest { assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) Review Comment: this test passes for both consumers I expect right? We should enabled it for both ########## core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala: ########## @@ -124,12 +128,56 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest { () => producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get()).getCause assertTrue(produceException.isInstanceOf[TopicAuthorizationException]) assertEquals(Set(topic), produceException.asInstanceOf[TopicAuthorizationException].unauthorizedTopics.asScala) + } - val consumer = createConsumer(configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG)) + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testUnauthorizedConsumeUnsubscribe(quorum: String, groupProtocol: String): Unit = { + val topic = "topic" + val topicPartition = new TopicPartition(topic, 0) + + createTopic(topic, listenerName = interBrokerListenerName) + + val consumer = createConsumer() + consumer.assign(List(topicPartition).asJava) + val consumeException = assertThrows(classOf[TopicAuthorizationException], + () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)) + assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala) + + assertThrows(classOf[GroupAuthorizationException], + () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)) + + // TODO: use background-event-queue-size metric to check there is background event + Thread.sleep(3000) + + assertDoesNotThrow(new Executable { + override def execute(): Unit = consumer.unsubscribe() + }) Review Comment: > For ClassicKafkaConsumer#unsubscribe, it doesn't check future object from maybeLeaveGroup, so unsubscribe doesn't throw exceptions Agreed, the unused returned value is the reason why the error doesn't bubble up. Still I think we should keep the test for both consumers (it does pass for the classic I expect, this is just the reason why it passes) > I have another question here. IIUC, we don't need to swallow TopicAuthorizationException, because we don't check topic permission when doing consumer group heartbeat request. So I think we only need to swallow GroupAuthorizationException here. WDYT The trick is that the client internally sends metadata request on poll, and is on those that we could get the topic auth error (that would bubble up in the consumer on processBackgroundEvents). Because of that I think we need to consider that we could receive both (group auth received in a HB response, topic auth received in a metadata response) ########## core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala: ########## @@ -132,6 +133,84 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest { assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testConsumeUnsubscribeWithoutGroupPermission(quorum: String, groupProtocol: String): Unit = { + val topic = "topic" + + createTopic(topic, listenerName = interBrokerListenerName) + + // allow topic read/write permission to poll/send record + addAndVerifyAcls( + Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW), createAcl(AclOperation.READ, AclPermissionType.ALLOW)), + new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL) + ) + val producer = createProducer() + producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get() + producer.close() + + // allow group read permission to join group + val group = "group" + addAndVerifyAcls( + Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)), + new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL) + ) + + val props = new Properties() + props.put(ConsumerConfig.GROUP_ID_CONFIG, group) + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + val consumer = createConsumer(configOverrides = props) + consumer.subscribe(List(topic).asJava) + TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1) + + removeAndVerifyAcls( + Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)), + new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL) + ) + + assertDoesNotThrow(new Executable { + override def execute(): Unit = consumer.unsubscribe() + }) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testConsumeCloseWithoutGroupPermission(quorum: String, groupProtocol: String): Unit = { Review Comment: This test is valuable, but if we agree on leaving this PR for the unsubscribe to leverage the close/callbacks changes in https://github.com/apache/kafka/pull/16686 we should probably bring this test back in a separate PR that one goes in? What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org