jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r746836952
########## File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala ########## @@ -877,34 +935,341 @@ class FetchSessionTest { // Create an incremental fetch request as though no topics changed. val reqData2 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] - val request2 = createRequest(new JFetchMetadata(resp1.sessionId(), 1), reqData2, topicIds, EMPTY_PART_LIST, false) + val request2 = createRequest(new JFetchMetadata(resp1.sessionId(), 1), reqData2, EMPTY_PART_LIST, false) // Simulate ID changing on server. val topicNamesFooChanged = Map(topicIds.get("bar") -> "bar", Uuid.randomUuid() -> "foo").asJava - val topicIdsFooChanged = topicNamesFooChanged.asScala.map(_.swap).asJava val context2 = fetchManager.newContext( request2.version, request2.metadata, request2.isFromFollower, request2.fetchData(topicNamesFooChanged), request2.forgottenTopics(topicNamesFooChanged), - topicIdsFooChanged + topicNamesFooChanged ) assertEquals(classOf[IncrementalFetchContext], context2.getClass) - val respData2 = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData] + val respData2 = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData] // Likely if the topic ID is different in the broker, it will be different in the log. Simulate the log check finding an inconsistent ID. 
- respData2.put(new TopicPartition("foo", 0), new FetchResponseData.PartitionData() + respData2.put(tp0, new FetchResponseData.PartitionData() .setPartitionIndex(0) .setHighWatermark(-1) .setLastStableOffset(-1) .setLogStartOffset(-1) .setErrorCode(Errors.INCONSISTENT_TOPIC_ID.code)) val resp2 = context2.updateAndGenerateResponseData(respData2) - assertEquals(Errors.INCONSISTENT_TOPIC_ID, resp2.error) + assertEquals(Errors.NONE, resp2.error) + assertTrue(resp2.sessionId > 0) + val responseData2 = resp2.responseData(topicNames, request2.version) + // We should have the inconsistent topic ID error on the partition + assertEquals(Errors.INCONSISTENT_TOPIC_ID.code, responseData2.get(tp0.topicPartition).errorCode) + } + + private def noErrorResponse: FetchResponseData.PartitionData = { + new FetchResponseData.PartitionData() + .setPartitionIndex(1) + .setHighWatermark(10) + .setLastStableOffset(10) + .setLogStartOffset(10) + } + + private def errorResponse(errorCode: Short): FetchResponseData.PartitionData = { + new FetchResponseData.PartitionData() + .setPartitionIndex(0) + .setHighWatermark(-1) + .setLastStableOffset(-1) + .setLogStartOffset(-1) + .setErrorCode(errorCode) + } + + @Test + def testResolveUnknownPartitions(): Unit = { + val time = new MockTime() + val cache = new FetchSessionCache(10, 1000) + val fetchManager = new FetchManager(time, cache) + + def newContext( + metadata: JFetchMetadata, + partitions: Seq[TopicIdPartition], + topicNames: Map[Uuid, String] // Topic ID to name mapping known by the broker. 
+ ): FetchContext = { + val data = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] + partitions.foreach { topicIdPartition => + data.put( + topicIdPartition.topicPartition, + new FetchRequest.PartitionData(topicIdPartition.topicId, 0, 0, 100, Optional.empty()) + ) + } + + val fetchRequest = createRequest(metadata, data, EMPTY_PART_LIST, false) + + fetchManager.newContext( + fetchRequest.version, + fetchRequest.metadata, + fetchRequest.isFromFollower, + fetchRequest.fetchData(topicNames.asJava), + fetchRequest.forgottenTopics(topicNames.asJava), + topicNames.asJava + ) + } + + def updateAndGenerateResponseData( + context: FetchContext + ): Int = { + val data = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData] + context.foreachPartition { (topicIdPartition, _) => + data.put( + topicIdPartition, + if (topicIdPartition.topicId == Uuid.ZERO_UUID) + errorResponse(Errors.UNKNOWN_TOPIC_ID.code) + else + noErrorResponse + ) + } + context.updateAndGenerateResponseData(data).sessionId + } + + val foo = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)) + val bar = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 0)) + val zar = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("zar", 0)) + + val fooUnresolved = new TopicIdPartition(foo.topicId, new TopicPartition(null, foo.partition)) + val barUnresolved = new TopicIdPartition(bar.topicId, new TopicPartition(null, bar.partition)) + val zarUnresolved = new TopicIdPartition(zar.topicId, new TopicPartition(null, zar.partition)) + + // The metadata cache does not know about the topic. + val context1 = newContext( + JFetchMetadata.INITIAL, + Seq(foo, bar, zar), + Map.empty[Uuid, String] + ) + + // So the context contains unresolved partitions. + assertEquals(classOf[FullFetchContext], context1.getClass) + assertPartitionsOrder(context1, Seq(fooUnresolved, barUnresolved, zarUnresolved)) + + // The response is sent back to create the session. 
+ val sessionId = updateAndGenerateResponseData(context1) + + // The metadata cache only knows about foo. + val context2 = newContext( + new JFetchMetadata(sessionId, 1), + Seq.empty, + Map(foo.topicId -> foo.topic) + ) + + // So foo is resolved but not the others. + assertEquals(classOf[IncrementalFetchContext], context2.getClass) + assertPartitionsOrder(context2, Seq(foo, barUnresolved, zarUnresolved)) + + updateAndGenerateResponseData(context2) + + // The metadata cache knows about foo and bar. + val context3 = newContext( + new JFetchMetadata(sessionId, 2), + Seq(bar), + Map(foo.topicId -> foo.topic, bar.topicId -> bar.topic) + ) + + // So foo and bar are resolved. + assertEquals(classOf[IncrementalFetchContext], context3.getClass) + assertPartitionsOrder(context3, Seq(foo, bar, zarUnresolved)) + + updateAndGenerateResponseData(context3) + + // The metadata cache knows about all topics. + val context4 = newContext( + new JFetchMetadata(sessionId, 3), + Seq.empty, + Map(foo.topicId -> foo.topic, bar.topicId -> bar.topic, zar.topicId -> zar.topic) + ) + + // So all topics are resolved. + assertEquals(classOf[IncrementalFetchContext], context4.getClass) + assertPartitionsOrder(context4, Seq(foo, bar, zar)) + + updateAndGenerateResponseData(context4) + + // The metadata cache does not know about the topics anymore (e.g. deleted). + val context5 = newContext( + new JFetchMetadata(sessionId, 4), + Seq.empty, + Map.empty + ) + + // All topics remain resolved. + assertEquals(classOf[IncrementalFetchContext], context5.getClass) + assertPartitionsOrder(context4, Seq(foo, bar, zar)) + } + + // This test either simulates an update to a partition using all possible topic ID usage combinations. + // The possible change will be found in an update from the partition. 
+ @ParameterizedTest + @MethodSource(Array("idUsageCombinations")) + def testUpdatedPartitionResolvesId(startsWithTopicIdsInMetadataCache: Boolean, endsWithTopicIdsInMetadataCache: Boolean): Unit = { + // TODO: make cleaner + val time = new MockTime() + val cache = new FetchSessionCache(10, 1000) + val fetchManager = new FetchManager(time, cache) + val fooId = Uuid.randomUuid() + val topicNames = Map(fooId -> "foo").asJava + val tp0 = new TopicPartition("foo", 0) + val tidp0 = new TopicIdPartition(fooId, tp0) + val nullTidp0 = new TopicIdPartition(fooId, new TopicPartition(null, tp0.partition)) + + // Create a new fetch session with foo-0 + val reqData1 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] + reqData1.put(tp0, new FetchRequest.PartitionData(fooId, 0, 0, 100, + Optional.empty())) + val request1 = createRequest(JFetchMetadata.INITIAL, reqData1, EMPTY_PART_LIST, false) + val topicNamesForRequest1 = if (startsWithTopicIdsInMetadataCache) topicNames else Map[Uuid, String]().asJava + // Start a fetch session. Simulate no error or unknown topic ID depending on whether we used topic IDs. 
+ val context1 = fetchManager.newContext( + request1.version, + request1.metadata, + request1.isFromFollower, + request1.fetchData(topicNamesForRequest1), + request1.forgottenTopics(topicNamesForRequest1), + topicNamesForRequest1 + ) + assertEquals(classOf[FullFetchContext], context1.getClass) + val partitionsInSession1 = if (startsWithTopicIdsInMetadataCache) Seq(tidp0) else Seq(nullTidp0) + assertPartitionsOrder(context1, partitionsInSession1) + val respData1 = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData] + val errorCode1 = if (startsWithTopicIdsInMetadataCache) Errors.NONE.code else Errors.UNKNOWN_TOPIC_ID.code + val fooResponseTp1 = if (startsWithTopicIdsInMetadataCache) tidp0 else nullTidp0 + val fooResponse1 = if (startsWithTopicIdsInMetadataCache) noErrorResponse else errorResponse(errorCode1) + respData1.put(fooResponseTp1, fooResponse1) + val resp1 = context1.updateAndGenerateResponseData(respData1) + assertEquals(Errors.NONE, resp1.error()) + assertTrue(resp1.sessionId() != INVALID_SESSION_ID) + val responseData1 = resp1.responseData(topicNames, request1.version) + assertEquals(errorCode1, responseData1.get(tp0).errorCode) + + // Create an incremental fetch request with an update to the partition. 
+ val reqData2 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] + reqData2.put(tp0, new FetchRequest.PartitionData(fooId, 0, 0, 100, + Optional.empty())) + val request2 = createRequest(new JFetchMetadata(resp1.sessionId(), 1), reqData2, EMPTY_PART_LIST, false) + val topicNamesForRequest2 = if (endsWithTopicIdsInMetadataCache) Map(fooId -> "foo").asJava else Map[Uuid, String]().asJava + val context2 = fetchManager.newContext( + request2.version, + request2.metadata, + request2.isFromFollower, + request2.fetchData(topicNamesForRequest2), + request2.forgottenTopics(topicNamesForRequest2), + topicNamesForRequest2 + ) + assertEquals(classOf[IncrementalFetchContext], context2.getClass) + // We will still have the topic ID in the session if we started with topic IDs and currently do not have the ID anymore. + val partitionsInSession2 = if (startsWithTopicIdsInMetadataCache || endsWithTopicIdsInMetadataCache) Seq(tidp0) else Seq(nullTidp0) + assertPartitionsOrder(context2, partitionsInSession2) + val respData2 = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData] + // If we always had topic IDs in the metadata cache or newly resolved the ID we won't have an error. + // If the topic ID was once in the metadata cache and in the session, the partition has likely been deleted and would have an INCONSISTENT_TOPIC_ID error. 
+ // Likely if the topic ID was never in the broker, return UNKNOWN_TOPIC_OR_PARTITION + val errorCode2 = if (endsWithTopicIdsInMetadataCache) Errors.NONE.code + else if (startsWithTopicIdsInMetadataCache) Errors.INCONSISTENT_TOPIC_ID.code else Errors.UNKNOWN_TOPIC_OR_PARTITION.code + val fooResponseTp2 = partitionsInSession2(0) + val fooResponse2 = if (endsWithTopicIdsInMetadataCache) noErrorResponse else errorResponse(errorCode2) + respData2.put(fooResponseTp2, fooResponse2) + val resp2 = context2.updateAndGenerateResponseData(respData2)

Review comment: Is there a way to make such a test without duplicating the newContext portions?

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org