lianetm commented on code in PR #17516:
URL: https://github.com/apache/kafka/pull/17516#discussion_r1824955585
##########
core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala:
##########
@@ -124,12 +128,56 @@ class GroupAuthorizerIntegrationTest extends
BaseRequestTest {
() => producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic,
"message".getBytes)).get()).getCause
assertTrue(produceException.isInstanceOf[TopicAuthorizationException])
assertEquals(Set(topic),
produceException.asInstanceOf[TopicAuthorizationException].unauthorizedTopics.asScala)
+ }
- val consumer = createConsumer(configsToRemove =
List(ConsumerConfig.GROUP_ID_CONFIG))
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testUnauthorizedConsumeUnsubscribe(quorum: String, groupProtocol:
String): Unit = {
+ val topic = "topic"
+ val topicPartition = new TopicPartition(topic, 0)
+
+ createTopic(topic, listenerName = interBrokerListenerName)
+
+ val consumer = createConsumer()
+ consumer.assign(List(topicPartition).asJava)
+ val consumeException = assertThrows(classOf[TopicAuthorizationException],
+ () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1))
+ assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala)
+
+ assertThrows(classOf[GroupAuthorizationException],
+ () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1))
+
+ // TODO: use background-event-queue-size metric to check there is
background event
+ Thread.sleep(3000)
+
+ assertDoesNotThrow(new Executable {
+ override def execute(): Unit = consumer.unsubscribe()
+ })
Review Comment:
I think we may not be capturing the classic consumer behaviour correctly here.
I would expect this assertion to pass for the classic consumer only because
of how the test is written (it never finds a coordinator), but that doesn't mean
that the classic consumer does not throw topic or group auth exceptions on
unsubscribe (which is what the changes in this PR are trying to achieve for the
new consumer).
1. I believe classic does not throw GroupAuthException here only because it
never finds a coordinator, so the unsubscribe performs no action on the group
(but if it had a known coordinator, the unsubscribe would send a leave
group request that I expect would fail with GroupAuthException).
2. I believe classic does not throw TopicAuthException here only because
unsubscribe does not poll the network client if it doesn't have a coordinator
to send the leave group request to.
I could definitely be missing something, but we could validate my
expectations with an integration test here:
- the consumer subscribes to a group successfully (has acls to READ + GROUP +
"group-name-from-config")
- it loses the acls (using `removeAndVerifyAcls`)
- consumer.unsubscribe -> I would expect this to throw a
group authorization exception received in the response to the leave group, or
the topic auth exception propagated from metadata (honestly not sure about the
precedence, but both should be there)
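
To make the distinction concrete, here is a toy model of what I am describing. It is not Kafka code: `ToyConsumer` and `GroupAuthError` are made-up names, and the single `if` is my assumption about the classic behaviour (a leave group is only sent when a coordinator is known), which is exactly what the proposed integration test would need to confirm.

```scala
// Toy model only: none of these types are Kafka classes.
final class GroupAuthError(msg: String) extends RuntimeException(msg)

// Hypothetical stand-in for a consumer's view of its group coordinator.
final class ToyConsumer(coordinatorKnown: Boolean, groupAuthorized: Boolean) {
  // Assumption being modelled: a leave-group request is only sent when a
  // coordinator is known, so an auth failure can only surface in that case.
  def unsubscribe(): Unit =
    if (coordinatorKnown && !groupAuthorized)
      throw new GroupAuthError("not authorized to access group")
}

object ToyDemo {
  // Runs unsubscribe and reports whether an auth failure surfaced.
  def outcome(c: ToyConsumer): String =
    try { c.unsubscribe(); "no exception" }
    catch { case _: GroupAuthError => "group auth exception" }

  def main(args: Array[String]): Unit = {
    // Case exercised by the current test: the coordinator is never found.
    println(outcome(new ToyConsumer(coordinatorKnown = false, groupAuthorized = false)))
    // Case proposed above: the consumer joined the group, then lost its acls.
    println(outcome(new ToyConsumer(coordinatorKnown = true, groupAuthorized = false)))
  }
}
```

If the model matches reality, the current test only covers the first case, so its passing says nothing about the second.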
##########
core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala:
##########
@@ -124,12 +128,56 @@ class GroupAuthorizerIntegrationTest extends
BaseRequestTest {
() => producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic,
"message".getBytes)).get()).getCause
assertTrue(produceException.isInstanceOf[TopicAuthorizationException])
assertEquals(Set(topic),
produceException.asInstanceOf[TopicAuthorizationException].unauthorizedTopics.asScala)
+ }
- val consumer = createConsumer(configsToRemove =
List(ConsumerConfig.GROUP_ID_CONFIG))
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testUnauthorizedConsumeUnsubscribe(quorum: String, groupProtocol:
String): Unit = {
+ val topic = "topic"
+ val topicPartition = new TopicPartition(topic, 0)
+
+ createTopic(topic, listenerName = interBrokerListenerName)
+
+ val consumer = createConsumer()
+ consumer.assign(List(topicPartition).asJava)
+ val consumeException = assertThrows(classOf[TopicAuthorizationException],
+ () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1))
+ assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala)
+
+ assertThrows(classOf[GroupAuthorizationException],
+ () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1))
+
+ // TODO: use background-event-queue-size metric to check there is
background event
+ Thread.sleep(3000)
+
+ assertDoesNotThrow(new Executable {
+ override def execute(): Unit = consumer.unsubscribe()
+ })
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testUnauthorizedConsumeClose(quorum: String, groupProtocol: String):
Unit = {
+ val topic = "topic"
+ val topicPartition = new TopicPartition(topic, 0)
+
+ createTopic(topic, listenerName = interBrokerListenerName)
+
+ val consumer = createConsumer()
consumer.assign(List(topicPartition).asJava)
val consumeException = assertThrows(classOf[TopicAuthorizationException],
() => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1))
assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala)
+
+ assertThrows(classOf[GroupAuthorizationException],
+ () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1))
+
+ // TODO: use background-event-queue-size metric to check there is
background event
+ Thread.sleep(3000)
+
+ assertDoesNotThrow(new Executable {
+ override def execute(): Unit = consumer.close()
Review Comment:
Similar situation with close. I don't believe that the classic consumer swallows the
exceptions; I believe they just don't surface here because there is no known
coordinator (so no group request or client poll on close). If my understanding
is right, we shouldn't swallow them on the new consumer either.