dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r745869793



##########
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##########
@@ -428,82 +432,182 @@ public void testIdUsageRevokedOnIdDowngrade() {
         List<Integer> partitions = Arrays.asList(0, 1);
         partitions.forEach(partition -> {
             String testType = partition == 0 ? "updating a partition" : 
"adding a new partition";
-            Map<String, Uuid> topicIds = Collections.singletonMap("foo", 
Uuid.randomUuid());
+            Uuid fooId = Uuid.randomUuid();
             FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 
1);
             FetchSessionHandler.Builder builder = handler.newBuilder();
-            builder.add(new TopicPartition("foo", 0),  topicIds.get("foo"),
-                    new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
+            builder.add(new TopicPartition("foo", 0),
+                    new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));
             FetchSessionHandler.FetchRequestData data = builder.build();
-            assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+            assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
                     data.toSend(), data.sessionPartitions());
             assertTrue(data.metadata().isFull());
             assertTrue(data.canUseTopicIds());
 
             FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-                    respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
+                    respMap(new RespEntry("foo", 0, fooId, 10, 20)));
             handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
             // Try to remove a topic ID from an existing topic partition (0) 
or add a new topic partition (1) without an ID.
             FetchSessionHandler.Builder builder2 = handler.newBuilder();
-            builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
-                    new FetchRequest.PartitionData(10, 110, 210, 
Optional.empty()));
+            builder2.add(new TopicPartition("foo", partition),
+                    new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 
210, Optional.empty()));
             FetchSessionHandler.FetchRequestData data2 = builder2.build();
-            // Should have the same session ID and next epoch, but can no 
longer use topic IDs.
-            // The receiving broker will close the session if we were 
previously using topic IDs.
+            // Should have the same session ID, and next epoch and can no 
longer use topic IDs.
+            // The receiving broker will handle closing the session.
             assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session when " + testType);
             assertEquals(1, data2.metadata().epoch(), "Did not have correct 
epoch when " + testType);
             assertFalse(data2.canUseTopicIds());
         });
     }
 
+    private static Stream<Arguments> idUsageCombinations() {
+        return Stream.of(
+                Arguments.of(true, true),
+                Arguments.of(true, false),
+                Arguments.of(false, true),
+                Arguments.of(false, false)
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("idUsageCombinations")
+    public void testTopicIdReplaced(boolean startsWithTopicIds, boolean 
endsWithTopicIds) {
+        TopicPartition tp = new TopicPartition("foo", 0);
+        FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+        FetchSessionHandler.Builder builder = handler.newBuilder();
+        Uuid topicId1 = startsWithTopicIds ? Uuid.randomUuid() : 
Uuid.ZERO_UUID;
+        builder.add(tp, new FetchRequest.PartitionData(topicId1, 0, 100, 200, 
Optional.empty()));
+        FetchSessionHandler.FetchRequestData data = builder.build();
+        assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)),
+                data.toSend(), data.sessionPartitions());
+        assertTrue(data.metadata().isFull());
+        assertEquals(startsWithTopicIds, data.canUseTopicIds());
+
+        FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, respMap(new 
RespEntry("foo", 0, topicId1, 10, 20)));
+        short version = startsWithTopicIds ? ApiKeys.FETCH.latestVersion() : 
12;
+        handler.handleResponse(resp, version);
+
+        // Try to add a new topic ID.
+        FetchSessionHandler.Builder builder2 = handler.newBuilder();
+        Uuid topicId2 = endsWithTopicIds ? Uuid.randomUuid() : Uuid.ZERO_UUID;
+        // Use the same data besides the topic ID.
+        FetchRequest.PartitionData partitionData = new 
FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty());
+        builder2.add(tp, partitionData);
+        FetchSessionHandler.FetchRequestData data2 = builder2.build();
+        if (startsWithTopicIds && endsWithTopicIds) {
+            // If we started with an ID, both a only a new ID will count 
towards replaced.
+            // The old topic ID partition should be in toReplace, and the new 
one should be in toSend.
+            assertEquals(Collections.singletonList(new 
TopicIdPartition(topicId1, tp)), data2.toReplace());
+            assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 
200)),
+                    data2.toSend(), data2.sessionPartitions());
+            assertTrue(handler.sessionTopicNames().containsKey(topicId2));

Review comment:
       Yes, I was referring to `sessionTopicNames`.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##########
