jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r746836952



##########
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##########
@@ -877,34 +935,341 @@ class FetchSessionTest {
 
     // Create an incremental fetch request as though no topics changed.
     val reqData2 = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
-    val request2 = createRequest(new JFetchMetadata(resp1.sessionId(), 1), 
reqData2, topicIds, EMPTY_PART_LIST, false)
+    val request2 = createRequest(new JFetchMetadata(resp1.sessionId(), 1), 
reqData2, EMPTY_PART_LIST, false)
     // Simulate ID changing on server.
     val topicNamesFooChanged =  Map(topicIds.get("bar") -> "bar", 
Uuid.randomUuid() -> "foo").asJava
-    val topicIdsFooChanged = topicNamesFooChanged.asScala.map(_.swap).asJava
     val context2 = fetchManager.newContext(
       request2.version,
       request2.metadata,
       request2.isFromFollower,
       request2.fetchData(topicNamesFooChanged),
       request2.forgottenTopics(topicNamesFooChanged),
-      topicIdsFooChanged
+      topicNamesFooChanged
     )
     assertEquals(classOf[IncrementalFetchContext], context2.getClass)
-    val respData2 = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData]
+    val respData2 = new util.LinkedHashMap[TopicIdPartition, 
FetchResponseData.PartitionData]
     // Likely if the topic ID is different in the broker, it will be different 
