AndrewJSchofield commented on code in PR #19778:
URL: https://github.com/apache/kafka/pull/19778#discussion_r2100757284


##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -372,6 +459,110 @@ public void testNewContext() {
         assertEquals(0, cache.size());
     }
 
+    @Test
+    public void testAcknowledgeSessionUpdateThrowsOnInitialEpoch() {
+        ShareSessionCache cache = new ShareSessionCache(10);
+        sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withCache(cache)
+            .build();
+
+        assertThrows(InvalidShareSessionEpochException.class,
+            () -> sharePartitionManager.acknowledgeSessionUpdate("grp",
+                new ShareRequestMetadata(Uuid.randomUuid(), 
ShareRequestMetadata.INITIAL_EPOCH)));
+    }
+
+    @Test
+    public void testAcknowledgeSessionUpdateThrowsWhenShareSessionNotFound() {
+        ShareSessionCache cache = new ShareSessionCache(10);
+        sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withCache(cache)
+            .build();
+
+        // The share session corresponding to this memberId has not been 
created yet. This should throw an exception.
+        assertThrows(ShareSessionNotFoundException.class,
+            () -> sharePartitionManager.acknowledgeSessionUpdate("grp",
+                new ShareRequestMetadata(Uuid.randomUuid(), 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))));
+    }
+
+    @Test
+    public void 
testAcknowledgeSessionUpdateThrowsInvalidShareSessionEpochException() {
+        ShareSessionCache cache = new ShareSessionCache(10);
+        sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withCache(cache)
+            .build();
+
+        Uuid tpId0 = Uuid.randomUuid();
+        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new 
TopicPartition("foo", 1));
+
+        String groupId = "grp";
+        Uuid memberId = Uuid.randomUuid();
+
+        // Create a new share session with an initial share fetch request
+        ShareFetchContext context1 = sharePartitionManager.newContext(groupId, 
List.of(tp0, tp1), EMPTY_PART_LIST,
+            new ShareRequestMetadata(memberId, 
ShareRequestMetadata.INITIAL_EPOCH), false, CONNECTION_ID);
+        assertInstanceOf(ShareSessionContext.class, context1);
+        assertFalse(((ShareSessionContext) context1).isSubsequent());
+
+        // The expected epoch from the share session should be 1, but we are 
passing 2. This should throw an exception.
+        assertThrows(InvalidShareSessionEpochException.class,
+            () -> sharePartitionManager.acknowledgeSessionUpdate("grp",
+                new ShareRequestMetadata(memberId,
+                    
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)))));
+    }
+
+    @Test
+    public void testAcknowledgeSessionUpdateSuccessOnSubsequentEpoch() {
+        ShareSessionCache cache = new ShareSessionCache(10);
+        sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withCache(cache)
+            .build();
+
+        Uuid tpId0 = Uuid.randomUuid();
+        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new 
TopicPartition("foo", 1));
+
+        String groupId = "grp";
+        Uuid memberId = Uuid.randomUuid();
+
+        // Create a new share session with an initial share fetch request
+        ShareFetchContext context1 = sharePartitionManager.newContext(groupId, 
List.of(tp0, tp1), EMPTY_PART_LIST,
+            new ShareRequestMetadata(memberId, 
ShareRequestMetadata.INITIAL_EPOCH), false, CONNECTION_ID);
+        assertInstanceOf(ShareSessionContext.class, context1);
+        assertFalse(((ShareSessionContext) context1).isSubsequent());
+
+        // The expected epoch from the share session should be 1, but we are 
passing 2. This should throw an exception.
+        assertDoesNotThrow(
+            () -> sharePartitionManager.acknowledgeSessionUpdate("grp",
+                new ShareRequestMetadata(memberId, 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))));
+    }
+
+    @Test
+    public void testAcknowledgeSessionUpdateSuccessOnFinalEpoch() {
+        ShareSessionCache cache = new ShareSessionCache(10);
+        sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withCache(cache)
+            .build();
+
+        Uuid tpId0 = Uuid.randomUuid();
+        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new 
TopicPartition("foo", 1));
+
+        String groupId = "grp";
+        Uuid memberId = Uuid.randomUuid();
+
+        // Create a new share session with an initial share fetch request
+        ShareFetchContext context1 = sharePartitionManager.newContext(groupId, 
List.of(tp0, tp1), EMPTY_PART_LIST,
+            new ShareRequestMetadata(memberId, 
ShareRequestMetadata.INITIAL_EPOCH), false, CONNECTION_ID);
+        assertInstanceOf(ShareSessionContext.class, context1);
+        assertFalse(((ShareSessionContext) context1).isSubsequent());
+
+        // The expected epoch from the share session should be 1, but we are passing 2. This should throw an exception.

Review Comment:
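   This comment looks copied from the earlier epoch-mismatch test. Aren't we passing FINAL_EPOCH here, in a test that expects success?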



##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -372,6 +459,110 @@ public void testNewContext() {
         assertEquals(0, cache.size());
     }
 
+    @Test
+    public void testAcknowledgeSessionUpdateThrowsOnInitialEpoch() {
+        ShareSessionCache cache = new ShareSessionCache(10);
+        sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withCache(cache)
+            .build();
+
+        assertThrows(InvalidShareSessionEpochException.class,
+            () -> sharePartitionManager.acknowledgeSessionUpdate("grp",
+                new ShareRequestMetadata(Uuid.randomUuid(), 
ShareRequestMetadata.INITIAL_EPOCH)));
+    }
+
+    @Test
+    public void testAcknowledgeSessionUpdateThrowsWhenShareSessionNotFound() {
+        ShareSessionCache cache = new ShareSessionCache(10);
+        sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withCache(cache)
+            .build();
+
+        // The share session corresponding to this memberId has not been 
created yet. This should throw an exception.
+        assertThrows(ShareSessionNotFoundException.class,
+            () -> sharePartitionManager.acknowledgeSessionUpdate("grp",
+                new ShareRequestMetadata(Uuid.randomUuid(), 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))));
+    }
+
+    @Test
+    public void 
testAcknowledgeSessionUpdateThrowsInvalidShareSessionEpochException() {
+        ShareSessionCache cache = new ShareSessionCache(10);
+        sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withCache(cache)
+            .build();
+
+        Uuid tpId0 = Uuid.randomUuid();
+        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new 
TopicPartition("foo", 1));
+
+        String groupId = "grp";
+        Uuid memberId = Uuid.randomUuid();
+
+        // Create a new share session with an initial share fetch request
+        ShareFetchContext context1 = sharePartitionManager.newContext(groupId, 
List.of(tp0, tp1), EMPTY_PART_LIST,
+            new ShareRequestMetadata(memberId, 
ShareRequestMetadata.INITIAL_EPOCH), false, CONNECTION_ID);
+        assertInstanceOf(ShareSessionContext.class, context1);
+        assertFalse(((ShareSessionContext) context1).isSubsequent());
+
+        // The expected epoch from the share session should be 1, but we are passing 2. This should throw an exception.

Review Comment:
   Aren't we passing 1?



##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -372,6 +459,110 @@ public void testNewContext() {
         assertEquals(0, cache.size());
     }
 
+    @Test
+    public void testAcknowledgeSessionUpdateThrowsOnInitialEpoch() {
+        ShareSessionCache cache = new ShareSessionCache(10);
+        sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withCache(cache)
+            .build();
+
+        assertThrows(InvalidShareSessionEpochException.class,
+            () -> sharePartitionManager.acknowledgeSessionUpdate("grp",
+                new ShareRequestMetadata(Uuid.randomUuid(), 
ShareRequestMetadata.INITIAL_EPOCH)));
+    }
+
+    @Test
+    public void testAcknowledgeSessionUpdateThrowsWhenShareSessionNotFound() {
+        ShareSessionCache cache = new ShareSessionCache(10);
+        sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withCache(cache)
+            .build();
+
+        // The share session corresponding to this memberId has not been 
created yet. This should throw an exception.
+        assertThrows(ShareSessionNotFoundException.class,
+            () -> sharePartitionManager.acknowledgeSessionUpdate("grp",
+                new ShareRequestMetadata(Uuid.randomUuid(), 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))));
+    }
+
+    @Test
+    public void 
testAcknowledgeSessionUpdateThrowsInvalidShareSessionEpochException() {
+        ShareSessionCache cache = new ShareSessionCache(10);
+        sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withCache(cache)
+            .build();
+
+        Uuid tpId0 = Uuid.randomUuid();
+        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new 
TopicPartition("foo", 1));
+
+        String groupId = "grp";
+        Uuid memberId = Uuid.randomUuid();
+
+        // Create a new share session with an initial share fetch request
+        ShareFetchContext context1 = sharePartitionManager.newContext(groupId, 
List.of(tp0, tp1), EMPTY_PART_LIST,
+            new ShareRequestMetadata(memberId, 
ShareRequestMetadata.INITIAL_EPOCH), false, CONNECTION_ID);
+        assertInstanceOf(ShareSessionContext.class, context1);
+        assertFalse(((ShareSessionContext) context1).isSubsequent());
+
+        // The expected epoch from the share session should be 1, but we are 
passing 2. This should throw an exception.
+        assertThrows(InvalidShareSessionEpochException.class,
+            () -> sharePartitionManager.acknowledgeSessionUpdate("grp",
+                new ShareRequestMetadata(memberId,
+                    
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)))));
+    }
+
+    @Test
+    public void testAcknowledgeSessionUpdateSuccessOnSubsequentEpoch() {
+        ShareSessionCache cache = new ShareSessionCache(10);
+        sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withCache(cache)
+            .build();
+
+        Uuid tpId0 = Uuid.randomUuid();
+        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new 
TopicPartition("foo", 1));
+
+        String groupId = "grp";
+        Uuid memberId = Uuid.randomUuid();
+
+        // Create a new share session with an initial share fetch request
+        ShareFetchContext context1 = sharePartitionManager.newContext(groupId, 
List.of(tp0, tp1), EMPTY_PART_LIST,
+            new ShareRequestMetadata(memberId, 
ShareRequestMetadata.INITIAL_EPOCH), false, CONNECTION_ID);
+        assertInstanceOf(ShareSessionContext.class, context1);
+        assertFalse(((ShareSessionContext) context1).isSubsequent());
+
+        // The expected epoch from the share session should be 1, but we are passing 2. This should throw an exception.
+        assertDoesNotThrow(
+            () -> sharePartitionManager.acknowledgeSessionUpdate("grp",
+                new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))));

Review Comment:
   Aren't we passing 1? The next epoch after the initial epoch is 1, isn't it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to