@@ -428,82 +432,182 @@ public void testIdUsageRevokedOnIdDowngrade() {
         List<Integer> partitions = Arrays.asList(0, 1);
         partitions.forEach(partition -> {
             String testType = partition == 0 ? "updating a partition" : 
"adding a new partition";
-            Map<String, Uuid> topicIds = Collections.singletonMap("foo", 
Uuid.randomUuid());
+            Uuid fooId = Uuid.randomUuid();
             FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 
1);
             FetchSessionHandler.Builder builder = handler.newBuilder();
-            builder.add(new TopicPartition("foo", 0),  topicIds.get("foo"),
-                    new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
+            builder.add(new TopicPartition("foo", 0),
+                    new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));
             FetchSessionHandler.FetchRequestData data = builder.build();
-            assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+            assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
                     data.toSend(), data.sessionPartitions());
             assertTrue(data.metadata().isFull());
             assertTrue(data.canUseTopicIds());
 
             FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-                    respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
+                    respMap(new RespEntry("foo", 0, fooId, 10, 20)));
             handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
             // Try to remove a topic ID from an existing topic partition (0) 
or add a new topic partition (1) without an ID.
             FetchSessionHandler.Builder builder2 = handler.newBuilder();
-            builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
-                    new FetchRequest.PartitionData(10, 110, 210, 
Optional.empty()));
+            builder2.add(new TopicPartition("foo", partition),
+                    new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 
210, Optional.empty()));
             FetchSessionHandler.FetchRequestData data2 = builder2.build();
-            // Should have the same session ID and next epoch, but can no 
longer use topic IDs.
-            // The receiving broker will close the session if we were 
previously using topic IDs.
+            // Should have the same session ID, and next epoch and can no 
longer use topic IDs.
+            // The receiving broker will handle closing the session.
             assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session when " + testType);
             assertEquals(1, data2.metadata().epoch(), "Did not have correct 
epoch when " + testType);
             assertFalse(data2.canUseTopicIds());
         });
     }
 
+    private static Stream<Arguments> idUsageCombinations() {
+        return Stream.of(
+                Arguments.of(true, true),
+                Arguments.of(true, false),
+                Arguments.of(false, true),
+                Arguments.of(false, false)
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("idUsageCombinations")
+    public void testTopicIdReplaced(boolean startsWithTopicIds, boolean 
endsWithTopicIds) {
+        TopicPartition tp = new TopicPartition("foo", 0);
+        FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+        FetchSessionHandler.Builder builder = handler.newBuilder();
+        Uuid topicId1 = startsWithTopicIds ? Uuid.randomUuid() : 
Uuid.ZERO_UUID;
+        builder.add(tp, new FetchRequest.PartitionData(topicId1, 0, 100, 200, 
Optional.empty()));
+        FetchSessionHandler.FetchRequestData data = builder.build();
+        assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)),
+                data.toSend(), data.sessionPartitions());
+        assertTrue(data.metadata().isFull());
+        assertEquals(startsWithTopicIds, data.canUseTopicIds());
+
+        FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, respMap(new 
RespEntry("foo", 0, topicId1, 10, 20)));
+        short version = startsWithTopicIds ? ApiKeys.FETCH.latestVersion() : 
12;
+        handler.handleResponse(resp, version);
+
+        // Try to add a new topic ID.
+        FetchSessionHandler.Builder builder2 = handler.newBuilder();
+        Uuid topicId2 = endsWithTopicIds ? Uuid.randomUuid() : Uuid.ZERO_UUID;
+        // Use the same data besides the topic ID.
+        FetchRequest.PartitionData partitionData = new 
FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty());
+        builder2.add(tp, partitionData);
+        FetchSessionHandler.FetchRequestData data2 = builder2.build();
+        if (startsWithTopicIds && endsWithTopicIds) {
+            // If we started with an ID, both a only a new ID will count 
towards replaced.
+            // The old topic ID partition should be in toReplace, and the new 
one should be in toSend.
+            assertEquals(Collections.singletonList(new 
TopicIdPartition(topicId1, tp)), data2.toReplace());
+            assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 
200)),
+                    data2.toSend(), data2.sessionPartitions());
+            assertTrue(handler.sessionTopicNames().containsKey(topicId2));
+        } else if (startsWithTopicIds || endsWithTopicIds) {
+            // If we downgraded to not using topic IDs we will want to send 
this data.
+            // However, we will not mark the partition as one replaced. In 
this scenario, we should see the session close due to
+            // changing request types.
+            // We will have the new topic ID in the session partition map
+            assertEquals(0, data2.toReplace().size());
+            assertEquals(1, data2.toSend().size());

Review comment:
       Right.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to