jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r745941900



##########
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##########
@@ -659,88 +670,125 @@ class FetchSessionTest {
   }
 
   @Test
-  def testIncrementalFetchSessionWithIdsWhenSessionDoesNotUseIds() : Unit = {
+  def testFetchSessionWithUnknownId(): Unit = {
     val time = new MockTime()
     val cache = new FetchSessionCache(10, 1000)
     val fetchManager = new FetchManager(time, cache)
-    val topicIds = new util.HashMap[String, Uuid]()
-    val topicNames = new util.HashMap[Uuid, String]()
+    val fooId = Uuid.randomUuid()
+    val barId = Uuid.randomUuid()
+    val zarId = Uuid.randomUuid()
+    val topicNames = Map(fooId -> "foo", barId -> "bar").asJava
+    val foo0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0))
+    val foo1 = new TopicIdPartition(fooId, new TopicPartition("foo", 1))
+    val zar0 = new TopicIdPartition(zarId, new TopicPartition("zar", 0))
+    val emptyFoo0 = new TopicIdPartition(fooId, new TopicPartition(null, 0))
+    val emptyFoo1 = new TopicIdPartition(fooId, new TopicPartition(null, 1))
+    val emptyZar0 = new TopicIdPartition(zarId, new TopicPartition(null, 0))
 
-    // Create a new fetch session with foo-0
+    // Create a new fetch session with foo-0 and foo-1
     val reqData1 = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
-    reqData1.put(new TopicPartition("foo", 0), new 
FetchRequest.PartitionData(0, 0, 100,
+    reqData1.put(foo0.topicPartition, new 
FetchRequest.PartitionData(foo0.topicId, 0, 0, 100,
       Optional.empty()))
-    val request1 = createRequestWithoutTopicIds(JFetchMetadata.INITIAL, 
reqData1, topicIds, EMPTY_PART_LIST, false)
-    // Start a fetch session using a request version that does not use topic 
IDs.
+    reqData1.put(foo1.topicPartition, new 
FetchRequest.PartitionData(foo1.topicId,10, 0, 100,
+      Optional.empty()))
+    reqData1.put(zar0.topicPartition, new 
FetchRequest.PartitionData(zar0.topicId,10, 0, 100,
+      Optional.empty()))
+    val request1 = createRequest(JFetchMetadata.INITIAL, reqData1, 
EMPTY_PART_LIST, false)
+    // Simulate unknown topic ID for foo.
+    val topicNamesOnlyBar = Collections.singletonMap(barId, "bar")
+    // We should not throw error since we have an older request version.
     val context1 = fetchManager.newContext(
       request1.version,
       request1.metadata,
       request1.isFromFollower,
-      request1.fetchData(topicNames),
-      request1.forgottenTopics(topicNames),
-      topicIds
+      request1.fetchData(topicNamesOnlyBar),
+      request1.forgottenTopics(topicNamesOnlyBar),
+      topicNamesOnlyBar
     )
     assertEquals(classOf[FullFetchContext], context1.getClass)
-    val respData1 = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData]
-    respData1.put(new TopicPartition("foo", 0), new 
FetchResponseData.PartitionData()
+    assertPartitionsOrder(context1, Seq(emptyFoo0, emptyFoo1, emptyZar0))
+    val respData1 = new util.LinkedHashMap[TopicIdPartition, 
FetchResponseData.PartitionData]
+    respData1.put(emptyFoo0, new FetchResponseData.PartitionData()
       .setPartitionIndex(0)
-      .setHighWatermark(100)
-      .setLastStableOffset(100)
-      .setLogStartOffset(100))
+      .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code))
+    respData1.put(emptyFoo1, new FetchResponseData.PartitionData()
+      .setPartitionIndex(1)
+      .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code))
+    respData1.put(emptyZar0, new FetchResponseData.PartitionData()
+      .setPartitionIndex(1)
+      .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code))
     val resp1 = context1.updateAndGenerateResponseData(respData1)
+    // On the latest request version, we should have unknown topic ID errors.
     assertEquals(Errors.NONE, resp1.error())
     assertTrue(resp1.sessionId() != INVALID_SESSION_ID)
+    assertEquals(2, resp1.responseData(topicNames, request1.version).size)
+    resp1.responseData(topicNames, request1.version).forEach( (_, resp) => 
assertEquals(Errors.UNKNOWN_TOPIC_ID.code, resp.errorCode))
 
-    // Create an incremental fetch request as though no topics changed. 
However, send a v13 request.
-    // Also simulate the topic ID found on the server.
-    val fooId = Uuid.randomUuid()
-    topicIds.put("foo", fooId)
-    topicNames.put(fooId, "foo")
+    // Create an incremental request where we resolve the partitions
     val reqData2 = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
-    val request2 = createRequest(new JFetchMetadata(resp1.sessionId(), 1), 
reqData2, topicIds, EMPTY_PART_LIST, false)
+    val request2 = createRequest(new JFetchMetadata(resp1.sessionId(), 1), 
reqData2, EMPTY_PART_LIST, false)
     val context2 = fetchManager.newContext(
       request2.version,
       request2.metadata,
       request2.isFromFollower,
       request2.fetchData(topicNames),
       request2.forgottenTopics(topicNames),
-      topicIds
+      topicNames
     )
-
-    assertEquals(classOf[SessionErrorContext], context2.getClass)
-    val respData2 = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData]
-    assertEquals(Errors.FETCH_SESSION_TOPIC_ID_ERROR,
-      context2.updateAndGenerateResponseData(respData2).error())
+    // Topic names in the session but not in the request are lazily resolved 
via foreachPartition. Resolve foo topic IDs here.
+    assertPartitionsOrder(context2, Seq(foo0, foo1, emptyZar0))
+    assertEquals(classOf[IncrementalFetchContext], context2.getClass)

Review comment:
       Is this different from assertPartitionsOrder(context2, Seq(foo0, foo1, emptyZar0))?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to