jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r745941900
########## File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala ########## @@ -659,88 +670,125 @@ class FetchSessionTest { } @Test - def testIncrementalFetchSessionWithIdsWhenSessionDoesNotUseIds() : Unit = { + def testFetchSessionWithUnknownId(): Unit = { val time = new MockTime() val cache = new FetchSessionCache(10, 1000) val fetchManager = new FetchManager(time, cache) - val topicIds = new util.HashMap[String, Uuid]() - val topicNames = new util.HashMap[Uuid, String]() + val fooId = Uuid.randomUuid() + val barId = Uuid.randomUuid() + val zarId = Uuid.randomUuid() + val topicNames = Map(fooId -> "foo", barId -> "bar").asJava + val foo0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0)) + val foo1 = new TopicIdPartition(fooId, new TopicPartition("foo", 1)) + val zar0 = new TopicIdPartition(zarId, new TopicPartition("zar", 0)) + val emptyFoo0 = new TopicIdPartition(fooId, new TopicPartition(null, 0)) + val emptyFoo1 = new TopicIdPartition(fooId, new TopicPartition(null, 1)) + val emptyZar0 = new TopicIdPartition(zarId, new TopicPartition(null, 0)) - // Create a new fetch session with foo-0 + // Create a new fetch session with foo-0 and foo-1 val reqData1 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] - reqData1.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0, 0, 100, + reqData1.put(foo0.topicPartition, new FetchRequest.PartitionData(foo0.topicId, 0, 0, 100, Optional.empty())) - val request1 = createRequestWithoutTopicIds(JFetchMetadata.INITIAL, reqData1, topicIds, EMPTY_PART_LIST, false) - // Start a fetch session using a request version that does not use topic IDs. + reqData1.put(foo1.topicPartition, new FetchRequest.PartitionData(foo1.topicId,10, 0, 100, + Optional.empty())) + reqData1.put(zar0.topicPartition, new FetchRequest.PartitionData(zar0.topicId,10, 0, 100, + Optional.empty())) + val request1 = createRequest(JFetchMetadata.INITIAL, reqData1, EMPTY_PART_LIST, false) + // Simulate unknown topic ID for foo. + val topicNamesOnlyBar = Collections.singletonMap(barId, "bar") + // We should not throw error since we have an older request version. val context1 = fetchManager.newContext( request1.version, request1.metadata, request1.isFromFollower, - request1.fetchData(topicNames), - request1.forgottenTopics(topicNames), - topicIds + request1.fetchData(topicNamesOnlyBar), + request1.forgottenTopics(topicNamesOnlyBar), + topicNamesOnlyBar ) assertEquals(classOf[FullFetchContext], context1.getClass) - val respData1 = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData] - respData1.put(new TopicPartition("foo", 0), new FetchResponseData.PartitionData() + assertPartitionsOrder(context1, Seq(emptyFoo0, emptyFoo1, emptyZar0)) + val respData1 = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData] + respData1.put(emptyFoo0, new FetchResponseData.PartitionData() .setPartitionIndex(0) - .setHighWatermark(100) - .setLastStableOffset(100) - .setLogStartOffset(100)) + .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)) + respData1.put(emptyFoo1, new FetchResponseData.PartitionData() + .setPartitionIndex(1) + .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)) + respData1.put(emptyZar0, new FetchResponseData.PartitionData() + .setPartitionIndex(1) + .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)) val resp1 = context1.updateAndGenerateResponseData(respData1) + // On the latest request version, we should have unknown topic ID errors. assertEquals(Errors.NONE, resp1.error()) assertTrue(resp1.sessionId() != INVALID_SESSION_ID) + assertEquals(2, resp1.responseData(topicNames, request1.version).size) + resp1.responseData(topicNames, request1.version).forEach( (_, resp) => assertEquals(Errors.UNKNOWN_TOPIC_ID.code, resp.errorCode)) - // Create an incremental fetch request as though no topics changed. However, send a v13 request. - // Also simulate the topic ID found on the server. - val fooId = Uuid.randomUuid() - topicIds.put("foo", fooId) - topicNames.put(fooId, "foo") + // Create an incremental request where we resolve the partitions val reqData2 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] - val request2 = createRequest(new JFetchMetadata(resp1.sessionId(), 1), reqData2, topicIds, EMPTY_PART_LIST, false) + val request2 = createRequest(new JFetchMetadata(resp1.sessionId(), 1), reqData2, EMPTY_PART_LIST, false) val context2 = fetchManager.newContext( request2.version, request2.metadata, request2.isFromFollower, request2.fetchData(topicNames), request2.forgottenTopics(topicNames), - topicIds + topicNames ) - - assertEquals(classOf[SessionErrorContext], context2.getClass) - val respData2 = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData] - assertEquals(Errors.FETCH_SESSION_TOPIC_ID_ERROR, - context2.updateAndGenerateResponseData(respData2).error()) + // Topic names in the session but not in the request are lazily resolved via foreachPartition. Resolve foo topic IDs here. + assertPartitionsOrder(context2, Seq(foo0, foo1, emptyZar0)) + assertEquals(classOf[IncrementalFetchContext], context2.getClass) Review comment: Is this different than assertPartitionsOrder(context2, Seq(foo0, foo1, emptyZar0))? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org