dajac commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r742933244
########## File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java ########## @@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() { List<Integer> partitions = Arrays.asList(0, 1); partitions.forEach(partition -> { String testType = partition == 0 ? "updating a partition" : "adding a new partition"; - Map<String, Uuid> topicIds = Collections.singletonMap("foo", Uuid.randomUuid()); + Uuid fooId = Uuid.randomUuid(); FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); FetchSessionHandler.Builder builder = handler.newBuilder(); - builder.add(new TopicPartition("foo", 0), topicIds.get("foo"), - new FetchRequest.PartitionData(0, 100, 200, Optional.empty())); + builder.add(new TopicPartition("foo", 0), + new FetchRequest.PartitionData(fooId, 0, 100, 200, Optional.empty())); FetchSessionHandler.FetchRequestData data = builder.build(); - assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)), + assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)), data.toSend(), data.sessionPartitions()); assertTrue(data.metadata().isFull()); assertTrue(data.canUseTopicIds()); FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, - respMap(new RespEntry("foo", 0, 10, 20)), topicIds); + respMap(new RespEntry("foo", 0, fooId, 10, 20))); handler.handleResponse(resp, ApiKeys.FETCH.latestVersion()); // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID. FetchSessionHandler.Builder builder2 = handler.newBuilder(); - builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID, - new FetchRequest.PartitionData(10, 110, 210, Optional.empty())); + builder2.add(new TopicPartition("foo", partition), + new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 210, Optional.empty())); FetchSessionHandler.FetchRequestData data2 = builder2.build(); - // Should have the same session ID and next epoch, but can no longer use topic IDs. - // The receiving broker will close the session if we were previously using topic IDs. + // Should have the same session ID, and next epoch and can no longer use topic IDs. + // The receiving broker will handle closing the session. assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType); assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType); assertFalse(data2.canUseTopicIds()); }); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testTopicIdReplaced(boolean fetchRequestUsesIds) { + TopicPartition tp = new TopicPartition("foo", 0); + FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); + FetchSessionHandler.Builder builder = handler.newBuilder(); + Uuid topicId1 = Uuid.randomUuid(); + builder.add(tp, + new FetchRequest.PartitionData(topicId1, 0, 100, 200, Optional.empty())); + FetchSessionHandler.FetchRequestData data = builder.build(); + assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)), + data.toSend(), data.sessionPartitions()); + assertTrue(data.metadata().isFull()); + assertTrue(data.canUseTopicIds()); + + FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, + respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20))); + handler.handleResponse(resp, (short) 12); + + // Try to add a new topic ID. + FetchSessionHandler.Builder builder2 = handler.newBuilder(); + Uuid topicId2 = Uuid.randomUuid(); + // Use the same data besides the topic ID. 
+ FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty()); + builder2.add(tp, partitionData); + FetchSessionHandler.FetchRequestData data2 = builder2.build(); + // The old topic ID partition should be in toReplace, and the new one should be in toSend. + assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)), + data2.toSend(), data2.sessionPartitions()); + assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, tp)), data2.toReplace()); + // Should have the same session ID, and next epoch and can use topic IDs. + assertEquals(123, data2.metadata().sessionId(), "Did not use same session"); + assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch"); + assertTrue(data2.canUseTopicIds()); + + short version = fetchRequestUsesIds ? ApiKeys.FETCH.latestVersion() : 12; + FetchRequest fetchRequest = FetchRequest.Builder + .forReplica(version, 0, 1, 1, data2.toSend()) + .removed(data2.toForget()) + .replaced(data2.toReplace()) + .metadata(data2.metadata()).build(version); + + assertEquals(fetchRequestUsesIds, fetchRequest.data().forgottenTopicsData().size() > 0); + assertEquals(1, fetchRequest.data().topics().size()); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testSessionEpochWhenMixedUsageOfTopicIDs(boolean startsWithTopicIds) { + Uuid fooId = startsWithTopicIds ? Uuid.randomUuid() : Uuid.ZERO_UUID; + Uuid barId = startsWithTopicIds ? Uuid.ZERO_UUID : Uuid.randomUuid(); + short responseVersion = startsWithTopicIds ? ApiKeys.FETCH.latestVersion() : 12; + + TopicPartition tp0 = new TopicPartition("foo", 0); + TopicPartition tp1 = new TopicPartition("bar", 1); + + FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); + FetchSessionHandler.Builder builder = handler.newBuilder(); + builder.add(tp0, + new FetchRequest.PartitionData(fooId, 0, 100, 200, Optional.empty())); + FetchSessionHandler.FetchRequestData data = builder.build(); + assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)), + data.toSend(), data.sessionPartitions()); + assertTrue(data.metadata().isFull()); + assertEquals(startsWithTopicIds, data.canUseTopicIds()); + + FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, + respMap(new RespEntry("foo", 0, fooId, 10, 20))); + handler.handleResponse(resp, responseVersion); + + // Re-add the first partition. Then add a partition with opposite ID usage. + FetchSessionHandler.Builder builder2 = handler.newBuilder(); + builder2.add(tp0, + new FetchRequest.PartitionData(fooId, 10, 110, 210, Optional.empty())); + builder2.add(tp1, + new FetchRequest.PartitionData(barId, 0, 100, 200, Optional.empty())); + FetchSessionHandler.FetchRequestData data2 = builder2.build(); + // Should have the same session ID, and the next epoch and can not use topic IDs. + // The receiving broker will handle closing the session. + assertEquals(123, data2.metadata().sessionId(), "Did not use same session"); + assertEquals(1, data2.metadata().epoch(), "Did not have final epoch"); + assertFalse(data2.canUseTopicIds()); + } + + @ParameterizedTest @ValueSource(booleans = {true, false}) public void testIdUsageWithAllForgottenPartitions(boolean useTopicIds) { // We want to test when all topics are removed from the session Uuid topicId = useTopicIds ? Uuid.randomUuid() : Uuid.ZERO_UUID; - Short responseVersion = useTopicIds ? 
ApiKeys.FETCH.latestVersion() : 12; - Map<String, Uuid> topicIds = Collections.singletonMap("foo", topicId); + short responseVersion = useTopicIds ? ApiKeys.FETCH.latestVersion() : 12; FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); // Add topic foo to the session FetchSessionHandler.Builder builder = handler.newBuilder(); - builder.add(new TopicPartition("foo", 0), topicIds.get("foo"), - new FetchRequest.PartitionData(0, 100, 200, Optional.empty())); + builder.add(new TopicPartition("foo", 0), + new FetchRequest.PartitionData(topicId, 0, 100, 200, Optional.empty())); FetchSessionHandler.FetchRequestData data = builder.build(); - assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)), + assertMapsEqual(reqMap(new ReqEntry("foo", topicId, 0, 0, 100, 200)), data.toSend(), data.sessionPartitions()); assertTrue(data.metadata().isFull()); assertEquals(useTopicIds, data.canUseTopicIds()); FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, - respMap(new RespEntry("foo", 0, 10, 20)), topicIds); - handler.handleResponse(resp, responseVersion.shortValue()); + respMap(new RespEntry("foo", 0, topicId, 10, 20))); + handler.handleResponse(resp, responseVersion); // Remove the topic from the session FetchSessionHandler.Builder builder2 = handler.newBuilder(); FetchSessionHandler.FetchRequestData data2 = builder2.build(); - // Should have the same session ID and next epoch, but can no longer use topic IDs. - // The receiving broker will close the session if we were previously using topic IDs. + // Should have the same session ID, next epoch, and same ID usage. assertEquals(123, data2.metadata().sessionId(), "Did not use same session when useTopicIds was " + useTopicIds); Review comment: It is curious that we don't assert the forgotten partitions here. Is there a reason? ########## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ########## @@ -212,6 +217,9 @@ private void assignFromUser(Set<TopicPartition> partitions) { metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), singletonMap(topicName, 4), tp -> validLeaderEpoch, topicIds), false, 0L); + + metadata.fetch().nodes().forEach(node -> + apiVersions.update(node.idString(), NodeApiVersions.create())); Review comment: Do we still need this change? ########## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ########## @@ -794,22 +793,23 @@ class ReplicaManagerTest { // We receive one valid request from the follower and replica state is updated var successfulFetch: Option[FetchPartitionData] = None - def callback(response: Seq[(TopicPartition, FetchPartitionData)]): Unit = { - successfulFetch = response.headOption.filter { case (topicPartition, _) => topicPartition == tp }.map { case (_, data) => data } + def callback(response: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { + // Check the topic partition only since we are reusing this callback on different TopicIdPartitions. + successfulFetch = response.headOption.filter { case (topicIdPartition, _) => topicIdPartition.topicPartition == tidp.topicPartition }.map { case (_, data) => data } Review comment: This is not ideal. Could we validate that the topic id is correct as well? 
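A minimal sketch of the stricter check being suggested, assuming the callback is parameterized with the expected `TopicIdPartition` so it can still be reused across partitions (`expected` is a hypothetical name; this is an illustration, not the committed code):

```scala
// Sketch: compare the full TopicIdPartition (topic id included) rather than
// only the TopicPartition, so a response carrying the wrong id is rejected.
def callback(expected: TopicIdPartition)(response: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
  successfulFetch = response.headOption
    .filter { case (topicIdPartition, _) => topicIdPartition == expected }
    .map { case (_, data) => data }
}
```

Each call site would then pass the `TopicIdPartition` it actually fetched.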
########## File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ########## @@ -2659,6 +2661,9 @@ private FetchResponse fetchResponse(TopicPartition partition, long fetchOffset, autoCommitIntervalMs, interceptors, throwOnStableOffsetNotSupported); + ApiVersions apiVersions = new ApiVersions(); + metadata.fetch().nodes().forEach(node -> + apiVersions.update(node.idString(), NodeApiVersions.create())); Review comment: Do we still need this change? ########## File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java ########## @@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() { List<Integer> partitions = Arrays.asList(0, 1); partitions.forEach(partition -> { String testType = partition == 0 ? "updating a partition" : "adding a new partition"; - Map<String, Uuid> topicIds = Collections.singletonMap("foo", Uuid.randomUuid()); + Uuid fooId = Uuid.randomUuid(); FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); FetchSessionHandler.Builder builder = handler.newBuilder(); - builder.add(new TopicPartition("foo", 0), topicIds.get("foo"), - new FetchRequest.PartitionData(0, 100, 200, Optional.empty())); + builder.add(new TopicPartition("foo", 0), + new FetchRequest.PartitionData(fooId, 0, 100, 200, Optional.empty())); FetchSessionHandler.FetchRequestData data = builder.build(); - assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)), + assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)), data.toSend(), data.sessionPartitions()); assertTrue(data.metadata().isFull()); assertTrue(data.canUseTopicIds()); FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, - respMap(new RespEntry("foo", 0, 10, 20)), topicIds); + respMap(new RespEntry("foo", 0, fooId, 10, 20))); handler.handleResponse(resp, ApiKeys.FETCH.latestVersion()); // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID. FetchSessionHandler.Builder builder2 = handler.newBuilder(); - builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID, - new FetchRequest.PartitionData(10, 110, 210, Optional.empty())); + builder2.add(new TopicPartition("foo", partition), + new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 210, Optional.empty())); FetchSessionHandler.FetchRequestData data2 = builder2.build(); - // Should have the same session ID and next epoch, but can no longer use topic IDs. - // The receiving broker will close the session if we were previously using topic IDs. + // Should have the same session ID, and next epoch and can no longer use topic IDs. + // The receiving broker will handle closing the session. 
assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType); assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType); assertFalse(data2.canUseTopicIds()); }); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testTopicIdReplaced(boolean fetchRequestUsesIds) { + TopicPartition tp = new TopicPartition("foo", 0); + FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); + FetchSessionHandler.Builder builder = handler.newBuilder(); + Uuid topicId1 = Uuid.randomUuid(); + builder.add(tp, + new FetchRequest.PartitionData(topicId1, 0, 100, 200, Optional.empty())); + FetchSessionHandler.FetchRequestData data = builder.build(); + assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)), + data.toSend(), data.sessionPartitions()); + assertTrue(data.metadata().isFull()); + assertTrue(data.canUseTopicIds()); + + FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, + respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20))); + handler.handleResponse(resp, (short) 12); Review comment: Why do we use 12 here? ########## File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java ########## @@ -202,29 +203,30 @@ public void testSessionless() { FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); FetchSessionHandler.Builder builder = handler.newBuilder(); addTopicId(topicIds, topicNames, "foo", version); - builder.add(new TopicPartition("foo", 0), topicIds.getOrDefault("foo", Uuid.ZERO_UUID), - new FetchRequest.PartitionData(0, 100, 200, Optional.empty())); - builder.add(new TopicPartition("foo", 1), topicIds.getOrDefault("foo", Uuid.ZERO_UUID), - new FetchRequest.PartitionData(10, 110, 210, Optional.empty())); + Uuid fooId = topicIds.getOrDefault("foo", Uuid.ZERO_UUID); + builder.add(new TopicPartition("foo", 0), + new FetchRequest.PartitionData(fooId, 0, 100, 200, Optional.empty())); Review comment: nit: Is it worth bringing back this line on the previous one as there is space now? It might be too long though. ########## File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java ########## @@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() { List<Integer> partitions = Arrays.asList(0, 1); partitions.forEach(partition -> { String testType = partition == 0 ? 
"updating a partition" : "adding a new partition"; - Map<String, Uuid> topicIds = Collections.singletonMap("foo", Uuid.randomUuid()); + Uuid fooId = Uuid.randomUuid(); FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); FetchSessionHandler.Builder builder = handler.newBuilder(); - builder.add(new TopicPartition("foo", 0), topicIds.get("foo"), - new FetchRequest.PartitionData(0, 100, 200, Optional.empty())); + builder.add(new TopicPartition("foo", 0), + new FetchRequest.PartitionData(fooId, 0, 100, 200, Optional.empty())); FetchSessionHandler.FetchRequestData data = builder.build(); - assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)), + assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)), data.toSend(), data.sessionPartitions()); assertTrue(data.metadata().isFull()); assertTrue(data.canUseTopicIds()); FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, - respMap(new RespEntry("foo", 0, 10, 20)), topicIds); + respMap(new RespEntry("foo", 0, fooId, 10, 20))); handler.handleResponse(resp, ApiKeys.FETCH.latestVersion()); // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID. FetchSessionHandler.Builder builder2 = handler.newBuilder(); - builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID, - new FetchRequest.PartitionData(10, 110, 210, Optional.empty())); + builder2.add(new TopicPartition("foo", partition), + new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 210, Optional.empty())); FetchSessionHandler.FetchRequestData data2 = builder2.build(); - // Should have the same session ID and next epoch, but can no longer use topic IDs. - // The receiving broker will close the session if we were previously using topic IDs. + // Should have the same session ID, and next epoch and can no longer use topic IDs. + // The receiving broker will handle closing the session. assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType); assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType); assertFalse(data2.canUseTopicIds()); }); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testTopicIdReplaced(boolean fetchRequestUsesIds) { + TopicPartition tp = new TopicPartition("foo", 0); + FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); + FetchSessionHandler.Builder builder = handler.newBuilder(); + Uuid topicId1 = Uuid.randomUuid(); + builder.add(tp, + new FetchRequest.PartitionData(topicId1, 0, 100, 200, Optional.empty())); + FetchSessionHandler.FetchRequestData data = builder.build(); + assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)), + data.toSend(), data.sessionPartitions()); + assertTrue(data.metadata().isFull()); + assertTrue(data.canUseTopicIds()); + + FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, + respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20))); + handler.handleResponse(resp, (short) 12); + + // Try to add a new topic ID. + FetchSessionHandler.Builder builder2 = handler.newBuilder(); + Uuid topicId2 = Uuid.randomUuid(); + // Use the same data besides the topic ID. + FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty()); + builder2.add(tp, partitionData); + FetchSessionHandler.FetchRequestData data2 = builder2.build(); + // The old topic ID partition should be in toReplace, and the new one should be in toSend. 
+ assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)), + data2.toSend(), data2.sessionPartitions()); + assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, tp)), data2.toReplace()); + // Should have the same session ID, and next epoch and can use topic IDs. + assertEquals(123, data2.metadata().sessionId(), "Did not use same session"); + assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch"); + assertTrue(data2.canUseTopicIds()); + + short version = fetchRequestUsesIds ? ApiKeys.FETCH.latestVersion() : 12; Review comment: Would it be more appropriate to move the above assertions to `FetchRequestTest`? ########## File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java ########## @@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() { List<Integer> partitions = Arrays.asList(0, 1); partitions.forEach(partition -> { String testType = partition == 0 ? "updating a partition" : "adding a new partition"; - Map<String, Uuid> topicIds = Collections.singletonMap("foo", Uuid.randomUuid()); + Uuid fooId = Uuid.randomUuid(); FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); FetchSessionHandler.Builder builder = handler.newBuilder(); - builder.add(new TopicPartition("foo", 0), topicIds.get("foo"), - new FetchRequest.PartitionData(0, 100, 200, Optional.empty())); + builder.add(new TopicPartition("foo", 0), + new FetchRequest.PartitionData(fooId, 0, 100, 200, Optional.empty())); FetchSessionHandler.FetchRequestData data = builder.build(); - assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)), + assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)), data.toSend(), data.sessionPartitions()); assertTrue(data.metadata().isFull()); assertTrue(data.canUseTopicIds()); FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, - respMap(new RespEntry("foo", 0, 10, 20)), topicIds); + respMap(new RespEntry("foo", 0, fooId, 10, 20))); handler.handleResponse(resp, ApiKeys.FETCH.latestVersion()); // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID. FetchSessionHandler.Builder builder2 = handler.newBuilder(); - builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID, - new FetchRequest.PartitionData(10, 110, 210, Optional.empty())); + builder2.add(new TopicPartition("foo", partition), + new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 210, Optional.empty())); FetchSessionHandler.FetchRequestData data2 = builder2.build(); - // Should have the same session ID and next epoch, but can no longer use topic IDs. - // The receiving broker will close the session if we were previously using topic IDs. + // Should have the same session ID, and next epoch and can no longer use topic IDs. + // The receiving broker will handle closing the session. 
assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType); assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType); assertFalse(data2.canUseTopicIds()); }); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testTopicIdReplaced(boolean fetchRequestUsesIds) { + TopicPartition tp = new TopicPartition("foo", 0); + FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); + FetchSessionHandler.Builder builder = handler.newBuilder(); + Uuid topicId1 = Uuid.randomUuid(); + builder.add(tp, + new FetchRequest.PartitionData(topicId1, 0, 100, 200, Optional.empty())); + FetchSessionHandler.FetchRequestData data = builder.build(); + assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)), + data.toSend(), data.sessionPartitions()); + assertTrue(data.metadata().isFull()); + assertTrue(data.canUseTopicIds()); + + FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, + respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20))); + handler.handleResponse(resp, (short) 12); + + // Try to add a new topic ID. + FetchSessionHandler.Builder builder2 = handler.newBuilder(); + Uuid topicId2 = Uuid.randomUuid(); + // Use the same data besides the topic ID. + FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty()); + builder2.add(tp, partitionData); + FetchSessionHandler.FetchRequestData data2 = builder2.build(); + // The old topic ID partition should be in toReplace, and the new one should be in toSend. + assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)), + data2.toSend(), data2.sessionPartitions()); + assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, tp)), data2.toReplace()); Review comment: Should we also test when the current topic-partition in the session does not have a topic id? In this case, it should not be added to the `toReplace` set. ########## File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ########## @@ -2069,10 +2071,10 @@ public void testReturnRecordsDuringRebalance() throws InterruptedException { ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); ConsumerPartitionAssignor assignor = new CooperativeStickyAssignor(); - KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); - initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1))); + KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); Review comment: Is there any reason for this change? ########## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ########## @@ -4814,6 +4842,7 @@ private void buildDependencies(MetricConfig metricConfig, metadata = new ConsumerMetadata(0, metadataExpireMs, false, false, subscriptions, logContext, new ClusterResourceListeners()); client = new MockClient(time, metadata); + client.setNodeApiVersions(NodeApiVersions.create()); metrics = new Metrics(metricConfig, time); consumerClient = new ConsumerNetworkClient(logContext, client, metadata, time, 100, 1000, Integer.MAX_VALUE); Review comment: The PR changed how some errors are handled in the `Fetcher`. Do we have any tests for this new behavior? 
########## File path: core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala ########## @@ -137,11 +137,11 @@ class DelayedFetchTest extends EasyMockSupport { val fetchStatus = FetchPartitionStatus( startOffsetMetadata = LogOffsetMetadata(fetchOffset), - fetchInfo = new FetchRequest.PartitionData(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, lastFetchedEpoch)) + fetchInfo = new FetchRequest.PartitionData(topicIds.get("topic"), fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, lastFetchedEpoch)) Review comment: nit: It seems that we could use `TopicIdPartition` directly and remove `topicIds` map entirely. We could also pass the `TopicIdPartition` to `buildFetchMetadata`. ########## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ########## @@ -465,18 +477,18 @@ public void testFetchError() { assertEquals(1, fetcher.sendFetches()); assertFalse(fetcher.hasCompletedFetches()); - client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NOT_LEADER_OR_FOLLOWER, 100L, 0)); + client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NOT_LEADER_OR_FOLLOWER, 100L, 0)); consumerClient.poll(time.timer(0)); assertTrue(fetcher.hasCompletedFetches()); Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchedRecords(); assertFalse(partitionRecords.containsKey(tp0)); } - private MockClient.RequestMatcher matchesOffset(final TopicPartition tp, final long offset) { + private MockClient.RequestMatcher matchesOffset(final TopicIdPartition tp, final long offset) { return body -> { FetchRequest fetch = (FetchRequest) body; - Map<TopicPartition, FetchRequest.PartitionData> fetchData = fetch.fetchData(topicNames); + Map<TopicIdPartition, FetchRequest.PartitionData> fetchData = fetch.fetchData(topicNames); Review comment: nit: There are two spaces after `=`. 
########## File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala ########## @@ -157,38 +156,38 @@ class FetchSessionTest { val cache = new FetchSessionCache(10, 1000) val fetchManager = new FetchManager(time, cache) - val tp0 = new TopicPartition("foo", 0) - val tp1 = new TopicPartition("foo", 1) - val tp2 = new TopicPartition("bar", 1) val topicIds = Map("foo" -> Uuid.randomUuid(), "bar" -> Uuid.randomUuid()).asJava + val tp0 = new TopicIdPartition(topicIds.get("foo"), new TopicPartition("foo", 0)) + val tp1 = new TopicIdPartition(topicIds.get("foo"), new TopicPartition("foo", 1)) + val tp2 = new TopicIdPartition(topicIds.get("bar"), new TopicPartition("bar", 1)) val topicNames = topicIds.asScala.map(_.swap).asJava - def cachedLeaderEpochs(context: FetchContext): Map[TopicPartition, Optional[Integer]] = { - val mapBuilder = Map.newBuilder[TopicPartition, Optional[Integer]] - context.foreachPartition((tp, _, data) => mapBuilder += tp -> data.currentLeaderEpoch) + def cachedLeaderEpochs(context: FetchContext): Map[TopicIdPartition, Optional[Integer]] = { + val mapBuilder = Map.newBuilder[TopicIdPartition, Optional[Integer]] + context.foreachPartition((tp, data) => mapBuilder += tp -> data.currentLeaderEpoch) mapBuilder.result() } val requestData1 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] - requestData1.put(tp0, new FetchRequest.PartitionData(0, 0, 100, Optional.empty())) - requestData1.put(tp1, new FetchRequest.PartitionData(10, 0, 100, Optional.of(1))) - requestData1.put(tp2, new FetchRequest.PartitionData(10, 0, 100, Optional.of(2))) + requestData1.put(tp0.topicPartition, new FetchRequest.PartitionData(topicIds.get("foo"), 0, 0, 100, Optional.empty())) + requestData1.put(tp1.topicPartition, new FetchRequest.PartitionData(topicIds.get("foo"), 10, 0, 100, Optional.of(1))) + requestData1.put(tp2.topicPartition, new FetchRequest.PartitionData(topicIds.get("bar"), 10, 0, 100, Optional.of(2))) Review comment: nit: We could get the topic id from `tp*.topicId`. 
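Spelled out, the nit amounts to something like the following (same offsets and epochs as the quoted lines; only the source of the topic id changes):

```scala
// Sketch: read the topic id from each TopicIdPartition instead of looking it
// up in the topicIds map a second time.
requestData1.put(tp0.topicPartition, new FetchRequest.PartitionData(tp0.topicId, 0, 0, 100, Optional.empty()))
requestData1.put(tp1.topicPartition, new FetchRequest.PartitionData(tp1.topicId, 10, 0, 100, Optional.of(1)))
requestData1.put(tp2.topicPartition, new FetchRequest.PartitionData(tp2.topicId, 10, 0, 100, Optional.of(2)))
```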
########## File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala ########## @@ -659,88 +670,108 @@ class FetchSessionTest { } @Test - def testIncrementalFetchSessionWithIdsWhenSessionDoesNotUseIds() : Unit = { + def testFetchSessionWithUnknownId(): Unit = { val time = new MockTime() val cache = new FetchSessionCache(10, 1000) val fetchManager = new FetchManager(time, cache) - val topicIds = new util.HashMap[String, Uuid]() - val topicNames = new util.HashMap[Uuid, String]() + val topicNames = Map(Uuid.randomUuid() -> "foo", Uuid.randomUuid() -> "bar").asJava + val topicIds = topicNames.asScala.map(_.swap).asJava + val foo0 = new TopicIdPartition(topicIds.getOrDefault("foo", Uuid.ZERO_UUID), new TopicPartition("foo", 0)) + val foo1 = new TopicIdPartition(topicIds.getOrDefault("foo", Uuid.ZERO_UUID), new TopicPartition("foo", 1)) + val emptyFoo0 = new TopicIdPartition(topicIds.getOrDefault("foo", Uuid.ZERO_UUID), new TopicPartition(null, 0)) + val emptyFoo1 = new TopicIdPartition(topicIds.getOrDefault("foo", Uuid.ZERO_UUID), new TopicPartition(null, 1)) - // Create a new fetch session with foo-0 + // Create a new fetch session with foo-0 and foo-1 val reqData1 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] - reqData1.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0, 0, 100, + reqData1.put(foo0.topicPartition, new FetchRequest.PartitionData(foo0.topicId, 0, 0, 100, Optional.empty())) - val request1 = createRequestWithoutTopicIds(JFetchMetadata.INITIAL, reqData1, topicIds, EMPTY_PART_LIST, false) - // Start a fetch session using a request version that does not use topic IDs. + reqData1.put(foo1.topicPartition, new FetchRequest.PartitionData(foo1.topicId,10, 0, 100, + Optional.empty())) + val request1 = createRequest(JFetchMetadata.INITIAL, reqData1, EMPTY_PART_LIST, false) + // Simulate unknown topic ID for foo. + val topicNamesOnlyBar = Collections.singletonMap(topicIds.get("bar"), "bar") + // We should not throw error since we have an older request version. val context1 = fetchManager.newContext( request1.version, request1.metadata, request1.isFromFollower, - request1.fetchData(topicNames), - request1.forgottenTopics(topicNames), - topicIds + request1.fetchData(topicNamesOnlyBar), + request1.forgottenTopics(topicNamesOnlyBar), + topicNamesOnlyBar ) assertEquals(classOf[FullFetchContext], context1.getClass) - val respData1 = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData] Review comment: Should we iterate over the partitions in the context to check the `TopicIdPartition`? ########## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ########## @@ -222,6 +230,9 @@ private void assignFromUserNoId(Set<TopicPartition> partitions) { metadata.update(9, RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), singletonMap("noId", 1), tp -> validLeaderEpoch, topicIds), false, 0L); + + metadata.fetch().nodes().forEach(node -> + apiVersions.update(node.idString(), NodeApiVersions.create())); Review comment: ditto. There are a few other cases in this file. 
########## File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala ########## @@ -659,88 +670,108 @@ class FetchSessionTest { } @Test - def testIncrementalFetchSessionWithIdsWhenSessionDoesNotUseIds() : Unit = { + def testFetchSessionWithUnknownId(): Unit = { val time = new MockTime() val cache = new FetchSessionCache(10, 1000) val fetchManager = new FetchManager(time, cache) - val topicIds = new util.HashMap[String, Uuid]() - val topicNames = new util.HashMap[Uuid, String]() + val topicNames = Map(Uuid.randomUuid() -> "foo", Uuid.randomUuid() -> "bar").asJava + val topicIds = topicNames.asScala.map(_.swap).asJava + val foo0 = new TopicIdPartition(topicIds.getOrDefault("foo", Uuid.ZERO_UUID), new TopicPartition("foo", 0)) + val foo1 = new TopicIdPartition(topicIds.getOrDefault("foo", Uuid.ZERO_UUID), new TopicPartition("foo", 1)) + val emptyFoo0 = new TopicIdPartition(topicIds.getOrDefault("foo", Uuid.ZERO_UUID), new TopicPartition(null, 0)) + val emptyFoo1 = new TopicIdPartition(topicIds.getOrDefault("foo", Uuid.ZERO_UUID), new TopicPartition(null, 1)) - // Create a new fetch session with foo-0 + // Create a new fetch session with foo-0 and foo-1 val reqData1 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] - reqData1.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0, 0, 100, + reqData1.put(foo0.topicPartition, new FetchRequest.PartitionData(foo0.topicId, 0, 0, 100, Optional.empty())) - val request1 = createRequestWithoutTopicIds(JFetchMetadata.INITIAL, reqData1, topicIds, EMPTY_PART_LIST, false) - // Start a fetch session using a request version that does not use topic IDs. + reqData1.put(foo1.topicPartition, new FetchRequest.PartitionData(foo1.topicId,10, 0, 100, + Optional.empty())) + val request1 = createRequest(JFetchMetadata.INITIAL, reqData1, EMPTY_PART_LIST, false) + // Simulate unknown topic ID for foo. + val topicNamesOnlyBar = Collections.singletonMap(topicIds.get("bar"), "bar") + // We should not throw error since we have an older request version. val context1 = fetchManager.newContext( request1.version, request1.metadata, request1.isFromFollower, - request1.fetchData(topicNames), - request1.forgottenTopics(topicNames), - topicIds + request1.fetchData(topicNamesOnlyBar), + request1.forgottenTopics(topicNamesOnlyBar), + topicNamesOnlyBar ) assertEquals(classOf[FullFetchContext], context1.getClass) - val respData1 = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData] - respData1.put(new TopicPartition("foo", 0), new FetchResponseData.PartitionData() + val respData1 = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData] + respData1.put(emptyFoo0, new FetchResponseData.PartitionData() .setPartitionIndex(0) - .setHighWatermark(100) - .setLastStableOffset(100) - .setLogStartOffset(100)) + .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)) + respData1.put(emptyFoo1, new FetchResponseData.PartitionData() + .setPartitionIndex(1) + .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)) val resp1 = context1.updateAndGenerateResponseData(respData1) + // On the latest request version, we should have unknown topic ID errors. assertEquals(Errors.NONE, resp1.error()) assertTrue(resp1.sessionId() != INVALID_SESSION_ID) + assertEquals(2, resp1.responseData(topicNames, request1.version).size) + resp1.responseData(topicNames, request1.version).forEach( (_, resp) => assertEquals(Errors.UNKNOWN_TOPIC_ID.code, resp.errorCode)) - // Create an incremental fetch request as though no topics changed. 
However, send a v13 request. - // Also simulate the topic ID found on the server. - val fooId = Uuid.randomUuid() - topicIds.put("foo", fooId) - topicNames.put(fooId, "foo") + // Create an incremental request where we resolve the partitions val reqData2 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] - val request2 = createRequest(new JFetchMetadata(resp1.sessionId(), 1), reqData2, topicIds, EMPTY_PART_LIST, false) + val request2 = createRequest(new JFetchMetadata(resp1.sessionId(), 1), reqData2, EMPTY_PART_LIST, false) val context2 = fetchManager.newContext( request2.version, request2.metadata, request2.isFromFollower, request2.fetchData(topicNames), request2.forgottenTopics(topicNames), - topicIds + topicNames ) - - assertEquals(classOf[SessionErrorContext], context2.getClass) - val respData2 = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData] - assertEquals(Errors.FETCH_SESSION_TOPIC_ID_ERROR, - context2.updateAndGenerateResponseData(respData2).error()) + // run through each partition to resolve them + context2.foreachPartition((_, _) => ()) Review comment: Should we assert that the `TopicIdPartition` received here contains the topic name? ########## File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala ########## @@ -659,88 +670,108 @@ class FetchSessionTest { } @Test - def testIncrementalFetchSessionWithIdsWhenSessionDoesNotUseIds() : Unit = { + def testFetchSessionWithUnknownId(): Unit = { val time = new MockTime() val cache = new FetchSessionCache(10, 1000) val fetchManager = new FetchManager(time, cache) - val topicIds = new util.HashMap[String, Uuid]() - val topicNames = new util.HashMap[Uuid, String]() + val topicNames = Map(Uuid.randomUuid() -> "foo", Uuid.randomUuid() -> "bar").asJava + val topicIds = topicNames.asScala.map(_.swap).asJava + val foo0 = new TopicIdPartition(topicIds.getOrDefault("foo", Uuid.ZERO_UUID), new TopicPartition("foo", 0)) + val foo1 = new TopicIdPartition(topicIds.getOrDefault("foo", Uuid.ZERO_UUID), new TopicPartition("foo", 1)) + val emptyFoo0 = new TopicIdPartition(topicIds.getOrDefault("foo", Uuid.ZERO_UUID), new TopicPartition(null, 0)) + val emptyFoo1 = new TopicIdPartition(topicIds.getOrDefault("foo", Uuid.ZERO_UUID), new TopicPartition(null, 1)) - // Create a new fetch session with foo-0 + // Create a new fetch session with foo-0 and foo-1 val reqData1 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] - reqData1.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0, 0, 100, + reqData1.put(foo0.topicPartition, new FetchRequest.PartitionData(foo0.topicId, 0, 0, 100, Optional.empty())) - val request1 = createRequestWithoutTopicIds(JFetchMetadata.INITIAL, reqData1, topicIds, EMPTY_PART_LIST, false) - // Start a fetch session using a request version that does not use topic IDs. + reqData1.put(foo1.topicPartition, new FetchRequest.PartitionData(foo1.topicId,10, 0, 100, + Optional.empty())) + val request1 = createRequest(JFetchMetadata.INITIAL, reqData1, EMPTY_PART_LIST, false) + // Simulate unknown topic ID for foo. + val topicNamesOnlyBar = Collections.singletonMap(topicIds.get("bar"), "bar") + // We should not throw error since we have an older request version. 
val context1 = fetchManager.newContext( request1.version, request1.metadata, request1.isFromFollower, - request1.fetchData(topicNames), - request1.forgottenTopics(topicNames), - topicIds + request1.fetchData(topicNamesOnlyBar), + request1.forgottenTopics(topicNamesOnlyBar), + topicNamesOnlyBar ) assertEquals(classOf[FullFetchContext], context1.getClass) - val respData1 = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData] - respData1.put(new TopicPartition("foo", 0), new FetchResponseData.PartitionData() + val respData1 = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData] + respData1.put(emptyFoo0, new FetchResponseData.PartitionData() .setPartitionIndex(0) - .setHighWatermark(100) - .setLastStableOffset(100) - .setLogStartOffset(100)) + .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)) + respData1.put(emptyFoo1, new FetchResponseData.PartitionData() + .setPartitionIndex(1) + .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)) val resp1 = context1.updateAndGenerateResponseData(respData1) + // On the latest request version, we should have unknown topic ID errors. assertEquals(Errors.NONE, resp1.error()) assertTrue(resp1.sessionId() != INVALID_SESSION_ID) + assertEquals(2, resp1.responseData(topicNames, request1.version).size) + resp1.responseData(topicNames, request1.version).forEach( (_, resp) => assertEquals(Errors.UNKNOWN_TOPIC_ID.code, resp.errorCode)) - // Create an incremental fetch request as though no topics changed. However, send a v13 request. - // Also simulate the topic ID found on the server. - val fooId = Uuid.randomUuid() - topicIds.put("foo", fooId) - topicNames.put(fooId, "foo") + // Create an incremental request where we resolve the partitions val reqData2 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] - val request2 = createRequest(new JFetchMetadata(resp1.sessionId(), 1), reqData2, topicIds, EMPTY_PART_LIST, false) + val request2 = createRequest(new JFetchMetadata(resp1.sessionId(), 1), reqData2, EMPTY_PART_LIST, false) val context2 = fetchManager.newContext( request2.version, request2.metadata, request2.isFromFollower, request2.fetchData(topicNames), request2.forgottenTopics(topicNames), - topicIds + topicNames ) - - assertEquals(classOf[SessionErrorContext], context2.getClass) - val respData2 = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData] - assertEquals(Errors.FETCH_SESSION_TOPIC_ID_ERROR, - context2.updateAndGenerateResponseData(respData2).error()) + // run through each partition to resolve them Review comment: nit: I would expand this comment a little and stress the fact that topic names are lazily resolved when the partitions are iterated over. ########## File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala ########## @@ -1091,18 +1089,20 @@ class AbstractFetcherThreadTest { override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = { val fetchData = mutable.Map.empty[TopicPartition, FetchRequest.PartitionData] - partitionMap.foreach { case (partition, state) => + partitionMap.foreach { case (partition, state) => 0 + .equals(0) Review comment: `0.equals(0)` was very likely put here by mistake. 
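Presumably the loop should simply read as it did before the stray expression crept in, e.g.:

```scala
// Sketch: the foreach without the stray `0.equals(0)` no-op.
partitionMap.foreach { case (partition, state) =>
  // ... build the FetchRequest.PartitionData for this partition as before ...
}
```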
########## File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala ########## @@ -659,88 +670,108 @@ class FetchSessionTest { } @Test - def testIncrementalFetchSessionWithIdsWhenSessionDoesNotUseIds() : Unit = { + def testFetchSessionWithUnknownId(): Unit = { val time = new MockTime() val cache = new FetchSessionCache(10, 1000) val fetchManager = new FetchManager(time, cache) - val topicIds = new util.HashMap[String, Uuid]() - val topicNames = new util.HashMap[Uuid, String]() + val topicNames = Map(Uuid.randomUuid() -> "foo", Uuid.randomUuid() -> "bar").asJava + val topicIds = topicNames.asScala.map(_.swap).asJava Review comment: It seems to me that it would be simpler to declare `fooId` and `barId` and to use them instead of getting them from the map. ########## File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ########## @@ -3530,37 +3534,37 @@ class KafkaApisTest { def testSizeOfThrottledPartitions(): Unit = { val topicNames = new util.HashMap[Uuid, String] val topicIds = new util.HashMap[String, Uuid]() - def fetchResponse(data: Map[TopicPartition, String]): FetchResponse = { - val responseData = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]( + def fetchResponse(data: Map[TopicIdPartition, String]): FetchResponse = { + val responseData = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData]( data.map { case (tp, raw) => tp -> new FetchResponseData.PartitionData() - .setPartitionIndex(tp.partition) + .setPartitionIndex(tp.topicPartition.partition) .setHighWatermark(105) .setLastStableOffset(105) .setLogStartOffset(0) .setRecords(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(100, raw.getBytes(StandardCharsets.UTF_8)))) }.toMap.asJava) data.foreach{case (tp, _) => - val id = Uuid.randomUuid() - topicIds.put(tp.topic(), id) - topicNames.put(id, tp.topic()) + topicIds.put(tp.topicPartition.topic, tp.topicId) + topicNames.put(tp.topicId, tp.topicPartition.topic) } - FetchResponse.of(Errors.NONE, 100, 100, responseData, topicIds) + FetchResponse.of(Errors.NONE, 100, 100, responseData) } - val throttledPartition = new TopicPartition("throttledData", 0) + val throttledPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("throttledData", 0)) val throttledData = Map(throttledPartition -> "throttledData") val expectedSize = FetchResponse.sizeOf(FetchResponseData.HIGHEST_SUPPORTED_VERSION, - fetchResponse(throttledData).responseData(topicNames, FetchResponseData.HIGHEST_SUPPORTED_VERSION).entrySet.iterator, topicIds) + fetchResponse(throttledData).responseData(topicNames, FetchResponseData.HIGHEST_SUPPORTED_VERSION).entrySet.asScala.map( entry => + (new TopicIdPartition(Uuid.ZERO_UUID, entry.getKey), entry.getValue)).toMap.asJava.entrySet.iterator) - val response = fetchResponse(throttledData ++ Map(new TopicPartition("nonThrottledData", 0) -> "nonThrottledData")) + val response = fetchResponse(throttledData ++ Map(new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("nonThrottledData", 0)) -> "nonThrottledData")) val quota = Mockito.mock(classOf[ReplicationQuotaManager]) Mockito.when(quota.isThrottled(ArgumentMatchers.any(classOf[TopicPartition]))) - .thenAnswer(invocation => throttledPartition == invocation.getArgument(0).asInstanceOf[TopicPartition]) + .thenAnswer(invocation => throttledPartition.topicPartition == invocation.getArgument(0).asInstanceOf[TopicPartition]) - assertEquals(expectedSize, 
KafkaApis.sizeOfThrottledPartitions(FetchResponseData.HIGHEST_SUPPORTED_VERSION, response, quota, topicIds)) + assertEquals(expectedSize, KafkaApis.sizeOfThrottledPartitions(FetchResponseData.HIGHEST_SUPPORTED_VERSION, response, quota)) } @Test Review comment: Should we add any tests for the new logic in KafkaApis? ########## File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala ########## @@ -659,88 +670,108 @@ class FetchSessionTest { } @Test - def testIncrementalFetchSessionWithIdsWhenSessionDoesNotUseIds() : Unit = { + def testFetchSessionWithUnknownId(): Unit = { val time = new MockTime() val cache = new FetchSessionCache(10, 1000) val fetchManager = new FetchManager(time, cache) - val topicIds = new util.HashMap[String, Uuid]() - val topicNames = new util.HashMap[Uuid, String]() + val topicNames = Map(Uuid.randomUuid() -> "foo", Uuid.randomUuid() -> "bar").asJava Review comment: I wonder if we should add a third topic which is never resolved. What do you think? ########## File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala ########## @@ -1361,102 +1542,113 @@ class FetchSessionTest { val resp4 = context2.updateAndGenerateResponseData(respData) assertEquals(Errors.NONE, resp4.error) assertEquals(resp1.sessionId, resp4.sessionId) - assertEquals(Utils.mkSet(tp1, tp2), resp4.responseData(topicNames, request2.version).keySet) + assertEquals(Utils.mkSet(tp1.topicPartition, tp2.topicPartition), resp4.responseData(topicNames, request2.version).keySet) } @Test def testDeprioritizesPartitionsWithRecordsOnly(): Unit = { val time = new MockTime() val cache = new FetchSessionCache(10, 1000) val fetchManager = new FetchManager(time, cache) - val tp1 = new TopicPartition("foo", 1) - val tp2 = new TopicPartition("bar", 2) - val tp3 = new TopicPartition("zar", 3) val topicIds = Map("foo" -> Uuid.randomUuid(), "bar" -> Uuid.randomUuid(), "zar" -> Uuid.randomUuid()).asJava val topicNames = topicIds.asScala.map(_.swap).asJava + val tp1 = new TopicIdPartition(topicIds.get("foo"), new TopicPartition("foo", 1)) + val tp2 = new TopicIdPartition(topicIds.get("bar"), new TopicPartition("bar", 2)) + val tp3 = new TopicIdPartition(topicIds.get("zar"), new TopicPartition("zar", 3)) - val reqData = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] - reqData.put(tp1, new FetchRequest.PartitionData(100, 0, 1000, Optional.of(5), Optional.of(4))) - reqData.put(tp2, new FetchRequest.PartitionData(100, 0, 1000, Optional.of(5), Optional.of(4))) - reqData.put(tp3, new FetchRequest.PartitionData(100, 0, 1000, Optional.of(5), Optional.of(4))) + val reqData = new util.LinkedHashMap[TopicIdPartition, FetchRequest.PartitionData] + reqData.put(tp1, new FetchRequest.PartitionData(topicIds.get("foo"), 100, 0, 1000, Optional.of(5), Optional.of(4))) + reqData.put(tp2, new FetchRequest.PartitionData(topicIds.get("bar"), 100, 0, 1000, Optional.of(5), Optional.of(4))) + reqData.put(tp3, new FetchRequest.PartitionData(topicIds.get("zar"), 100, 0, 1000, Optional.of(5), Optional.of(4))) // Full fetch context returns all partitions in the response val context1 = fetchManager.newContext(ApiKeys.FETCH.latestVersion(), JFetchMetadata.INITIAL, false, - reqData, Collections.emptyList(), topicIds) + reqData, Collections.emptyList(), topicNames) assertEquals(classOf[FullFetchContext], context1.getClass) - val respData1 = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData] + val respData1 = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData] 
respData1.put(tp1, new FetchResponseData.PartitionData() - .setPartitionIndex(tp1.partition) + .setPartitionIndex(tp1.topicPartition.partition) .setHighWatermark(50) .setLastStableOffset(50) .setLogStartOffset(0)) respData1.put(tp2, new FetchResponseData.PartitionData() - .setPartitionIndex(tp2.partition) + .setPartitionIndex(tp2.topicPartition.partition) .setHighWatermark(50) .setLastStableOffset(50) .setLogStartOffset(0)) respData1.put(tp3, new FetchResponseData.PartitionData() - .setPartitionIndex(tp3.partition) + .setPartitionIndex(tp3.topicPartition.partition) .setHighWatermark(50) .setLastStableOffset(50) .setLogStartOffset(0)) val resp1 = context1.updateAndGenerateResponseData(respData1) assertEquals(Errors.NONE, resp1.error) assertNotEquals(INVALID_SESSION_ID, resp1.sessionId) - assertEquals(Utils.mkSet(tp1, tp2, tp3), resp1.responseData(topicNames, ApiKeys.FETCH.latestVersion()).keySet()) + assertEquals(Utils.mkSet(tp1.topicPartition, tp2.topicPartition, tp3.topicPartition), resp1.responseData(topicNames, ApiKeys.FETCH.latestVersion()).keySet()) // Incremental fetch context returns partitions with changes but only deprioritizes // the partitions with records val context2 = fetchManager.newContext(ApiKeys.FETCH.latestVersion(), new JFetchMetadata(resp1.sessionId, 1), false, - reqData, Collections.emptyList(), topicIds) + reqData, Collections.emptyList(), topicNames) assertEquals(classOf[IncrementalFetchContext], context2.getClass) // Partitions are ordered in the session as per last response assertPartitionsOrder(context2, Seq(tp1, tp2, tp3)) // Response is empty - val respData2 = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData] + val respData2 = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData] val resp2 = context2.updateAndGenerateResponseData(respData2) assertEquals(Errors.NONE, resp2.error) assertEquals(resp1.sessionId, resp2.sessionId) assertEquals(Collections.emptySet(), resp2.responseData(topicNames, ApiKeys.FETCH.latestVersion()).keySet) // All partitions with changes should be returned. 
- val respData3 = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData] + val respData3 = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData] respData3.put(tp1, new FetchResponseData.PartitionData() - .setPartitionIndex(tp1.partition) + .setPartitionIndex(tp1.topicPartition.partition) .setHighWatermark(60) .setLastStableOffset(50) .setLogStartOffset(0)) respData3.put(tp2, new FetchResponseData.PartitionData() - .setPartitionIndex(tp2.partition) + .setPartitionIndex(tp2.topicPartition.partition) .setHighWatermark(60) .setLastStableOffset(50) .setLogStartOffset(0) .setRecords(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(100, null)))) respData3.put(tp3, new FetchResponseData.PartitionData() - .setPartitionIndex(tp3.partition) + .setPartitionIndex(tp3.topicPartition.partition) .setHighWatermark(50) .setLastStableOffset(50) .setLogStartOffset(0)) val resp3 = context2.updateAndGenerateResponseData(respData3) assertEquals(Errors.NONE, resp3.error) assertEquals(resp1.sessionId, resp3.sessionId) - assertEquals(Utils.mkSet(tp1, tp2), resp3.responseData(topicNames, ApiKeys.FETCH.latestVersion()).keySet) + assertEquals(Utils.mkSet(tp1.topicPartition, tp2.topicPartition), resp3.responseData(topicNames, ApiKeys.FETCH.latestVersion()).keySet) // Only the partitions whose returned records in the last response // were deprioritized assertPartitionsOrder(context2, Seq(tp1, tp3, tp2)) } - private def assertPartitionsOrder(context: FetchContext, partitions: Seq[TopicPartition]): Unit = { - val partitionsInContext = ArrayBuffer.empty[TopicPartition] - context.foreachPartition { (tp, _, _) => + private def assertPartitionsOrder(context: FetchContext, partitions: Seq[TopicIdPartition]): Unit = { + val partitionsInContext = ArrayBuffer.empty[TopicIdPartition] + context.foreachPartition { (tp, _) => partitionsInContext += tp } assertEquals(partitions, partitionsInContext.toSeq) } } + +object FetchSessionTest { + def idUsageCombinations: java.util.stream.Stream[Arguments] = { + Seq( + Arguments.of(true, true), + Arguments.of(true, false), + Arguments.of(false, true), + Arguments.of(false, false) + ).asJava.stream() + } +} Review comment: I wonder if we could add a few more unit tests. For instance, we should test the equals/hash methods of the CachedPartition (and possibly other methods there). We might want to add some for other classes as well. What do you think? ########## File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java ########## @@ -285,52 +268,57 @@ public FetchRequestData build() { if (nextMetadata.isFull()) { if (log.isDebugEnabled()) { log.debug("Built full fetch {} for node {} with {}.", - nextMetadata, node, partitionsToLogString(next.keySet())); + nextMetadata, node, topicPartitionsToLogString(next.keySet())); } sessionPartitions = next; next = null; + Map<TopicPartition, PartitionData> toSend = + Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions)); // Only add topic IDs to the session if we are using topic IDs. 
if (canUseTopicIds) { - sessionTopicIds = topicIds; - sessionTopicNames = new HashMap<>(topicIds.size()); - topicIds.forEach((name, id) -> sessionTopicNames.put(id, name)); + Map<Uuid, Set<String>> newTopicNames = sessionPartitions.entrySet().stream().collect(Collectors.groupingByConcurrent(entry -> entry.getValue().topicId, + Collectors.mapping(entry -> entry.getKey().topic(), Collectors.toSet()))); Review comment: I think that the grouping is slower because it has to allocate another Map, Sets for each Uuid, etc. ########## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ########## @@ -374,7 +374,7 @@ abstract class AbstractFetcherThread(name: String, } } } catch { - case ime@( _: CorruptRecordException | _: InvalidRecordException) => + case ime@(_: CorruptRecordException | _: InvalidRecordException) => Review comment: I think that would for instance happen when the controller fails over to an older IBP during an upgrade. This should remove the topic ids, which means that v12 will be used for the next fetch request and trigger a FETCH_SESSION_TOPIC_ID_ERROR. In this particular case, re-trying directly would be the optimal way to proceed for a follower. I wonder if there are other cases to consider here. For the consumer, it is definitely different. ########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -163,18 +173,35 @@ class CachedPartition(val topic: String, mustRespond } - override def hashCode: Int = Objects.hash(new TopicPartition(topic, partition), topicId) + /** + * We have different equality checks depending on whether topic IDs are used. + * This means we need a different hash function as well. We use name to calculate the hash if the ID is zero and unused. + * Otherwise, we use the topic ID in the hash calculation. + * + * @return the hash code for the CachedPartition depending on what request version we are using. + */ + override def hashCode: Int = if (topicId != Uuid.ZERO_UUID) (31 * partition) + topicId.hashCode else + (31 * partition) + topic.hashCode def canEqual(that: Any): Boolean = that.isInstanceOf[CachedPartition] + /** + * We have different equality checks depending on whether topic IDs are used. + * + * This is because when we use topic IDs, a partition with a given ID and an unknown name is the same as a partition with that + * ID and a known name. This means we can only use topic ID and partition when determining equality. + * + * On the other hand, if we are using topic names, all IDs are zero. This means we can only use topic name and partition + * when determining equality. + */ override def equals(that: Any): Boolean = that match { case that: CachedPartition => this.eq(that) || (that.canEqual(this) && Review comment: Right. It seems to me that the `canEqual(this)` does not make any sense here. Could you double check? 
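For reference, since the `match` arm already narrows `that` to `CachedPartition`, `canEqual` only adds value when subclassing is expected. Below is a sketch of the equality check without it, with the branch bodies reconstructed from the comparison rules described in the quoted comments (not the committed code):

```scala
// Sketch: the pattern match already guarantees `that` is a CachedPartition,
// so the extra canEqual(this) call is redundant unless subclasses override it.
override def equals(that: Any): Boolean = that match {
  case that: CachedPartition =>
    this.eq(that) || {
      if (topicId != Uuid.ZERO_UUID)
        // With topic ids: a known and an unknown name with the same id are equal.
        partition == that.partition && topicId == that.topicId
      else
        // Without topic ids (all ids are zero): compare name and partition.
        partition == that.partition && topic == that.topic
    }
  case _ => false
}
```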
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -708,40 +701,41 @@ class KafkaApis(val requestChannel: RequestChannel, None } - val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponseData.PartitionData)]() - val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]() - val sessionTopicIds = mutable.Map[String, Uuid]() + val erroneous = mutable.ArrayBuffer[(TopicIdPartition, FetchResponseData.PartitionData)]() + val interesting = mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]() if (fetchRequest.isFromFollower) { // The follower must have ClusterAction on ClusterResource in order to fetch partition data. if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) { - fetchContext.foreachPartition { (topicPartition, topicId, data) => - sessionTopicIds.put(topicPartition.topic(), topicId) - if (!metadataCache.contains(topicPartition)) - erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION) + fetchContext.foreachPartition { (topicIdPartition, data) => + if (topicIdPartition.topicPartition.topic == null ) + erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID) + else if (!metadataCache.contains(topicIdPartition.topicPartition)) + erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION) else - interesting += (topicPartition -> data) + interesting += (topicIdPartition -> data) } } else { - fetchContext.foreachPartition { (part, topicId, _) => - sessionTopicIds.put(part.topic(), topicId) - erroneous += part -> FetchResponse.partitionResponse(part.partition, Errors.TOPIC_AUTHORIZATION_FAILED) + fetchContext.foreachPartition { (topicIdPartition, _) => + erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition, Errors.TOPIC_AUTHORIZATION_FAILED) Review comment: I guess that it does not change much in the end. I was considering this in order to be consistent with how we handle this for the consumer. ########## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ########## @@ -66,16 +69,28 @@ public PartitionData( int maxBytes, Optional<Integer> currentLeaderEpoch ) { - this(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, Optional.empty()); + this(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, Optional.empty()); Review comment: Yeah, that's a good question. I guess that that constructor is convenient for tests but might be bug prone in the regular code. I am tempted to remove it entirely.... What do you think? ########## File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java ########## @@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() { List<Integer> partitions = Arrays.asList(0, 1); partitions.forEach(partition -> { String testType = partition == 0 ? 
"updating a partition" : "adding a new partition"; - Map<String, Uuid> topicIds = Collections.singletonMap("foo", Uuid.randomUuid()); + Uuid fooId = Uuid.randomUuid(); FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); FetchSessionHandler.Builder builder = handler.newBuilder(); - builder.add(new TopicPartition("foo", 0), topicIds.get("foo"), - new FetchRequest.PartitionData(0, 100, 200, Optional.empty())); + builder.add(new TopicPartition("foo", 0), + new FetchRequest.PartitionData(fooId, 0, 100, 200, Optional.empty())); FetchSessionHandler.FetchRequestData data = builder.build(); - assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)), + assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)), data.toSend(), data.sessionPartitions()); assertTrue(data.metadata().isFull()); assertTrue(data.canUseTopicIds()); FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, - respMap(new RespEntry("foo", 0, 10, 20)), topicIds); + respMap(new RespEntry("foo", 0, fooId, 10, 20))); handler.handleResponse(resp, ApiKeys.FETCH.latestVersion()); // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID. FetchSessionHandler.Builder builder2 = handler.newBuilder(); - builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID, - new FetchRequest.PartitionData(10, 110, 210, Optional.empty())); + builder2.add(new TopicPartition("foo", partition), + new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 210, Optional.empty())); FetchSessionHandler.FetchRequestData data2 = builder2.build(); - // Should have the same session ID and next epoch, but can no longer use topic IDs. - // The receiving broker will close the session if we were previously using topic IDs. + // Should have the same session ID, and next epoch and can no longer use topic IDs. + // The receiving broker will handle closing the session. assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType); assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType); assertFalse(data2.canUseTopicIds()); }); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testTopicIdReplaced(boolean fetchRequestUsesIds) { + TopicPartition tp = new TopicPartition("foo", 0); + FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); + FetchSessionHandler.Builder builder = handler.newBuilder(); + Uuid topicId1 = Uuid.randomUuid(); + builder.add(tp, + new FetchRequest.PartitionData(topicId1, 0, 100, 200, Optional.empty())); + FetchSessionHandler.FetchRequestData data = builder.build(); + assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)), + data.toSend(), data.sessionPartitions()); + assertTrue(data.metadata().isFull()); + assertTrue(data.canUseTopicIds()); + + FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, + respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20))); + handler.handleResponse(resp, (short) 12); + + // Try to add a new topic ID. + FetchSessionHandler.Builder builder2 = handler.newBuilder(); + Uuid topicId2 = Uuid.randomUuid(); + // Use the same data besides the topic ID. + FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty()); + builder2.add(tp, partitionData); + FetchSessionHandler.FetchRequestData data2 = builder2.build(); + // The old topic ID partition should be in toReplace, and the new one should be in toSend. 
+ assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)), + data2.toSend(), data2.sessionPartitions()); + assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, tp)), data2.toReplace()); + // Should have the same session ID, and next epoch and can use topic IDs. + assertEquals(123, data2.metadata().sessionId(), "Did not use same session"); + assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch"); + assertTrue(data2.canUseTopicIds()); + + short version = fetchRequestUsesIds ? ApiKeys.FETCH.latestVersion() : 12; Review comment: Sorry, I meant below assertions not above. Yes, it seems that they are testing the logic of the `FetchRequest` itself and not really the logic of the FetchSessionHandler. ########## File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java ########## @@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() { List<Integer> partitions = Arrays.asList(0, 1); partitions.forEach(partition -> { String testType = partition == 0 ? "updating a partition" : "adding a new partition"; - Map<String, Uuid> topicIds = Collections.singletonMap("foo", Uuid.randomUuid()); + Uuid fooId = Uuid.randomUuid(); FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); FetchSessionHandler.Builder builder = handler.newBuilder(); - builder.add(new TopicPartition("foo", 0), topicIds.get("foo"), - new FetchRequest.PartitionData(0, 100, 200, Optional.empty())); + builder.add(new TopicPartition("foo", 0), + new FetchRequest.PartitionData(fooId, 0, 100, 200, Optional.empty())); FetchSessionHandler.FetchRequestData data = builder.build(); - assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)), + assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)), data.toSend(), data.sessionPartitions()); assertTrue(data.metadata().isFull()); assertTrue(data.canUseTopicIds()); FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, - respMap(new RespEntry("foo", 0, 10, 20)), topicIds); + respMap(new RespEntry("foo", 0, fooId, 10, 20))); handler.handleResponse(resp, ApiKeys.FETCH.latestVersion()); // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID. FetchSessionHandler.Builder builder2 = handler.newBuilder(); - builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID, - new FetchRequest.PartitionData(10, 110, 210, Optional.empty())); + builder2.add(new TopicPartition("foo", partition), + new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 210, Optional.empty())); FetchSessionHandler.FetchRequestData data2 = builder2.build(); - // Should have the same session ID and next epoch, but can no longer use topic IDs. - // The receiving broker will close the session if we were previously using topic IDs. + // Should have the same session ID, and next epoch and can no longer use topic IDs. + // The receiving broker will handle closing the session. 
assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType); assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType); assertFalse(data2.canUseTopicIds()); }); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testTopicIdReplaced(boolean fetchRequestUsesIds) { + TopicPartition tp = new TopicPartition("foo", 0); + FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); + FetchSessionHandler.Builder builder = handler.newBuilder(); + Uuid topicId1 = Uuid.randomUuid(); + builder.add(tp, + new FetchRequest.PartitionData(topicId1, 0, 100, 200, Optional.empty())); + FetchSessionHandler.FetchRequestData data = builder.build(); + assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)), + data.toSend(), data.sessionPartitions()); + assertTrue(data.metadata().isFull()); + assertTrue(data.canUseTopicIds()); + + FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, + respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20))); + handler.handleResponse(resp, (short) 12); + + // Try to add a new topic ID. + FetchSessionHandler.Builder builder2 = handler.newBuilder(); + Uuid topicId2 = Uuid.randomUuid(); + // Use the same data besides the topic ID. + FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty()); + builder2.add(tp, partitionData); + FetchSessionHandler.FetchRequestData data2 = builder2.build(); + // The old topic ID partition should be in toReplace, and the new one should be in toSend. + assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)), + data2.toSend(), data2.sessionPartitions()); + assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, tp)), data2.toReplace()); Review comment: Correct. I was referring to the upgrade case. We might need to handle the downgrade case for https://github.com/apache/kafka/pull/11459. ########## File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java ########## @@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() { List<Integer> partitions = Arrays.asList(0, 1); partitions.forEach(partition -> { String testType = partition == 0 ? "updating a partition" : "adding a new partition"; - Map<String, Uuid> topicIds = Collections.singletonMap("foo", Uuid.randomUuid()); + Uuid fooId = Uuid.randomUuid(); FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); FetchSessionHandler.Builder builder = handler.newBuilder(); - builder.add(new TopicPartition("foo", 0), topicIds.get("foo"), - new FetchRequest.PartitionData(0, 100, 200, Optional.empty())); + builder.add(new TopicPartition("foo", 0), + new FetchRequest.PartitionData(fooId, 0, 100, 200, Optional.empty())); FetchSessionHandler.FetchRequestData data = builder.build(); - assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)), + assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)), data.toSend(), data.sessionPartitions()); assertTrue(data.metadata().isFull()); assertTrue(data.canUseTopicIds()); FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, - respMap(new RespEntry("foo", 0, 10, 20)), topicIds); + respMap(new RespEntry("foo", 0, fooId, 10, 20))); handler.handleResponse(resp, ApiKeys.FETCH.latestVersion()); // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID. 
FetchSessionHandler.Builder builder2 = handler.newBuilder(); - builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID, - new FetchRequest.PartitionData(10, 110, 210, Optional.empty())); + builder2.add(new TopicPartition("foo", partition), + new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 210, Optional.empty())); FetchSessionHandler.FetchRequestData data2 = builder2.build(); - // Should have the same session ID and next epoch, but can no longer use topic IDs. - // The receiving broker will close the session if we were previously using topic IDs. + // Should have the same session ID, and next epoch and can no longer use topic IDs. + // The receiving broker will handle closing the session. assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType); assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType); assertFalse(data2.canUseTopicIds()); }); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testTopicIdReplaced(boolean fetchRequestUsesIds) { + TopicPartition tp = new TopicPartition("foo", 0); + FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); + FetchSessionHandler.Builder builder = handler.newBuilder(); + Uuid topicId1 = Uuid.randomUuid(); + builder.add(tp, + new FetchRequest.PartitionData(topicId1, 0, 100, 200, Optional.empty())); + FetchSessionHandler.FetchRequestData data = builder.build(); + assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)), + data.toSend(), data.sessionPartitions()); + assertTrue(data.metadata().isFull()); + assertTrue(data.canUseTopicIds()); + + FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, + respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20))); + handler.handleResponse(resp, (short) 12); + + // Try to add a new topic ID. + FetchSessionHandler.Builder builder2 = handler.newBuilder(); + Uuid topicId2 = Uuid.randomUuid(); + // Use the same data besides the topic ID. + FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty()); + builder2.add(tp, partitionData); + FetchSessionHandler.FetchRequestData data2 = builder2.build(); + // The old topic ID partition should be in toReplace, and the new one should be in toSend. + assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)), + data2.toSend(), data2.sessionPartitions()); + assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, tp)), data2.toReplace()); + // Should have the same session ID, and next epoch and can use topic IDs. + assertEquals(123, data2.metadata().sessionId(), "Did not use same session"); + assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch"); + assertTrue(data2.canUseTopicIds()); + + short version = fetchRequestUsesIds ? ApiKeys.FETCH.latestVersion() : 12; + FetchRequest fetchRequest = FetchRequest.Builder + .forReplica(version, 0, 1, 1, data2.toSend()) + .removed(data2.toForget()) + .replaced(data2.toReplace()) + .metadata(data2.metadata()).build(version); + + assertEquals(fetchRequestUsesIds, fetchRequest.data().forgottenTopicsData().size() > 0); + assertEquals(1, fetchRequest.data().topics().size()); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testSessionEpochWhenMixedUsageOfTopicIDs(boolean startsWithTopicIds) { + Uuid fooId = startsWithTopicIds ? Uuid.randomUuid() : Uuid.ZERO_UUID; + Uuid barId = startsWithTopicIds ? 
Uuid.ZERO_UUID : Uuid.randomUuid(); + short responseVersion = startsWithTopicIds ? ApiKeys.FETCH.latestVersion() : 12; + + TopicPartition tp0 = new TopicPartition("foo", 0); + TopicPartition tp1 = new TopicPartition("bar", 1); + + FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); + FetchSessionHandler.Builder builder = handler.newBuilder(); + builder.add(tp0, + new FetchRequest.PartitionData(fooId, 0, 100, 200, Optional.empty())); + FetchSessionHandler.FetchRequestData data = builder.build(); + assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)), + data.toSend(), data.sessionPartitions()); + assertTrue(data.metadata().isFull()); + assertEquals(startsWithTopicIds, data.canUseTopicIds()); + + FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, + respMap(new RespEntry("foo", 0, fooId, 10, 20))); + handler.handleResponse(resp, responseVersion); + + // Re-add the first partition. Then add a partition with opposite ID usage. + FetchSessionHandler.Builder builder2 = handler.newBuilder(); + builder2.add(tp0, + new FetchRequest.PartitionData(fooId, 10, 110, 210, Optional.empty())); + builder2.add(tp1, + new FetchRequest.PartitionData(barId, 0, 100, 200, Optional.empty())); + FetchSessionHandler.FetchRequestData data2 = builder2.build(); + // Should have the same session ID, and the next epoch and can not use topic IDs. + // The receiving broker will handle closing the session. + assertEquals(123, data2.metadata().sessionId(), "Did not use same session"); + assertEquals(1, data2.metadata().epoch(), "Did not have final epoch"); + assertFalse(data2.canUseTopicIds()); + } + + @ParameterizedTest @ValueSource(booleans = {true, false}) public void testIdUsageWithAllForgottenPartitions(boolean useTopicIds) { // We want to test when all topics are removed from the session Uuid topicId = useTopicIds ? Uuid.randomUuid() : Uuid.ZERO_UUID; - Short responseVersion = useTopicIds ? ApiKeys.FETCH.latestVersion() : 12; - Map<String, Uuid> topicIds = Collections.singletonMap("foo", topicId); + short responseVersion = useTopicIds ? ApiKeys.FETCH.latestVersion() : 12; FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); // Add topic foo to the session FetchSessionHandler.Builder builder = handler.newBuilder(); - builder.add(new TopicPartition("foo", 0), topicIds.get("foo"), - new FetchRequest.PartitionData(0, 100, 200, Optional.empty())); + builder.add(new TopicPartition("foo", 0), + new FetchRequest.PartitionData(topicId, 0, 100, 200, Optional.empty())); FetchSessionHandler.FetchRequestData data = builder.build(); - assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)), + assertMapsEqual(reqMap(new ReqEntry("foo", topicId, 0, 0, 100, 200)), data.toSend(), data.sessionPartitions()); assertTrue(data.metadata().isFull()); assertEquals(useTopicIds, data.canUseTopicIds()); FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, - respMap(new RespEntry("foo", 0, 10, 20)), topicIds); - handler.handleResponse(resp, responseVersion.shortValue()); + respMap(new RespEntry("foo", 0, topicId, 10, 20))); + handler.handleResponse(resp, responseVersion); // Remove the topic from the session FetchSessionHandler.Builder builder2 = handler.newBuilder(); FetchSessionHandler.FetchRequestData data2 = builder2.build(); - // Should have the same session ID and next epoch, but can no longer use topic IDs. - // The receiving broker will close the session if we were previously using topic IDs. + // Should have the same session ID, next epoch, and same ID usage. 
assertEquals(123, data2.metadata().sessionId(), "Did not use same session when useTopicIds was " + useTopicIds); Review comment: Yeah, it would be good to assert what we expect in `data2` for completeness. ########## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ########## @@ -4814,6 +4842,7 @@ private void buildDependencies(MetricConfig metricConfig, metadata = new ConsumerMetadata(0, metadataExpireMs, false, false, subscriptions, logContext, new ClusterResourceListeners()); client = new MockClient(time, metadata); + client.setNodeApiVersions(NodeApiVersions.create()); metrics = new Metrics(metricConfig, time); consumerClient = new ConsumerNetworkClient(logContext, client, metadata, time, 100, 1000, Integer.MAX_VALUE); Review comment: Yes, I was referring to those. Ack, I missed them during my first read. ########## File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala ########## @@ -659,88 +670,108 @@ class FetchSessionTest { } @Test - def testIncrementalFetchSessionWithIdsWhenSessionDoesNotUseIds() : Unit = { + def testFetchSessionWithUnknownId(): Unit = { val time = new MockTime() val cache = new FetchSessionCache(10, 1000) val fetchManager = new FetchManager(time, cache) - val topicIds = new util.HashMap[String, Uuid]() - val topicNames = new util.HashMap[Uuid, String]() + val topicNames = Map(Uuid.randomUuid() -> "foo", Uuid.randomUuid() -> "bar").asJava + val topicIds = topicNames.asScala.map(_.swap).asJava + val foo0 = new TopicIdPartition(topicIds.getOrDefault("foo", Uuid.ZERO_UUID), new TopicPartition("foo", 0)) + val foo1 = new TopicIdPartition(topicIds.getOrDefault("foo", Uuid.ZERO_UUID), new TopicPartition("foo", 1)) + val emptyFoo0 = new TopicIdPartition(topicIds.getOrDefault("foo", Uuid.ZERO_UUID), new TopicPartition(null, 0)) + val emptyFoo1 = new TopicIdPartition(topicIds.getOrDefault("foo", Uuid.ZERO_UUID), new TopicPartition(null, 1)) - // Create a new fetch session with foo-0 + // Create a new fetch session with foo-0 and foo-1 val reqData1 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] - reqData1.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0, 0, 100, + reqData1.put(foo0.topicPartition, new FetchRequest.PartitionData(foo0.topicId, 0, 0, 100, Optional.empty())) - val request1 = createRequestWithoutTopicIds(JFetchMetadata.INITIAL, reqData1, topicIds, EMPTY_PART_LIST, false) - // Start a fetch session using a request version that does not use topic IDs. + reqData1.put(foo1.topicPartition, new FetchRequest.PartitionData(foo1.topicId,10, 0, 100, + Optional.empty())) + val request1 = createRequest(JFetchMetadata.INITIAL, reqData1, EMPTY_PART_LIST, false) + // Simulate unknown topic ID for foo. + val topicNamesOnlyBar = Collections.singletonMap(topicIds.get("bar"), "bar") + // We should not throw error since we have an older request version. 
val context1 = fetchManager.newContext( request1.version, request1.metadata, request1.isFromFollower, - request1.fetchData(topicNames), - request1.forgottenTopics(topicNames), - topicIds + request1.fetchData(topicNamesOnlyBar), + request1.forgottenTopics(topicNamesOnlyBar), + topicNamesOnlyBar ) assertEquals(classOf[FullFetchContext], context1.getClass) - val respData1 = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData] - respData1.put(new TopicPartition("foo", 0), new FetchResponseData.PartitionData() + val respData1 = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData] Review comment: Yeah, I meant exactly that. How about using `assertPartitionsOrder` helper? The assertion would be more complete. ########## File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala ########## @@ -659,88 +670,108 @@ class FetchSessionTest { } @Test - def testIncrementalFetchSessionWithIdsWhenSessionDoesNotUseIds() : Unit = { + def testFetchSessionWithUnknownId(): Unit = { val time = new MockTime() val cache = new FetchSessionCache(10, 1000) val fetchManager = new FetchManager(time, cache) - val topicIds = new util.HashMap[String, Uuid]() - val topicNames = new util.HashMap[Uuid, String]() + val topicNames = Map(Uuid.randomUuid() -> "foo", Uuid.randomUuid() -> "bar").asJava Review comment: You could use `assertPartitionsOrder` helper here as well. ########## File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ########## @@ -3530,37 +3534,37 @@ class KafkaApisTest { def testSizeOfThrottledPartitions(): Unit = { val topicNames = new util.HashMap[Uuid, String] val topicIds = new util.HashMap[String, Uuid]() - def fetchResponse(data: Map[TopicPartition, String]): FetchResponse = { - val responseData = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]( + def fetchResponse(data: Map[TopicIdPartition, String]): FetchResponse = { + val responseData = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData]( data.map { case (tp, raw) => tp -> new FetchResponseData.PartitionData() - .setPartitionIndex(tp.partition) + .setPartitionIndex(tp.topicPartition.partition) .setHighWatermark(105) .setLastStableOffset(105) .setLogStartOffset(0) .setRecords(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(100, raw.getBytes(StandardCharsets.UTF_8)))) }.toMap.asJava) data.foreach{case (tp, _) => - val id = Uuid.randomUuid() - topicIds.put(tp.topic(), id) - topicNames.put(id, tp.topic()) + topicIds.put(tp.topicPartition.topic, tp.topicId) + topicNames.put(tp.topicId, tp.topicPartition.topic) } - FetchResponse.of(Errors.NONE, 100, 100, responseData, topicIds) + FetchResponse.of(Errors.NONE, 100, 100, responseData) } - val throttledPartition = new TopicPartition("throttledData", 0) + val throttledPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("throttledData", 0)) val throttledData = Map(throttledPartition -> "throttledData") val expectedSize = FetchResponse.sizeOf(FetchResponseData.HIGHEST_SUPPORTED_VERSION, - fetchResponse(throttledData).responseData(topicNames, FetchResponseData.HIGHEST_SUPPORTED_VERSION).entrySet.iterator, topicIds) + fetchResponse(throttledData).responseData(topicNames, FetchResponseData.HIGHEST_SUPPORTED_VERSION).entrySet.asScala.map( entry => + (new TopicIdPartition(Uuid.ZERO_UUID, entry.getKey), entry.getValue)).toMap.asJava.entrySet.iterator) - val response = fetchResponse(throttledData ++ Map(new TopicPartition("nonThrottledData", 0) -> "nonThrottledData")) 
+ val response = fetchResponse(throttledData ++ Map(new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("nonThrottledData", 0)) -> "nonThrottledData")) val quota = Mockito.mock(classOf[ReplicationQuotaManager]) Mockito.when(quota.isThrottled(ArgumentMatchers.any(classOf[TopicPartition]))) - .thenAnswer(invocation => throttledPartition == invocation.getArgument(0).asInstanceOf[TopicPartition]) + .thenAnswer(invocation => throttledPartition.topicPartition == invocation.getArgument(0).asInstanceOf[TopicPartition]) - assertEquals(expectedSize, KafkaApis.sizeOfThrottledPartitions(FetchResponseData.HIGHEST_SUPPORTED_VERSION, response, quota, topicIds)) + assertEquals(expectedSize, KafkaApis.sizeOfThrottledPartitions(FetchResponseData.HIGHEST_SUPPORTED_VERSION, response, quota)) } @Test Review comment: That is right. ########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ########## @@ -262,11 +262,12 @@ public synchronized int sendFetches() { maxVersion = ApiKeys.FETCH.latestVersion(); } final FetchRequest.Builder request = FetchRequest.Builder - .forConsumer(maxVersion, this.maxWaitMs, this.minBytes, data.toSend(), data.topicIds()) + .forConsumer(maxVersion, this.maxWaitMs, this.minBytes, data.toSend()) .isolationLevel(isolationLevel) .setMaxBytes(this.maxBytes) .metadata(data.metadata()) - .toForget(data.toForget()) + .removed(data.toForget()) + .replaced(data.toReplace()) Review comment: Should we add or extend a test in `FetcherTest` to cover this change? I would like to have one which ensures that the request sent is populated correctly (especially the replaced part) by the fetcher based on the session handler. It seems that we don't have such a test in the suite at the moment. ########## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ########## @@ -199,26 +235,31 @@ public FetchRequest build(short version) { fetchRequestData.setMaxBytes(maxBytes); fetchRequestData.setIsolationLevel(isolationLevel.id()); fetchRequestData.setForgottenTopicsData(new ArrayList<>()); - toForget.stream() - .collect(Collectors.groupingBy(TopicPartition::topic, LinkedHashMap::new, Collectors.toList())) - .forEach((topic, partitions) -> - fetchRequestData.forgottenTopicsData().add(new FetchRequestData.ForgottenTopic() - .setTopic(topic) - .setTopicId(topicIds.getOrDefault(topic, Uuid.ZERO_UUID)) - .setPartitions(partitions.stream().map(TopicPartition::partition).collect(Collectors.toList()))) - ); - fetchRequestData.setTopics(new ArrayList<>()); + + Map<String, FetchRequestData.ForgottenTopic> forgottenTopicMap = new LinkedHashMap<>(); + addToForgottenTopicMap(removed, forgottenTopicMap); + + // If a version older than v13 is used, topic-partition which were replaced + // by a topic-partition with the same name but a different topic ID are not + // sent out in the "forget" set in order to not remove the newly added + // partition in the "fetch" set. + if (version >= 13) { + addToForgottenTopicMap(replaced, forgottenTopicMap); + } Review comment: Should we add a few unit tests to validate the changes that we have done in this class? We could add a few to FetchRequestTest (not sure if it already exists though). ########## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ########## @@ -314,8 +356,7 @@ public int maxBytes() { // For versions < 13, builds the partitionData map using only the FetchRequestData.
// For versions 13+, builds the partitionData map using both the FetchRequestData and a mapping of topic IDs to names. - // Throws UnknownTopicIdException for versions 13+ if the topic ID was unknown to the server. - public Map<TopicPartition, PartitionData> fetchData(Map<Uuid, String> topicNames) throws UnknownTopicIdException { + public Map<TopicIdPartition, PartitionData> fetchData(Map<Uuid, String> topicNames) throws UnknownTopicIdException { Review comment: Do we have a unit test for this one and for `forgottenTopics`? ########## File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java ########## @@ -242,65 +244,68 @@ public void testIncrementals() { FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); FetchSessionHandler.Builder builder = handler.newBuilder(); addTopicId(topicIds, topicNames, "foo", version); - builder.add(new TopicPartition("foo", 0), topicIds.getOrDefault("foo", Uuid.ZERO_UUID), - new FetchRequest.PartitionData(0, 100, 200, Optional.empty())); - builder.add(new TopicPartition("foo", 1), topicIds.getOrDefault("foo", Uuid.ZERO_UUID), - new FetchRequest.PartitionData(10, 110, 210, Optional.empty())); + Uuid fooId = topicIds.getOrDefault("foo", Uuid.ZERO_UUID); + TopicPartition foo0 = new TopicPartition("foo", 0); + TopicPartition foo1 = new TopicPartition("foo", 1); + builder.add(foo0, new FetchRequest.PartitionData(fooId, 0, 100, 200, Optional.empty())); + builder.add(foo1, new FetchRequest.PartitionData(fooId, 10, 110, 210, Optional.empty())); FetchSessionHandler.FetchRequestData data = builder.build(); - assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200), - new ReqEntry("foo", 1, 10, 110, 210)), + assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200), + new ReqEntry("foo", fooId, 1, 10, 110, 210)), data.toSend(), data.sessionPartitions()); assertEquals(INVALID_SESSION_ID, data.metadata().sessionId()); assertEquals(INITIAL_EPOCH, data.metadata().epoch()); FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, - respMap(new RespEntry("foo", 0, 10, 20), - new RespEntry("foo", 1, 10, 20)), topicIds); + respMap(new RespEntry("foo", 0, fooId, 10, 20), + new RespEntry("foo", 1, fooId, 10, 20))); handler.handleResponse(resp, version); // Test an incremental fetch request which adds one partition and modifies another. FetchSessionHandler.Builder builder2 = handler.newBuilder(); addTopicId(topicIds, topicNames, "bar", version); - builder2.add(new TopicPartition("foo", 0), topicIds.getOrDefault("foo", Uuid.ZERO_UUID), - new FetchRequest.PartitionData(0, 100, 200, Optional.empty())); - builder2.add(new TopicPartition("foo", 1), topicIds.getOrDefault("foo", Uuid.ZERO_UUID), - new FetchRequest.PartitionData(10, 120, 210, Optional.empty())); - builder2.add(new TopicPartition("bar", 0), topicIds.getOrDefault("bar", Uuid.ZERO_UUID), - new FetchRequest.PartitionData(20, 200, 200, Optional.empty())); + Uuid barId = topicIds.getOrDefault("bar", Uuid.ZERO_UUID); + TopicPartition bar0 = new TopicPartition("bar", 0); + builder2.add(foo0, + new FetchRequest.PartitionData(fooId, 0, 100, 200, Optional.empty())); Review comment: There are a few more cases where we could put the partition data back on the previous line in this file. 
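Coming back to the question above about coverage for `fetchData` and `forgottenTopics`, here is a hedged sketch of the kind of FetchRequestTest case that could pin this down. The builder arguments follow the diffs in this thread, and the null-name expectation for an unresolved ID is an assumption based on the `emptyFoo` pattern used in `FetchSessionTest`:

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.FetchRequest;

import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class FetchRequestFetchDataSketchTest {
    @Test
    public void testFetchDataResolvesNamesFromTopicIds() {
        Uuid fooId = Uuid.randomUuid();
        TopicPartition foo0 = new TopicPartition("foo", 0);
        LinkedHashMap<TopicPartition, FetchRequest.PartitionData> toSend = new LinkedHashMap<>();
        toSend.put(foo0, new FetchRequest.PartitionData(fooId, 0, 0, 100, Optional.empty()));

        short version = ApiKeys.FETCH.latestVersion();
        FetchRequest request = FetchRequest.Builder
            .forReplica(version, 1, 500, 1, toSend)
            .build(version);

        // When the ID resolves, fetchData should key the entry by the full
        // TopicIdPartition with the resolved name.
        Map<TopicIdPartition, FetchRequest.PartitionData> resolved =
            request.fetchData(Collections.singletonMap(fooId, "foo"));
        assertTrue(resolved.containsKey(new TopicIdPartition(fooId, foo0)));

        // When the ID does not resolve, the entry should (per this PR) carry a
        // null topic name rather than throwing UnknownTopicIdException.
        Map<TopicIdPartition, FetchRequest.PartitionData> unresolved =
            request.fetchData(Collections.emptyMap());
        assertTrue(unresolved.containsKey(
            new TopicIdPartition(fooId, new TopicPartition(null, 0))));
    }
}
```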
########## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ########## @@ -374,7 +374,7 @@ abstract class AbstractFetcherThread(name: String, } } } catch { - case ime@( _: CorruptRecordException | _: InvalidRecordException) => + case ime@(_: CorruptRecordException | _: InvalidRecordException) => Review comment: I think that would for instance happen when the controller fails over to an older IBP during an upgrade. This should remove the topic ids which means that v12 will be used for the next fetch request and trigger a FETCH_SESSION_TOPIC_ID_ERROR. In this particular case, re-trying directly would be the optimal way to proceed for a follower. I wonder if there are other cases to consider here. For the consumer, it is definitely different. ########## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ########## @@ -374,7 +374,7 @@ abstract class AbstractFetcherThread(name: String, } } } catch { - case ime@( _: CorruptRecordException | _: InvalidRecordException) => + case ime@(_: CorruptRecordException | _: InvalidRecordException) => Review comment: Sorry, I wanted to say happen. ########## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ########## @@ -374,7 +374,7 @@ abstract class AbstractFetcherThread(name: String, } } } catch { - case ime@( _: CorruptRecordException | _: InvalidRecordException) => + case ime@(_: CorruptRecordException | _: InvalidRecordException) => Review comment: Anyway, we don't need to address this in this PR. I just wanted to point out that there is an opportunity for an improvement. ########## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ########## @@ -413,8 +413,20 @@ abstract class AbstractFetcherThread(name: String, case Errors.UNKNOWN_TOPIC_OR_PARTITION => warn(s"Received ${Errors.UNKNOWN_TOPIC_OR_PARTITION} from the leader for partition $topicPartition. " + - "This error may be returned transiently when the partition is being created or deleted, but it is not " + - "expected to persist.") + "This error may be returned transiently when the partition is being created or deleted, but it is not " + + "expected to persist.") + partitionsWithError += topicPartition + + case Errors.UNKNOWN_TOPIC_ID => + warn(s"Received ${Errors.UNKNOWN_TOPIC_ID} from the leader for partition $topicPartition. " + + "This error may be returned transiently when the partition is being created or deleted, but it is not " + + "expected to persist.") + partitionsWithError += topicPartition + + case Errors.INCONSISTENT_TOPIC_ID => + warn(s"Received ${Errors.INCONSISTENT_TOPIC_ID} from the leader for partition $topicPartition. " + + "This error may be returned transiently when the partition is being created or deleted, but it is not " + + "expected to persist.") Review comment: Do we have unit tests covering those cases? There are almost no changes in `AbstractFetcherThreadTest` so it seems that we don't. Are they somewhere else perhaps? ########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -163,18 +178,37 @@ class CachedPartition(val topic: String, mustRespond } - override def hashCode: Int = Objects.hash(new TopicPartition(topic, partition), topicId) + /** + * We have different equality checks depending on whether topic IDs are used. + * This means we need a different hash function as well. We use name to calculate the hash if the ID is zero and unused. + * Otherwise, we use the topic ID in the hash calculation.
+ * + * @return the hash code for the CachedPartition depending on what request version we are using. + */ + override def hashCode: Int = + if (topicId != Uuid.ZERO_UUID) + (31 * partition) + topicId.hashCode + else + (31 * partition) + topic.hashCode def canEqual(that: Any): Boolean = that.isInstanceOf[CachedPartition] Review comment: I guess that we could remove it now.
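If unit tests for `CachedPartition` equality get added as suggested earlier, here is a sketch of the two modes they could cover, written against the illustrative `CachedPartitionSketch` above rather than the real Scala class:

```java
import org.apache.kafka.common.Uuid;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;

public class CachedPartitionEqualitySketchTest {
    @Test
    public void testIdBasedAndNameBasedEquality() {
        Uuid id = Uuid.randomUuid();

        // ID mode: an entry with an unresolved (null) name must match the same
        // partition once the name is known, so session lookups keep working.
        CachedPartitionSketch unnamed = new CachedPartitionSketch(null, id, 0);
        CachedPartitionSketch named = new CachedPartitionSketch("foo", id, 0);
        assertEquals(unnamed, named);
        assertEquals(unnamed.hashCode(), named.hashCode());

        // Name mode (ZERO_UUID): equality falls back to the topic name.
        CachedPartitionSketch a = new CachedPartitionSketch("foo", Uuid.ZERO_UUID, 0);
        assertEquals(a, new CachedPartitionSketch("foo", Uuid.ZERO_UUID, 0));
        assertNotEquals(a, new CachedPartitionSketch("bar", Uuid.ZERO_UUID, 0));
    }
}
```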
########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -93,27 +93,42 @@ class CachedPartition(val topic: String, def this(topic: String, partition: Int, topicId: Uuid) = this(topic, topicId, partition, -1, -1, -1, Optional.empty(), -1, -1, Optional.empty[Integer]) - def this(part: TopicPartition, topicId: Uuid) = - this(part.topic, part.partition, topicId) + def this(part: TopicIdPartition) = { + this(part.topic, part.partition, part.topicId) + } - def this(part: TopicPartition, id: Uuid, reqData: FetchRequest.PartitionData) = - this(part.topic, id, part.partition, reqData.maxBytes, reqData.fetchOffset, -1, + def this(part: TopicIdPartition, reqData: FetchRequest.PartitionData) = + this(part.topic, part.topicId, part.partition, reqData.maxBytes, reqData.fetchOffset, -1, reqData.currentLeaderEpoch, reqData.logStartOffset, -1, reqData.lastFetchedEpoch) - def this(part: TopicPartition, id: Uuid, reqData: FetchRequest.PartitionData, + def this(part: TopicIdPartition, reqData: FetchRequest.PartitionData, respData: FetchResponseData.PartitionData) = - this(part.topic, id, part.partition, reqData.maxBytes, reqData.fetchOffset, respData.highWatermark, + this(part.topic, part.topicId, part.partition, reqData.maxBytes, reqData.fetchOffset, respData.highWatermark, reqData.currentLeaderEpoch, reqData.logStartOffset, respData.logStartOffset, reqData.lastFetchedEpoch) - def reqData = new FetchRequest.PartitionData(fetchOffset, fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch) + def reqData = new FetchRequest.PartitionData(topicId, fetchOffset, fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch) - def updateRequestParams(reqData: FetchRequest.PartitionData): Unit = { + def maybeUpdateRequestParamsOrName(reqData: FetchRequest.PartitionData, name: String): Unit = { // Update our cached request parameters. maxBytes = reqData.maxBytes fetchOffset = reqData.fetchOffset fetcherLogStartOffset = reqData.logStartOffset leaderEpoch = reqData.currentLeaderEpoch lastFetchedEpoch = reqData.lastFetchedEpoch + // Update name if needed + maybeSetUnknownName(name) + } + + def maybeSetUnknownName(name: String): Unit = { + if (this.topic == null) { + this.topic = name + } + } + + def maybeResolveUnknownName(topicNames: FetchSession.TOPIC_NAME_MAP): Unit = { Review comment: Should we use the same name for both `maybeSetUnknownName` and `maybeResolveUnknownName`? I guess that they could be distinguished by their argument. If we add unit tests for other methods of this class, should we cover all the methods that we have changed or added as well? ########## File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ########## @@ -305,9 +304,10 @@ class ReplicaFetcherThread(name: String, } else { val version: Short = if (fetchRequestVersion >= 13 && !fetchData.canUseTopicIds) 12 else fetchRequestVersion val requestBuilder = FetchRequest.Builder - .forReplica(version, replicaId, maxWait, minBytes, fetchData.toSend, fetchData.topicIds) + .forReplica(version, replicaId, maxWait, minBytes, fetchData.toSend) + .setMaxBytes(maxBytes) - .toForget(fetchData.toForget) + .removed(fetchData.toForget) + .replaced(fetchData.toReplace) Review comment: Do we have tests verifying this change?
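On the question of tests verifying this change, here is a hedged sketch of a request-level check (signatures follow the diffs in this thread; the setup values are assumptions). It exercises the v13 gate under which replaced partitions reach `forgottenTopicsData`:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.FetchRequest;

import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class FetchRequestReplacedSketchTest {
    @Test
    public void testReplacedPartitionsForgottenOnlyWithTopicIds() {
        TopicIdPartition replaced =
            new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        List<TopicIdPartition> toReplace = Collections.singletonList(replaced);

        for (short version : Arrays.asList((short) 12, ApiKeys.FETCH.latestVersion())) {
            FetchRequest request = FetchRequest.Builder
                .forReplica(version, 1, 500, 1, Collections.emptyMap())
                .removed(Collections.emptyList())
                .replaced(toReplace)
                .build(version);
            // Per the version gate in Builder#build, replaced partitions land in
            // forgottenTopicsData only when topic IDs are on the wire (v13+).
            boolean expectForgotten = version >= 13;
            assertEquals(expectForgotten,
                !request.data().forgottenTopicsData().isEmpty());
        }
    }
}
```

A follower-side test in `ReplicaFetcherThreadTest` could then assert the same thing one level up, after the thread hands `fetchData.toReplace` to the builder.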
########## File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java ########## @@ -285,52 +268,57 @@ public FetchRequestData build() { if (nextMetadata.isFull()) { if (log.isDebugEnabled()) { log.debug("Built full fetch {} for node {} with {}.", - nextMetadata, node, partitionsToLogString(next.keySet())); + nextMetadata, node, topicPartitionsToLogString(next.keySet())); } sessionPartitions = next; next = null; // Only add topic IDs to the session if we are using topic IDs. if (canUseTopicIds) { - sessionTopicIds = topicIds; - sessionTopicNames = new HashMap<>(topicIds.size()); - topicIds.forEach((name, id) -> sessionTopicNames.put(id, name)); + Map<Uuid, Set<String>> newTopicNames = sessionPartitions.entrySet().stream().collect(Collectors.groupingByConcurrent(entry -> entry.getValue().topicId, + Collectors.mapping(entry -> entry.getKey().topic(), Collectors.toSet()))); + + sessionTopicNames = new HashMap<>(newTopicNames.size()); + // There should only be one topic name per topic ID. + newTopicNames.forEach((topicId, topicNamesSet) -> topicNamesSet.forEach(topicName -> sessionTopicNames.put(topicId, topicName))); } else { - sessionTopicIds = new HashMap<>(); - sessionTopicNames = new HashMap<>(); + sessionTopicNames = Collections.emptyMap(); } - topicIds = null; Map<TopicPartition, PartitionData> toSend = - Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions)); - Map<String, Uuid> toSendTopicIds = - Collections.unmodifiableMap(new HashMap<>(sessionTopicIds)); - Map<Uuid, String> toSendTopicNames = - Collections.unmodifiableMap(new HashMap<>(sessionTopicNames)); - return new FetchRequestData(toSend, Collections.emptyList(), toSend, toSendTopicIds, toSendTopicNames, nextMetadata, canUseTopicIds); + Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions)); + return new FetchRequestData(toSend, Collections.emptyList(), Collections.emptyList(), toSend, nextMetadata, canUseTopicIds); } - List<TopicPartition> added = new ArrayList<>(); - List<TopicPartition> removed = new ArrayList<>(); - List<TopicPartition> altered = new ArrayList<>(); + List<TopicIdPartition> added = new ArrayList<>(); + List<TopicIdPartition> removed = new ArrayList<>(); + List<TopicIdPartition> altered = new ArrayList<>(); + List<TopicIdPartition> replaced = new ArrayList<>(); for (Iterator<Entry<TopicPartition, PartitionData>> iter = - sessionPartitions.entrySet().iterator(); iter.hasNext(); ) { + sessionPartitions.entrySet().iterator(); iter.hasNext(); ) { Entry<TopicPartition, PartitionData> entry = iter.next(); TopicPartition topicPartition = entry.getKey(); PartitionData prevData = entry.getValue(); PartitionData nextData = next.remove(topicPartition); if (nextData != null) { - if (!prevData.equals(nextData)) { + // We basically check if the new partition had the same topic ID. If not, + // we add it to the "replaced" set. + if (!prevData.topicId.equals(nextData.topicId) && !prevData.topicId.equals(Uuid.ZERO_UUID)) { + // Re-add the replaced partition to the end of 'next' + next.put(topicPartition, nextData); + entry.setValue(nextData); + replaced.add(new TopicIdPartition(prevData.topicId, topicPartition)); + } else if (!prevData.equals(nextData)) { // Re-add the altered partition to the end of 'next' next.put(topicPartition, nextData); entry.setValue(nextData); - altered.add(topicPartition); + altered.add(new TopicIdPartition(nextData.topicId, topicPartition)); } } else { // Remove this partition from the session. 
iter.remove(); // Indicate that we no longer want to listen to this partition. - removed.add(topicPartition); + removed.add(new TopicIdPartition(prevData.topicId, topicPartition)); // If we do not have this topic ID in the builder or the session, we can not use topic IDs. - if (canUseTopicIds && !topicIds.containsKey(topicPartition.topic()) && !sessionTopicIds.containsKey(topicPartition.topic())) + if (canUseTopicIds && prevData.topicId == Uuid.ZERO_UUID) Review comment: Should we use `equals` instead of `==`? We use `equals` at L304 btw. ########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ########## @@ -262,11 +262,12 @@ public synchronized int sendFetches() { maxVersion = ApiKeys.FETCH.latestVersion(); } final FetchRequest.Builder request = FetchRequest.Builder - .forConsumer(maxVersion, this.maxWaitMs, this.minBytes, data.toSend(), data.topicIds()) + .forConsumer(maxVersion, this.maxWaitMs, this.minBytes, data.toSend()) .isolationLevel(isolationLevel) .setMaxBytes(this.maxBytes) .metadata(data.metadata()) - .toForget(data.toForget()) + .removed(data.toForget()) + .replaced(data.toReplace()) Review comment: We should have a test in the Fetcher which ensures that the builder received the correct information. Then we could have one for the request which ensures that the builder does its job correctly as well. ########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ########## @@ -262,11 +262,12 @@ public synchronized int sendFetches() { maxVersion = ApiKeys.FETCH.latestVersion(); } final FetchRequest.Builder request = FetchRequest.Builder - .forConsumer(maxVersion, this.maxWaitMs, this.minBytes, data.toSend(), data.topicIds()) + .forConsumer(maxVersion, this.maxWaitMs, this.minBytes, data.toSend()) .isolationLevel(isolationLevel) .setMaxBytes(this.maxBytes) .metadata(data.metadata()) - .toForget(data.toForget()) + .removed(data.toForget()) + .replaced(data.toReplace()) Review comment: Right. You might have to assert on the request in the fetcher as well. As you said, we can't really get the data out from the builder otherwise.
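For the `equals` versus `==` point above, a small standalone demo of why value equality is the safe choice for `Uuid` (assuming the standard `org.apache.kafka.common.Uuid`):

```java
import org.apache.kafka.common.Uuid;

public class UuidEqualityDemo {
    public static void main(String[] args) {
        // A zero UUID decoded off the wire is a distinct object from the
        // Uuid.ZERO_UUID constant, so reference equality can report false
        // even though the values are identical; equals compares the bits.
        Uuid wireZero = new Uuid(0L, 0L);
        System.out.println(wireZero == Uuid.ZERO_UUID);      // false
        System.out.println(wireZero.equals(Uuid.ZERO_UUID)); // true
    }
}
```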
########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -93,27 +93,42 @@ class CachedPartition(val topic: String, def this(topic: String, partition: Int, topicId: Uuid) = this(topic, topicId, partition, -1, -1, -1, Optional.empty(), -1, -1, Optional.empty[Integer]) - def this(part: TopicPartition, topicId: Uuid) = - this(part.topic, part.partition, topicId) + def this(part: TopicIdPartition) = { + this(part.topic, part.partition, part.topicId) + } - def this(part: TopicPartition, id: Uuid, reqData: FetchRequest.PartitionData) = - this(part.topic, id, part.partition, reqData.maxBytes, reqData.fetchOffset, -1, + def this(part: TopicIdPartition, reqData: FetchRequest.PartitionData) = + this(part.topic, part.topicId, part.partition, reqData.maxBytes, reqData.fetchOffset, -1, reqData.currentLeaderEpoch, reqData.logStartOffset, -1, reqData.lastFetchedEpoch) - def this(part: TopicPartition, id: Uuid, reqData: FetchRequest.PartitionData, + def this(part: TopicIdPartition, reqData: FetchRequest.PartitionData, respData: FetchResponseData.PartitionData) = - this(part.topic, id, part.partition, reqData.maxBytes, reqData.fetchOffset, respData.highWatermark, + this(part.topic, part.topicId, part.partition, reqData.maxBytes, reqData.fetchOffset, respData.highWatermark, reqData.currentLeaderEpoch, reqData.logStartOffset, respData.logStartOffset, reqData.lastFetchedEpoch) - def reqData = new FetchRequest.PartitionData(fetchOffset, fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch) + def reqData = new FetchRequest.PartitionData(topicId, fetchOffset, fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch) - def updateRequestParams(reqData: FetchRequest.PartitionData): Unit = { + def maybeUpdateRequestParamsOrName(reqData: FetchRequest.PartitionData, name: String): Unit = { // Update our cached request parameters. maxBytes = reqData.maxBytes fetchOffset = reqData.fetchOffset fetcherLogStartOffset = reqData.logStartOffset leaderEpoch = reqData.currentLeaderEpoch lastFetchedEpoch = reqData.lastFetchedEpoch + // Update name if needed + maybeSetUnknownName(name) + } + + def maybeSetUnknownName(name: String): Unit = { + if (this.topic == null) { + this.topic = name + } + } + + def maybeResolveUnknownName(topicNames: FetchSession.TOPIC_NAME_MAP): Unit = { Review comment: Yeah, I agree with you. Perhaps we could just remove the maybeSetUnknownName and move its logic into the update request params method.
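As a closing illustration of the consolidation agreed on above, a hypothetical Java sketch (the real code is the Scala `CachedPartition`; the field types mirror `FetchRequest.PartitionData`'s public fields):

```java
import java.util.Optional;
import org.apache.kafka.common.requests.FetchRequest;

// Hypothetical, simplified: the name fix-up folds into the request-parameter
// update, so the separate maybeSetUnknownName helper disappears.
class CachedPartitionParamsSketch {
    String topic;   // null until the name is resolved
    int maxBytes;
    long fetchOffset;
    long fetcherLogStartOffset;
    Optional<Integer> leaderEpoch;
    Optional<Integer> lastFetchedEpoch;

    void maybeUpdateRequestParams(FetchRequest.PartitionData reqData, String name) {
        // Update the cached request parameters.
        maxBytes = reqData.maxBytes;
        fetchOffset = reqData.fetchOffset;
        fetcherLogStartOffset = reqData.logStartOffset;
        leaderEpoch = reqData.currentLeaderEpoch;
        lastFetchedEpoch = reqData.lastFetchedEpoch;
        // Formerly maybeSetUnknownName: resolve a null topic name in place.
        if (topic == null)
            topic = name;
    }
}
```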