mjsax commented on code in PR #20760:
URL: https://github.com/apache/kafka/pull/20760#discussion_r2492799378
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java:
##########
@@ -678,6 +679,112 @@ public void testValidateOffsetCommit(short version) {
}
}
+ @Test
+ public void testValidateOffsetCommitWithOlderEpoch() {
+ StreamsGroup group = createStreamsGroup("group-foo");
+
+ group.setTopology(new StreamsTopology(1, Map.of("0", new
StreamsGroupTopologyValue.Subtopology()
+ .setSubtopologyId("0")
+ .setSourceTopics(List.of("input-topic")))));
+
+ group.updateMember(new StreamsGroupMember.Builder("member-1")
+ .setMemberEpoch(2)
+ .setAssignedTasks(new TasksTupleWithEpochs(
+ Map.of("0", Map.of(0, 2, 1, 2)),
+ Map.of(), Map.of()))
+ .build());
+
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-1", "", 1, false, ApiKeys.OFFSET_COMMIT.latestVersion());
+
+ // Received epoch (1) < assignment epoch (2) should throw
+ assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate("input-topic", Uuid.ZERO_UUID, 0));
+ }
+
+ @Test
+ public void testValidateOffsetCommitWithOlderEpochMissingTopology() {
+ StreamsGroup group = createStreamsGroup("group-foo");
+
+ group.updateMember(new StreamsGroupMember.Builder("member-1")
+ .setMemberEpoch(2)
+ .build());
+
+ // Topology is retrieved when creating validator, so exception is thrown here
+ assertThrows(StaleMemberEpochException.class, () ->
+ group.validateOffsetCommit("member-1", "", 1, false,
ApiKeys.OFFSET_COMMIT.latestVersion()));
+ }
+
+ @Test
+ public void testValidateOffsetCommitWithOlderEpochMissingSubtopology() {
+ StreamsGroup group = createStreamsGroup("group-foo");
+
+ group.setTopology(new StreamsTopology(1, Map.of("0", new
StreamsGroupTopologyValue.Subtopology()
+ .setSubtopologyId("0")
+ .setSourceTopics(List.of("input-topic")))));
+
+ group.updateMember(new StreamsGroupMember.Builder("member-1")
+ .setMemberEpoch(2)
+ .build());
+
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-1", "", 1, false, ApiKeys.OFFSET_COMMIT.latestVersion());
+
+ assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate("unknown-topic", Uuid.ZERO_UUID, 0));
+ }
+
+ @Test
+ public void testValidateOffsetCommitWithOlderEpochUnassignedPartition() {
+ StreamsGroup group = createStreamsGroup("group-foo");
+
+ group.setTopology(new StreamsTopology(1, Map.of("0", new
StreamsGroupTopologyValue.Subtopology()
+ .setSubtopologyId("0")
+ .setSourceTopics(List.of("input-topic")))));
+
+ group.updateMember(new StreamsGroupMember.Builder("member-1")
+ .setMemberEpoch(2)
+ .setAssignedTasks(new TasksTupleWithEpochs(
+ Map.of("0", Map.of(0, 2)),
+ Map.of(), Map.of()))
+ .setTasksPendingRevocation(TasksTupleWithEpochs.EMPTY)
+ .build());
+
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-1", "", 1, false, ApiKeys.OFFSET_COMMIT.latestVersion());
+
+ // Partition 0 assigned with epoch 2, received epoch 1 should throw
+ assertThrows(StaleMemberEpochException.class, () ->
Review Comment:
Seems this check is somewhat redundant with
`testValidateOffsetCommitWithOlderEpoch` from above?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]