jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r655642933



##########
File path: core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
##########
@@ -116,61 +122,85 @@ class FetchRequestTest extends BaseRequestTest {
     val fetchRequest1 = createFetchRequest(shuffledTopicPartitions1)
     val fetchResponse1 = sendFetchRequest(leaderId, fetchRequest1)
     checkFetchResponse(shuffledTopicPartitions1, fetchResponse1, 
maxPartitionBytes, maxResponseBytes, messagesPerPartition)
+    val fetchRequest1V12 = createFetchRequest(shuffledTopicPartitions1, 
version = 12)
+    val fetchResponse1V12 = sendFetchRequest(leaderId, fetchRequest1V12)
+    checkFetchResponse(shuffledTopicPartitions1, fetchResponse1V12, 
maxPartitionBytes, maxResponseBytes, messagesPerPartition, 12)
 
     // 2. Same as 1, but shuffled again
     val shuffledTopicPartitions2 = 
random.shuffle(partitionsWithoutLargeMessages) ++ partitionsWithLargeMessages
     val fetchRequest2 = createFetchRequest(shuffledTopicPartitions2)
     val fetchResponse2 = sendFetchRequest(leaderId, fetchRequest2)
     checkFetchResponse(shuffledTopicPartitions2, fetchResponse2, 
maxPartitionBytes, maxResponseBytes, messagesPerPartition)
+    val fetchRequest2V12 = createFetchRequest(shuffledTopicPartitions2, 
version = 12)
+    val fetchResponse2V12 = sendFetchRequest(leaderId, fetchRequest2V12)
+    checkFetchResponse(shuffledTopicPartitions2, fetchResponse2V12, 
maxPartitionBytes, maxResponseBytes, messagesPerPartition, 12)
 
     // 3. Partition with message larger than the partition limit at the start 
of the list
     val shuffledTopicPartitions3 = Seq(partitionWithLargeMessage1, 
partitionWithLargeMessage2) ++
       random.shuffle(partitionsWithoutLargeMessages)
     val fetchRequest3 = createFetchRequest(shuffledTopicPartitions3, 
Map(partitionWithLargeMessage1 -> messagesPerPartition))
     val fetchResponse3 = sendFetchRequest(leaderId, fetchRequest3)
-    assertEquals(shuffledTopicPartitions3, 
fetchResponse3.responseData.keySet.asScala.toSeq)
-    val responseSize3 = fetchResponse3.responseData.asScala.values.map { 
partitionData =>
-      records(partitionData).map(_.sizeInBytes).sum
-    }.sum
-    assertTrue(responseSize3 <= maxResponseBytes)
-    val partitionData3 = 
fetchResponse3.responseData.get(partitionWithLargeMessage1)
-    assertEquals(Errors.NONE.code, partitionData3.errorCode)
-    assertTrue(partitionData3.highWatermark > 0)
-    val size3 = records(partitionData3).map(_.sizeInBytes).sum
-    assertTrue(size3 <= maxResponseBytes, s"Expected $size3 to be smaller than 
$maxResponseBytes")
-    assertTrue(size3 > maxPartitionBytes, s"Expected $size3 to be larger than 
$maxPartitionBytes")
-    assertTrue(maxPartitionBytes < FetchResponse.recordsSize(partitionData3))
+    val fetchRequest3V12 = createFetchRequest(shuffledTopicPartitions3, 
Map(partitionWithLargeMessage1 -> messagesPerPartition), 12)
+    val fetchResponse3V12 = sendFetchRequest(leaderId, fetchRequest3V12)
+    def evaluateResponse3(response: FetchResponse, version: Short = 
ApiKeys.FETCH.latestVersion()) = {
+      val responseData = response.responseData(topicNames, version)
+      assertEquals(shuffledTopicPartitions3, responseData.keySet.asScala.toSeq)
+      val responseSize = responseData.asScala.values.map { partitionData =>
+        records(partitionData).map(_.sizeInBytes).sum
+      }.sum
+      assertTrue(responseSize <= maxResponseBytes)
+      val partitionData = responseData.get(partitionWithLargeMessage1)
+      assertEquals(Errors.NONE.code, partitionData.errorCode)
+      assertTrue(partitionData.highWatermark > 0)
+      val size3 = records(partitionData).map(_.sizeInBytes).sum
+      assertTrue(size3 <= maxResponseBytes, s"Expected $size3 to be smaller 
than $maxResponseBytes")
+      assertTrue(size3 > maxPartitionBytes, s"Expected $size3 to be larger 
than $maxPartitionBytes")
+      assertTrue(maxPartitionBytes < partitionData.records.sizeInBytes)
+    }
+    evaluateResponse3(fetchResponse3)
+    evaluateResponse3(fetchResponse3V12, 12)
 
     // 4. Partition with message larger than the response limit at the start 
of the list
     val shuffledTopicPartitions4 = Seq(partitionWithLargeMessage2, 
partitionWithLargeMessage1) ++
       random.shuffle(partitionsWithoutLargeMessages)
     val fetchRequest4 = createFetchRequest(shuffledTopicPartitions4, 
Map(partitionWithLargeMessage2 -> messagesPerPartition))
     val fetchResponse4 = sendFetchRequest(leaderId, fetchRequest4)
-    assertEquals(shuffledTopicPartitions4, 
fetchResponse4.responseData.keySet.asScala.toSeq)
-    val nonEmptyPartitions4 = 
fetchResponse4.responseData.asScala.toSeq.collect {
-      case (tp, partitionData) if 
records(partitionData).map(_.sizeInBytes).sum > 0 => tp
+    val fetchRequest4V12 = createFetchRequest(shuffledTopicPartitions4, 
Map(partitionWithLargeMessage2 -> messagesPerPartition), 12)
+    val fetchResponse4V12 = sendFetchRequest(leaderId, fetchRequest4V12)
+    def evaluateResponse4(response: FetchResponse, version: Short = 
ApiKeys.FETCH.latestVersion()) = {
+      val responseData = response.responseData(topicNames, version)
+      assertEquals(shuffledTopicPartitions4, responseData.keySet.asScala.toSeq)
+      val nonEmptyPartitions = responseData.asScala.toSeq.collect {
+        case (tp, partitionData) if 
records(partitionData).map(_.sizeInBytes).sum > 0 => tp
+      }
+      assertEquals(Seq(partitionWithLargeMessage2), nonEmptyPartitions)
+      val partitionData = responseData.get(partitionWithLargeMessage2)
+      assertEquals(Errors.NONE.code, partitionData.errorCode)
+      assertTrue(partitionData.highWatermark > 0)
+      val size4 = records(partitionData).map(_.sizeInBytes).sum
+      assertTrue(size4 > maxResponseBytes, s"Expected $size4 to be larger than 
$maxResponseBytes")
+      assertTrue(maxResponseBytes < partitionData.records.sizeInBytes)
     }
-    assertEquals(Seq(partitionWithLargeMessage2), nonEmptyPartitions4)
-    val partitionData4 = 
fetchResponse4.responseData.get(partitionWithLargeMessage2)
-    assertEquals(Errors.NONE.code, partitionData4.errorCode)
-    assertTrue(partitionData4.highWatermark > 0)
-    val size4 = records(partitionData4).map(_.sizeInBytes).sum
-    assertTrue(size4 > maxResponseBytes, s"Expected $size4 to be larger than 
$maxResponseBytes")
-    assertTrue(maxResponseBytes < FetchResponse.recordsSize(partitionData4))
+    evaluateResponse4(fetchResponse4)
+    evaluateResponse4(fetchResponse4V12, 12)
   }
 
   @Test
   def testFetchRequestV2WithOversizedMessage(): Unit = {
     initProducer()
     val maxPartitionBytes = 200
     val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions 
= 1).head
+    val topicIds = getTopicIds().asJava
+    val topicNames = topicIds.asScala.map(_.swap).asJava
     producer.send(new ProducerRecord(topicPartition.topic, 
topicPartition.partition,
       "key", new String(new Array[Byte](maxPartitionBytes + 1)))).get
-    val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, 
createPartitionMap(maxPartitionBytes,
-      Seq(topicPartition))).build(2)
+    val fetchRequest = FetchRequest.Builder.forConsumer(2, Int.MaxValue, 0, 
createPartitionMap(maxPartitionBytes,
+      Seq(topicPartition)), topicIds).build(2)
     val fetchResponse = sendFetchRequest(leaderId, fetchRequest)
-    val partitionData = fetchResponse.responseData.get(topicPartition)
+    val partitionData = fetchResponse.responseData(topicNames, 
2).get(topicPartition)
     assertEquals(Errors.NONE.code, partitionData.errorCode)
+    //assertEquals(Errors.NONE.code, partitionData.errorCode)

Review comment:
       Ah apologies I did not clean up as well as I should have.

##########
File path: 
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
##########
@@ -154,17 +154,17 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       classOf[PrincipalBuilder].getName)
   }
 
-  val requestKeyToError = (topicNames: Map[Uuid, String]) => Map[ApiKeys, 
Nothing => Errors](
+  val requestKeyToError = (topicNames: Map[Uuid, String], version: Short) => 
Map[ApiKeys, Nothing => Errors](
     ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => 
resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2),
     ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => {
       Errors.forCode(
         resp.data
           .responses.find(topic)
           .partitionResponses.asScala.find(_.index == part).get
           .errorCode
-      )      
+      )
     }),
-    ApiKeys.FETCH -> ((resp: requests.FetchResponse) => 
Errors.forCode(resp.responseData.asScala.find(_._1 == tp).get._2.errorCode)),
+    ApiKeys.FETCH -> ((resp: requests.FetchResponse) => 
Errors.forCode(resp.responseData(topicNames.asJava, version).asScala.find(_._1 
== tp).get._2.errorCode)),

Review comment:
       I can try to pick up all the changes I made that do this.

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -471,16 +504,19 @@ class IncrementalFetchContext(private val time: Time,
       if (session.epoch != expectedEpoch) {
         info(s"Incremental fetch session ${session.id} expected epoch 
$expectedEpoch, but " +
           s"got ${session.epoch}.  Possible duplicate request.")
-        FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, 
new FetchSession.RESP_MAP)
+        FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, 
new FetchSession.RESP_MAP, Collections.emptyMap())
       } else {
+        var error = Errors.NONE
         // Iterate over the update list using PartitionIterator. This will 
prune updates which don't need to be sent
+        // It will also set the top-level error to INCONSISTENT_TOPIC_ID if 
any partitions had this error.
         val partitionIter = new PartitionIterator(updates.entrySet.iterator, 
true)
         while (partitionIter.hasNext) {
-          partitionIter.next()
+          if (partitionIter.next().getValue.errorCode() == 
Errors.INCONSISTENT_TOPIC_ID.code())
+            error = Errors.INCONSISTENT_TOPIC_ID

Review comment:
       @lbradstreet and I discussed this a bit. It seems that the metadata 
cache may be less accurate than the log itself, and that is why we did away with 
the metadata check. I'd have to check the code, but I'm not sure whether the 
topicId can change. Are we saying that the partition and/or 
the underlying log can change in this code block? I think we can say we will 
read from the partition with that ID.
   
   ```
   val partition = getPartitionOrException(tp)
   val fetchTimeMs = time.milliseconds
   
   // Check if topic ID from the fetch request/session matches the ID in the log
   if (!hasConsistentTopicId(topicIdFromSession(partition.topic), 
partition.topicId))
     throw new InconsistentTopicIdException("Topic ID in the fetch session did 
not match the topic ID in the log.")
     . 
     .
     .
     val readInfo: LogReadInfo = partition.readRecords(
               lastFetchedEpoch = fetchInfo.lastFetchedEpoch,
               fetchOffset = fetchInfo.fetchOffset,
               currentLeaderEpoch = fetchInfo.currentLeaderEpoch,
               maxBytes = adjustedMaxBytes,
               fetchIsolation = fetchIsolation,
               fetchOnlyFromLeader = fetchOnlyFromLeader,
               minOneMessage = minOneMessage)
   
   ```

##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
##########
@@ -362,7 +363,8 @@
     BROKER_ID_NOT_REGISTERED(102, "The given broker ID was not registered.", 
BrokerIdNotRegisteredException::new),
     INCONSISTENT_TOPIC_ID(103, "The log's topic ID did not match the topic ID 
in the request", InconsistentTopicIdException::new),
     INCONSISTENT_CLUSTER_ID(104, "The clusterId in the request does not match 
that found on the server", InconsistentClusterIdException::new),
-    TRANSACTIONAL_ID_NOT_FOUND(105, "The transactionalId could not be found", 
TransactionalIdNotFoundException::new);
+    TRANSACTIONAL_ID_NOT_FOUND(105, "The transactionalId could not be found", 
TransactionalIdNotFoundException::new),
+    FETCH_SESSION_TOPIC_ID_ERROR(106, "The fetch session encountered 
inconsistent topic ID usage", FetchSessionTopicIdException::new);

Review comment:
       I think the main reason I added the fetch session topic ID error 
(FETCH_SESSION_TOPIC_ID_ERROR) was that the INCONSISTENT_TOPIC_ID error's 
message was too specific for this use case. I suppose we could just make all 
the errors here session errors. I do like the inconsistent topic ID error 
specifying the log (and being on the partition with the issue), but we can 
change this.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -71,6 +79,14 @@ public FetchResponseData data() {
         return data;
     }
 
+    /**
+     * From version 3 or later, the authorized and existing entries in 
`FetchRequest.fetchData` should be in the same order in `responseData`.
+     * Version 13 introduces topic IDs which can lead to a few new errors. If 
there is any unknown topic ID in the request, the
+     * response will contain a top-level UNKNOWN_TOPIC_ID error and 
UNKNOWN_TOPIC_ID errors on all the partitions.
+     * If a request's topic ID usage is inconsistent with the session, we will 
return a top level FETCH_SESSION_TOPIC_ID_ERROR error.
+     * We may also return INCONSISTENT_TOPIC_ID error as a top-level error as 
well as an error for a given partition when that partition in the session has a 
topic ID

Review comment:
       It is both a top-level and a partition-level error here. I kind of like 
being able to identify the partition (I kind of wish the other errors could do 
this in some cases), but we can change this.

##########
File path: core/src/main/scala/kafka/server/metadata/RaftMetadataCache.scala
##########
@@ -226,6 +226,18 @@ class RaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging {
     _currentImage.partitions.numTopicPartitions(topic)
   }
 
+  override def topicNamesToIds(): util.Map[String, Uuid] = {
+    _currentImage.partitions.copyReverseIdMap()
+  }
+
+  override def topicIdsToNames(): util.Map[Uuid, String] = {
+    _currentImage.partitions.copyIdMap()
+  }
+
+  override def topicIdInfo(): (util.Map[String, Uuid], util.Map[Uuid, String]) 
= {
+    (topicNamesToIds(), topicIdsToNames())

Review comment:
       Ok. I see what you mean here.

##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -216,12 +217,21 @@ class ReplicaFetcherThread(name: String,
     try {
       val clientResponse = leaderEndpoint.sendRequest(fetchRequest)
       val fetchResponse = 
clientResponse.responseBody.asInstanceOf[FetchResponse]
-      if (!fetchSessionHandler.handleResponse(fetchResponse)) {
-        Map.empty
+      if (!fetchSessionHandler.handleResponse(fetchResponse, 
clientResponse.requestHeader().apiVersion())) {
+        if (fetchResponse.error == Errors.UNKNOWN_TOPIC_ID)
+          throw new UnknownTopicIdException("There was a topic ID in the 
request that was unknown to the server.")
+        else if (fetchResponse.error == Errors.FETCH_SESSION_TOPIC_ID_ERROR)
+          throw new FetchSessionTopicIdException("There was a topic ID in the 
request that was inconsistent with the session.")
+        else if (fetchResponse.error == Errors.INCONSISTENT_TOPIC_ID)
+          throw new InconsistentTopicIdException("There was a topic ID in the 
request that was inconsistent with the one in the logs.")
+        else

Review comment:
       Ah good point. I can look into this.

##########
File path: 
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
##########
@@ -746,7 +753,34 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.DELETE_TOPICS -> deleteTopicsRequest
     )
 
-    sendRequests(requestKeyToRequest)
+    sendRequests(requestKeyToRequest, true)
+  }
+
+  /*
+   * even if the topic doesn't exist, request APIs should not leak the topic 
name
+   */
+  @Test
+  def testAuthorizationWithTopicNotExisting(): Unit = {
+    val id = Uuid.randomUuid()
+    val topicNames = Map(id -> "topic")
+    val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
+      ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = 
false),
+      ApiKeys.PRODUCE -> createProduceRequest,
+      ApiKeys.FETCH -> createFetchRequestWithUnknownTopic(id, 
ApiKeys.FETCH.latestVersion()),
+      ApiKeys.LIST_OFFSETS -> createListOffsetsRequest,
+      ApiKeys.OFFSET_COMMIT -> createOffsetCommitRequest,
+      ApiKeys.OFFSET_FETCH -> createOffsetFetchRequest,
+      ApiKeys.DELETE_TOPICS -> deleteTopicsRequest,
+      ApiKeys.DELETE_RECORDS -> deleteRecordsRequest,
+      ApiKeys.ADD_PARTITIONS_TO_TXN -> addPartitionsToTxnRequest,
+      ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest,
+      ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest,
+      ApiKeys.DELETE_GROUPS -> deleteGroupsRequest,
+      ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest,
+      ApiKeys.ELECT_LEADERS -> electLeadersRequest
+    )
+
+    sendRequests(requestKeyToRequest, false, topicNames)

Review comment:
       I think it's because we still want to build the request. My understanding is 
that the topic does not exist on the receiving side, but we still want to 
receive and handle the response.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to