dajac commented on a change in pull request #9979:
URL: https://github.com/apache/kafka/pull/9979#discussion_r565057033



##########
File path: 
clients/src/main/resources/common/message/DescribeProducersResponse.json
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 61,
+  "type": "response",
+  "name": "DescribeProducersResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", 
"ignorable": true,

Review comment:
       nit: `ignorable` not really necessary as we start from version 0.

##########
File path: 
clients/src/main/resources/common/message/DescribeProducersResponse.json
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 61,
+  "type": "response",
+  "name": "DescribeProducersResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", 
"ignorable": true,
+      "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
+    { "name": "Topics", "type": "[]TopicResponse", "versions": "0+",
+      "about": "Each topic in the response.", "fields": [
+      { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",
+        "about": "The topic name" },
+      { "name": "Partitions", "type": "[]PartitionResponse", "versions": "0+",
+        "about": "Each partition in the response.", "fields": [
+        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
+          "about": "The partition index." },
+        { "name": "ErrorCode", "type": "int16", "versions": "0+",
+          "about": "The partition error code, or 0 if there was no error." },

Review comment:
       Should we consider adding `ErrorMessage` as well? We tend to have it 
nowadays in many responses but I am not sure if it is a generality.

##########
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -511,6 +513,9 @@ public void testSerialization() throws Exception {
         checkRequest(createAlterClientQuotasRequest(), true);
         checkErrorResponse(createAlterClientQuotasRequest(), 
unknownServerException, true);
         checkResponse(createAlterClientQuotasResponse(), 0, true);
+        checkRequest(createDescribeProducersRequest(), true);
+        checkErrorResponse(createDescribeProducersRequest(), 
unknownServerException, true);
+        checkResponse(createDescribeProducersResponse(), 0, true);

Review comment:
       nit: This test case is getting quite big, too big. Last time I added a 
new request/response, I went with a dedicated unit test for it. See 
`testDescribeClusterSerialization`. I wanted to refactor it to generalize this 
pattern but did not have the time to do it yet. What do you think about this?

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -3082,4 +3082,76 @@ class KafkaApisTest {
 
     assertEquals(expectedSize, 
KafkaApis.sizeOfThrottledPartitions(FetchResponseData.HIGHEST_SUPPORTED_VERSION,
 response, quota))
   }
+
+  @Test
+  def testDescribeProducersAuthorization(): Unit = {
+    val tp1 = new TopicPartition("foo", 0)
+    val tp2 = new TopicPartition("bar", 3)
+
+    setupBasicMetadataCache(tp1.topic, 4) // We will only access the first 
topic
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+    val data = new DescribeProducersRequestData().setTopics(List(
+      new DescribeProducersRequestData.TopicRequest()
+        .setName(tp1.topic)
+        .setPartitionIndexes(List(Int.box(tp1.partition)).asJava),
+      new DescribeProducersRequestData.TopicRequest()
+        .setName(tp2.topic)
+        .setPartitionIndexes(List(Int.box(tp2.partition)).asJava)
+    ).asJava)
+    val describeProducersRequest = new 
DescribeProducersRequest.Builder(data).build()
+    val request = buildRequest(describeProducersRequest)
+    val capturedResponse = expectNoThrottling()
+
+    def buildExpectedActions(topic: String): util.List[Action] = {
+      val pattern = new ResourcePattern(ResourceType.TOPIC, topic, 
PatternType.LITERAL)
+      val action = new Action(AclOperation.READ, pattern, 1, true, true)
+      Collections.singletonList(action)
+    }
+
+    EasyMock.expect(authorizer.authorize(anyObject[RequestContext], 
EasyMock.eq(buildExpectedActions(tp1.topic))))
+      .andReturn(Seq(AuthorizationResult.ALLOWED).asJava)
+      .once()
+
+    EasyMock.expect(authorizer.authorize(anyObject[RequestContext], 
EasyMock.eq(buildExpectedActions(tp2.topic))))
+      .andReturn(Seq(AuthorizationResult.DENIED).asJava)
+      .once()
+
+    EasyMock.expect(replicaManager.activeProducerState(tp1))
+      .andReturn(new DescribeProducersResponseData.PartitionResponse()
+        .setErrorCode(Errors.NONE.code)
+        .setPartitionIndex(tp1.partition)
+        .setActiveProducers(List(
+          new DescribeProducersResponseData.ProducerState()
+            .setProducerId(12345L)
+            .setProducerEpoch(15)
+            .setLastSequence(100)
+            .setLastTimestamp(time.milliseconds())
+            .setCurrentTxnStartOffset(-1)
+            .setCoordinatorEpoch(200)
+        ).asJava))
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, 
txnCoordinator, authorizer)
+    createKafkaApis(authorizer = 
Some(authorizer)).handleDescribeProducersRequest(request)
+
+    val response = readResponse(describeProducersRequest, capturedResponse)
+      .asInstanceOf[DescribeProducersResponse]
+    assertEquals(2, response.data.topics.size())
+
+    val fooTopic = response.data.topics.asScala.find(_.name == tp1.topic).get
+    val fooPartition = fooTopic.partitions.asScala.find(_.partitionIndex == 
tp1.partition).get
+    assertEquals(Errors.NONE.code, fooPartition.errorCode)
+    assertEquals(1, fooPartition.activeProducers.size)
+    val fooProducer = fooPartition.activeProducers.get(0)
+    assertEquals(12345L, fooProducer.producerId)
+    assertEquals(15, fooProducer.producerEpoch)
+    assertEquals(100, fooProducer.lastSequence)
+    assertEquals(time.milliseconds(), fooProducer.lastTimestamp)
+    assertEquals(-1, fooProducer.currentTxnStartOffset)
+    assertEquals(200, fooProducer.coordinatorEpoch)
+
+    val barTopic = response.data.topics.asScala.find(_.name == tp2.topic).get
+    val barPartition = barTopic.partitions.asScala.find(_.partitionIndex == 
tp2.partition).get
+    assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code, 
barPartition.errorCode)
+  }

Review comment:
       I wonder if we could add a third topic to test the 
`UNKNOWN_TOPIC_OR_PARTITION` case. What do you think?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -3365,6 +3366,44 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleDescribeProducersRequest(request: RequestChannel.Request): Unit = {
+    val describeProducersRequest = request.body[DescribeProducersRequest]
+
+    def partitionError(topicPartition: TopicPartition, error: Errors): 
DescribeProducersResponseData.PartitionResponse = {
+      new DescribeProducersResponseData.PartitionResponse()
+        .setPartitionIndex(topicPartition.partition)
+        .setErrorCode(error.code)
+    }
+
+    val response = new DescribeProducersResponseData()
+    describeProducersRequest.data.topics.forEach { topicRequest =>
+      val topicResponse = new DescribeProducersResponseData.TopicResponse()
+        .setName(topicRequest.name)
+      val topicError = if (!authHelper.authorize(request.context, READ, TOPIC, 
topicRequest.name))
+        Some(Errors.TOPIC_AUTHORIZATION_FAILED)
+      else if (!metadataCache.contains(topicRequest.name))
+        Some(Errors.UNKNOWN_TOPIC_OR_PARTITION)
+      else
+        None
+
+      topicRequest.partitionIndexes.forEach { partitionId =>
+        val topicPartition = new TopicPartition(topicRequest.name, partitionId)
+        val partitionResponse = topicError match {
+          case Some(error) => partitionError(topicPartition, error)
+          case None => replicaManager.activeProducerState(topicPartition)
+        }
+        topicResponse.partitions.add(partitionResponse)
+      }
+
+      if (!topicResponse.partitions.isEmpty) {
+        response.topics.add(topicResponse)
+      }

Review comment:
       What's the rational for this? Is it because we can't convey a possible 
topic-level error in the response if we don't have at least one partition for 
it?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to