AndrewJSchofield commented on code in PR #19778: URL: https://github.com/apache/kafka/pull/19778#discussion_r2100757284
########## core/src/test/java/kafka/server/share/SharePartitionManagerTest.java: ########## @@ -372,6 +459,110 @@ public void testNewContext() { assertEquals(0, cache.size()); } + @Test + public void testAcknowledgeSessionUpdateThrowsOnInitialEpoch() { + ShareSessionCache cache = new ShareSessionCache(10); + sharePartitionManager = SharePartitionManagerBuilder.builder() + .withCache(cache) + .build(); + + assertThrows(InvalidShareSessionEpochException.class, + () -> sharePartitionManager.acknowledgeSessionUpdate("grp", + new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH))); + } + + @Test + public void testAcknowledgeSessionUpdateThrowsWhenShareSessionNotFound() { + ShareSessionCache cache = new ShareSessionCache(10); + sharePartitionManager = SharePartitionManagerBuilder.builder() + .withCache(cache) + .build(); + + // The share session corresponding to this memberId has not been created yet. This should throw an exception. + assertThrows(ShareSessionNotFoundException.class, + () -> sharePartitionManager.acknowledgeSessionUpdate("grp", + new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)))); + } + + @Test + public void testAcknowledgeSessionUpdateThrowsInvalidShareSessionEpochException() { + ShareSessionCache cache = new ShareSessionCache(10); + sharePartitionManager = SharePartitionManagerBuilder.builder() + .withCache(cache) + .build(); + + Uuid tpId0 = Uuid.randomUuid(); + TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1)); + + String groupId = "grp"; + Uuid memberId = Uuid.randomUuid(); + + // Create a new share session with an initial share fetch request + ShareFetchContext context1 = sharePartitionManager.newContext(groupId, List.of(tp0, tp1), EMPTY_PART_LIST, + new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH), false, CONNECTION_ID); + 
assertInstanceOf(ShareSessionContext.class, context1); + assertFalse(((ShareSessionContext) context1).isSubsequent()); + + // The expected epoch from the share session should be 1, but we are passing 2. This should throw an exception. + assertThrows(InvalidShareSessionEpochException.class, + () -> sharePartitionManager.acknowledgeSessionUpdate("grp", + new ShareRequestMetadata(memberId, + ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))))); + } + + @Test + public void testAcknowledgeSessionUpdateSuccessOnSubsequentEpoch() { + ShareSessionCache cache = new ShareSessionCache(10); + sharePartitionManager = SharePartitionManagerBuilder.builder() + .withCache(cache) + .build(); + + Uuid tpId0 = Uuid.randomUuid(); + TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1)); + + String groupId = "grp"; + Uuid memberId = Uuid.randomUuid(); + + // Create a new share session with an initial share fetch request + ShareFetchContext context1 = sharePartitionManager.newContext(groupId, List.of(tp0, tp1), EMPTY_PART_LIST, + new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH), false, CONNECTION_ID); + assertInstanceOf(ShareSessionContext.class, context1); + assertFalse(((ShareSessionContext) context1).isSubsequent()); + + // The expected epoch from the share session should be 1, but we are passing 2. This should throw an exception. 
+        assertDoesNotThrow(
+            () -> sharePartitionManager.acknowledgeSessionUpdate("grp",
+                new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))));
+    }
+
+    @Test
+    public void testAcknowledgeSessionUpdateSuccessOnFinalEpoch() {
+        ShareSessionCache cache = new ShareSessionCache(10);
+        sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withCache(cache)
+            .build();
+
+        Uuid tpId0 = Uuid.randomUuid();
+        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));
+
+        String groupId = "grp";
+        Uuid memberId = Uuid.randomUuid();
+
+        // Create a new share session with an initial share fetch request
+        ShareFetchContext context1 = sharePartitionManager.newContext(groupId, List.of(tp0, tp1), EMPTY_PART_LIST,
+            new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH), false, CONNECTION_ID);
+        assertInstanceOf(ShareSessionContext.class, context1);
+        assertFalse(((ShareSessionContext) context1).isSubsequent());
+
+        // The expected epoch from the share session should be 1, but we are passing 2. This should throw an exception.

Review Comment:
Aren't we passing FINAL_EPOCH?

