mjsax commented on code in PR #20760:
URL: https://github.com/apache/kafka/pull/20760#discussion_r2492806297
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -3659,4 +3660,119 @@ private ClassicGroupMember mkGenericMember(
)
);
}
+
+ @Test
+ public void testStreamsGroupOffsetCommitWithAssignmentEpochValid() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+ StreamsGroup group =
context.groupMetadataManager.getOrMaybeCreatePersistedStreamsGroup("foo", true);
+
+ // Setup: topology with topic "bar" in subtopology "0"
+ group.setTopology(new StreamsTopology(1, Map.of("0", new
StreamsGroupTopologyValue.Subtopology()
+ .setSubtopologyId("0")
+ .setSourceTopics(List.of("bar")))));
+
+ // Member at epoch 10, with partitions assigned at epoch 5
+ group.updateMember(StreamsGroupMember.Builder.withDefaults("member")
+ .setMemberEpoch(10)
+ .setAssignedTasks(new TasksTupleWithEpochs(
+ Map.of("0", Map.of(0, 5, 1, 5)),
+ Map.of(), Map.of()))
+ .build());
+
+ // Commit with member epoch 5 should succeed (5 >= assignment epoch 5)
+ CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> result
= context.commitOffset(
+ new OffsetCommitRequestData()
+ .setGroupId("foo")
+ .setMemberId("member")
+ .setGenerationIdOrMemberEpoch(5)
+ .setTopics(List.of(new
OffsetCommitRequestData.OffsetCommitRequestTopic()
+ .setName("bar")
+ .setPartitions(List.of(
+ new
OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L),
+ new
OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(1)
+ .setCommittedOffset(200L))))));
+
+ assertEquals(Errors.NONE.code(),
result.response().topics().get(0).partitions().get(0).errorCode());
+ assertEquals(Errors.NONE.code(),
result.response().topics().get(0).partitions().get(1).errorCode());
+ assertEquals(2, result.records().size());
+ }
+
+ @Test
+ public void testStreamsGroupOffsetCommitWithAssignmentEpochStale() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+ StreamsGroup group =
context.groupMetadataManager.getOrMaybeCreatePersistedStreamsGroup("foo", true);
+
+ group.setTopology(new StreamsTopology(1, Map.of("0", new
StreamsGroupTopologyValue.Subtopology()
+ .setSubtopologyId("0")
+ .setSourceTopics(List.of("bar")))));
+
+ // Member at epoch 10, with partitions assigned at different epochs
+ group.updateMember(StreamsGroupMember.Builder.withDefaults("member")
+ .setMemberEpoch(10)
+ .setAssignedTasks(new TasksTupleWithEpochs(
+ Map.of("0", Map.of(0, 5, 1, 8)),
+ Map.of(), Map.of()))
+ .build());
+
+ // Commit with member epoch 3 should fail (3 < assignment epochs 5 and
8)
Review Comment:
We should fail even for epoch 7 (7 < 8), right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org