in the log. Simulate the log check finding an inconsistent ID.
-    respData2.put(new TopicPartition("foo", 0), new 
FetchResponseData.PartitionData()
+    respData2.put(tp0, new FetchResponseData.PartitionData()
       .setPartitionIndex(0)
       .setHighWatermark(-1)
       .setLastStableOffset(-1)
       .setLogStartOffset(-1)
       .setErrorCode(Errors.INCONSISTENT_TOPIC_ID.code))
     val resp2 = context2.updateAndGenerateResponseData(respData2)
 
-    assertEquals(Errors.INCONSISTENT_TOPIC_ID, resp2.error)
+    assertEquals(Errors.NONE, resp2.error)
+    assertTrue(resp2.sessionId > 0)
+    val responseData2 = resp2.responseData(topicNames, request2.version)
+    // We should have the inconsistent topic ID error on the partition
+    assertEquals(Errors.INCONSISTENT_TOPIC_ID.code, 
responseData2.get(tp0.topicPartition).errorCode)
+  }
+
+  private def noErrorResponse: FetchResponseData.PartitionData = {
+    new FetchResponseData.PartitionData()
+      .setPartitionIndex(1)
+      .setHighWatermark(10)
+      .setLastStableOffset(10)
+      .setLogStartOffset(10)
+  }
+
+  private def errorResponse(errorCode: Short): FetchResponseData.PartitionData 
 = {
+    new FetchResponseData.PartitionData()
+      .setPartitionIndex(0)
+      .setHighWatermark(-1)
+      .setLastStableOffset(-1)
+      .setLogStartOffset(-1)
+      .setErrorCode(errorCode)
+  }
+
+  @Test
+  def testResolveUnknownPartitions(): Unit = {
+    val time = new MockTime()
+    val cache = new FetchSessionCache(10, 1000)
+    val fetchManager = new FetchManager(time, cache)
+
+    def newContext(
+      metadata: JFetchMetadata,
+      partitions: Seq[TopicIdPartition],
+      topicNames: Map[Uuid, String] // Topic ID to name mapping known by the 
broker.
+    ): FetchContext = {
+      val data = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
+      partitions.foreach { topicIdPartition =>
+        data.put(
+          topicIdPartition.topicPartition,
+          new FetchRequest.PartitionData(topicIdPartition.topicId, 0, 0, 100, 
Optional.empty())
+        )
+      }
+
+      val fetchRequest = createRequest(metadata, data, EMPTY_PART_LIST, false)
+
+      fetchManager.newContext(
+        fetchRequest.version,
+        fetchRequest.metadata,
+        fetchRequest.isFromFollower,
+        fetchRequest.fetchData(topicNames.asJava),
+        fetchRequest.forgottenTopics(topicNames.asJava),
+        topicNames.asJava
+      )
+    }
+
+    def updateAndGenerateResponseData(
+      context: FetchContext
+    ): Int = {
+      val data = new util.LinkedHashMap[TopicIdPartition, 
FetchResponseData.PartitionData]
+      context.foreachPartition { (topicIdPartition, _) =>
+        data.put(
+          topicIdPartition,
+          if (topicIdPartition.topicId == Uuid.ZERO_UUID)
+            errorResponse(Errors.UNKNOWN_TOPIC_ID.code)
+          else
+            noErrorResponse
+        )
+      }
+      context.updateAndGenerateResponseData(data).sessionId
+    }
+
+    val foo = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0))
+    val bar = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("bar", 0))
+    val zar = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("zar", 0))
+
+    val fooUnresolved = new TopicIdPartition(foo.topicId, new 
TopicPartition(null, foo.partition))
+    val barUnresolved = new TopicIdPartition(bar.topicId, new 
TopicPartition(null, bar.partition))
+    val zarUnresolved = new TopicIdPartition(zar.topicId, new 
TopicPartition(null, zar.partition))
+
+    // The metadata cache does not know about the topic.
+    val context1 = newContext(
+      JFetchMetadata.INITIAL,
+      Seq(foo, bar, zar),
+      Map.empty[Uuid, String]
+    )
+
+    // So the context contains unresolved partitions.
+    assertEquals(classOf[FullFetchContext], context1.getClass)
+    assertPartitionsOrder(context1, Seq(fooUnresolved, barUnresolved, 
zarUnresolved))
+
+    // The response is sent back to create the session.
+    val sessionId = updateAndGenerateResponseData(context1)
+
+    // The metadata cache only knows about foo.
+    val context2 = newContext(
+      new JFetchMetadata(sessionId, 1),
+      Seq.empty,
+      Map(foo.topicId -> foo.topic)
+    )
+
+    // So foo is resolved but not the others.
+    assertEquals(classOf[IncrementalFetchContext], context2.getClass)
+    assertPartitionsOrder(context2, Seq(foo, barUnresolved, zarUnresolved))
+
+    updateAndGenerateResponseData(context2)
+
+    // The metadata cache knows about foo and bar.
+    val context3 = newContext(
+      new JFetchMetadata(sessionId, 2),
+      Seq(bar),
+      Map(foo.topicId -> foo.topic, bar.topicId -> bar.topic)
+    )
+
+    // So foo and bar are resolved.
+    assertEquals(classOf[IncrementalFetchContext], context3.getClass)
+    assertPartitionsOrder(context3, Seq(foo, bar, zarUnresolved))
+
+    updateAndGenerateResponseData(context3)
+
+    // The metadata cache knows about all topics.
+    val context4 = newContext(
+      new JFetchMetadata(sessionId, 3),
+      Seq.empty,
+      Map(foo.topicId -> foo.topic, bar.topicId -> bar.topic, zar.topicId -> 
zar.topic)
+    )
+
+    // So all topics are resolved.
+    assertEquals(classOf[IncrementalFetchContext], context4.getClass)
+    assertPartitionsOrder(context4, Seq(foo, bar, zar))
+
+    updateAndGenerateResponseData(context4)
+
+    // The metadata cache does not know about the topics anymore (e.g. 
deleted).
+    val context5 = newContext(
+      new JFetchMetadata(sessionId, 4),
+      Seq.empty,
+      Map.empty
+    )
+
+    // All topics remain resolved.
+    assertEquals(classOf[IncrementalFetchContext], context5.getClass)
+    assertPartitionsOrder(context4, Seq(foo, bar, zar))
+  }
+
+  // This test either simulates an update to a partition using all possible 
topic ID usage combinations.
+  // The possible change will be found in an update from the partition.
+  @ParameterizedTest
+  @MethodSource(Array("idUsageCombinations"))
+  def testUpdatedPartitionResolvesId(startsWithTopicIdsInMetadataCache: 
Boolean, endsWithTopicIdsInMetadataCache: Boolean): Unit = {
+    // TODO: make cleaner
+    val time = new MockTime()
+    val cache = new FetchSessionCache(10, 1000)
+    val fetchManager = new FetchManager(time, cache)
+    val fooId = Uuid.randomUuid()
+    val topicNames = Map(fooId -> "foo").asJava
+    val tp0 = new TopicPartition("foo", 0)
+    val tidp0 = new TopicIdPartition(fooId, tp0)
+    val nullTidp0 = new TopicIdPartition(fooId, new TopicPartition(null, 
tp0.partition))
+
+    // Create a new fetch session with foo-0
+    val reqData1 = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
+    reqData1.put(tp0, new FetchRequest.PartitionData(fooId, 0, 0, 100,
+      Optional.empty()))
+    val request1 = createRequest(JFetchMetadata.INITIAL, reqData1, 
EMPTY_PART_LIST, false)
+    val topicNamesForRequest1 = if (startsWithTopicIdsInMetadataCache) 
topicNames else Map[Uuid, String]().asJava
+    // Start a fetch session. Simulate no error or unknown topic ID depending 
on whether we used topic IDs.
+    val context1 = fetchManager.newContext(
+      request1.version,
+      request1.metadata,
+      request1.isFromFollower,
+      request1.fetchData(topicNamesForRequest1),
+      request1.forgottenTopics(topicNamesForRequest1),
+      topicNamesForRequest1
+    )
+    assertEquals(classOf[FullFetchContext], context1.getClass)
+    val partitionsInSession1 = if (startsWithTopicIdsInMetadataCache) 
Seq(tidp0) else Seq(nullTidp0)
+    assertPartitionsOrder(context1, partitionsInSession1)
+    val respData1 = new util.LinkedHashMap[TopicIdPartition, 
FetchResponseData.PartitionData]
+    val errorCode1 = if (startsWithTopicIdsInMetadataCache) Errors.NONE.code 
else Errors.UNKNOWN_TOPIC_ID.code
+    val fooResponseTp1 = if (startsWithTopicIdsInMetadataCache) tidp0 else 
nullTidp0
+    val fooResponse1 = if (startsWithTopicIdsInMetadataCache) noErrorResponse 
else errorResponse(errorCode1)
+    respData1.put(fooResponseTp1, fooResponse1)
+    val resp1 = context1.updateAndGenerateResponseData(respData1)
+    assertEquals(Errors.NONE, resp1.error())
+    assertTrue(resp1.sessionId() != INVALID_SESSION_ID)
+    val responseData1 = resp1.responseData(topicNames, request1.version)
+    assertEquals(errorCode1, responseData1.get(tp0).errorCode)
+
+    // Create an incremental fetch request with an update to the partition.
+    val reqData2 = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
+    reqData2.put(tp0, new FetchRequest.PartitionData(fooId, 0, 0, 100,
+      Optional.empty()))
+    val request2 = createRequest(new JFetchMetadata(resp1.sessionId(), 1), 
reqData2, EMPTY_PART_LIST, false)
+    val topicNamesForRequest2 = if (endsWithTopicIdsInMetadataCache) Map(fooId 
-> "foo").asJava else Map[Uuid, String]().asJava
+    val context2 = fetchManager.newContext(
+      request2.version,
+      request2.metadata,
+      request2.isFromFollower,
+      request2.fetchData(topicNamesForRequest2),
+      request2.forgottenTopics(topicNamesForRequest2),
+      topicNamesForRequest2
+    )
+    assertEquals(classOf[IncrementalFetchContext], context2.getClass)
+    // We will still have the topic ID in the session if we started with topic 
IDs and currently do not have the ID anymore.
+    val partitionsInSession2 = if (startsWithTopicIdsInMetadataCache || 
endsWithTopicIdsInMetadataCache) Seq(tidp0) else Seq(nullTidp0)
+    assertPartitionsOrder(context2, partitionsInSession2)
+    val respData2 = new util.LinkedHashMap[TopicIdPartition, 
FetchResponseData.PartitionData]
+    // If we always had topic IDs in the metadata cache or newly resolved the 
ID we won't have an error.
+    // If the topic ID was once in the metadata cache and in the session, the 
partition has likely been deleted and would have an INCONSISTENT_TOPIC_ID error.
+    // Likely if the topic ID was never in the broker, return 
UNKNOWN_TOPIC_OR_PARTITION
+    val errorCode2 = if (endsWithTopicIdsInMetadataCache) Errors.NONE.code
+    else if (startsWithTopicIdsInMetadataCache) 
Errors.INCONSISTENT_TOPIC_ID.code else Errors.UNKNOWN_TOPIC_OR_PARTITION.code
+    val fooResponseTp2 = partitionsInSession2(0)
+    val fooResponse2 = if (endsWithTopicIdsInMetadataCache) noErrorResponse 
else errorResponse(errorCode2)
+    respData2.put(fooResponseTp2, fooResponse2)
+    val resp2 = context2.updateAndGenerateResponseData(respData2)

Review comment:
       Is there a way to make such a test without duplicating the newContext 
portions? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to