########## core/src/test/java/kafka/server/share/SharePartitionManagerTest.java: ########## @@ -372,6 +459,110 @@ public void testNewContext() { assertEquals(0, cache.size()); } + @Test + public void testAcknowledgeSessionUpdateThrowsOnInitialEpoch() { + ShareSessionCache cache = new ShareSessionCache(10); + sharePartitionManager = SharePartitionManagerBuilder.builder() + .withCache(cache) + .build(); + + assertThrows(InvalidShareSessionEpochException.class, + () -> sharePartitionManager.acknowledgeSessionUpdate("grp", + new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH))); + } + + @Test + public void testAcknowledgeSessionUpdateThrowsWhenShareSessionNotFound() { + ShareSessionCache cache = new ShareSessionCache(10); + sharePartitionManager = SharePartitionManagerBuilder.builder() + .withCache(cache) + .build(); + + // The share session corresponding to this memberId has not been created yet. This should throw an exception. + assertThrows(ShareSessionNotFoundException.class, + () -> sharePartitionManager.acknowledgeSessionUpdate("grp", + new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)))); + } + + @Test + public void testAcknowledgeSessionUpdateThrowsInvalidShareSessionEpochException() { + ShareSessionCache cache = new ShareSessionCache(10); + sharePartitionManager = SharePartitionManagerBuilder.builder() + .withCache(cache) + .build(); + + Uuid tpId0 = Uuid.randomUuid(); + TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1)); + + String groupId = "grp"; + Uuid memberId = Uuid.randomUuid(); + + // Create a new share session with an initial share fetch request + ShareFetchContext context1 = sharePartitionManager.newContext(groupId, List.of(tp0, tp1), EMPTY_PART_LIST, + new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH), false, CONNECTION_ID); + 
assertInstanceOf(ShareSessionContext.class, context1); + assertFalse(((ShareSessionContext) context1).isSubsequent()); + + // The expected epoch from the share session should be 1, but we are passing 2. This should throw an exception. Review Comment: Aren't we passing 1? ########## core/src/test/java/kafka/server/share/SharePartitionManagerTest.java: ########## @@ -372,6 +459,110 @@ public void testNewContext() { assertEquals(0, cache.size()); } + @Test + public void testAcknowledgeSessionUpdateThrowsOnInitialEpoch() { + ShareSessionCache cache = new ShareSessionCache(10); + sharePartitionManager = SharePartitionManagerBuilder.builder() + .withCache(cache) + .build(); + + assertThrows(InvalidShareSessionEpochException.class, + () -> sharePartitionManager.acknowledgeSessionUpdate("grp", + new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH))); + } + + @Test + public void testAcknowledgeSessionUpdateThrowsWhenShareSessionNotFound() { + ShareSessionCache cache = new ShareSessionCache(10); + sharePartitionManager = SharePartitionManagerBuilder.builder() + .withCache(cache) + .build(); + + // The share session corresponding to this memberId has not been created yet. This should throw an exception. 
+ assertThrows(ShareSessionNotFoundException.class, + () -> sharePartitionManager.acknowledgeSessionUpdate("grp", + new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)))); + } + + @Test + public void testAcknowledgeSessionUpdateThrowsInvalidShareSessionEpochException() { + ShareSessionCache cache = new ShareSessionCache(10); + sharePartitionManager = SharePartitionManagerBuilder.builder() + .withCache(cache) + .build(); + + Uuid tpId0 = Uuid.randomUuid(); + TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1)); + + String groupId = "grp"; + Uuid memberId = Uuid.randomUuid(); + + // Create a new share session with an initial share fetch request + ShareFetchContext context1 = sharePartitionManager.newContext(groupId, List.of(tp0, tp1), EMPTY_PART_LIST, + new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH), false, CONNECTION_ID); + assertInstanceOf(ShareSessionContext.class, context1); + assertFalse(((ShareSessionContext) context1).isSubsequent()); + + // The expected epoch from the share session should be 1, but we are passing 2. This should throw an exception. 
+        assertThrows(InvalidShareSessionEpochException.class,
+            () -> sharePartitionManager.acknowledgeSessionUpdate("grp",
+                new ShareRequestMetadata(memberId,
+                    ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)))));
+    }
+
+    @Test
+    public void testAcknowledgeSessionUpdateSuccessOnSubsequentEpoch() {
+        ShareSessionCache cache = new ShareSessionCache(10);
+        sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withCache(cache)
+            .build();
+
+        Uuid tpId0 = Uuid.randomUuid();
+        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));
+
+        String groupId = "grp";
+        Uuid memberId = Uuid.randomUuid();
+
+        // Create a new share session with an initial share fetch request
+        ShareFetchContext context1 = sharePartitionManager.newContext(groupId, List.of(tp0, tp1), EMPTY_PART_LIST,
+            new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH), false, CONNECTION_ID);
+        assertInstanceOf(ShareSessionContext.class, context1);
+        assertFalse(((ShareSessionContext) context1).isSubsequent());
+
+        // The expected epoch from the share session should be 1, but we are passing 2. This should throw an exception.
+        assertDoesNotThrow(
+            () -> sharePartitionManager.acknowledgeSessionUpdate("grp",
+                new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))));

Review Comment:
Aren't we passing 1? The next epoch after the initial epoch is 1, isn't it?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org