Copilot commented on code in PR #20681:
URL: https://github.com/apache/kafka/pull/20681#discussion_r2419930535
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java:
##########
@@ -640,9 +640,48 @@ public void testValidateTransactionalOffsetCommit(short version) {
group.validateOffsetCommit("", null, -1, isTransactional, version);
}
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
+ public void testValidateTransactionalOffsetCommitWithRevocationEpoch(short version) {
+ boolean isTransactional = true;
+ StreamsGroup group = createStreamsGroup("group-foo");
+
+ // Simulate a call from the admin client without member ID and member epoch.
+ // This should pass only if the group is empty.
+ group.validateOffsetCommit("", "", -1, isTransactional, version);
+
+ // The member does not exist.
+ assertThrows(UnknownMemberIdException.class, () ->
+ group.validateOffsetCommit("member-id", null, 0, isTransactional, version));
+
+ // Create a member with revocation epoch.3
Review Comment:
Typo in the comment: 'epoch.3' should read 'epoch 3' (stray period instead of a space).
```suggestion
// Create a member with revocation epoch 3
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -703,7 +703,13 @@ public void validateOffsetCommit(
"by members using the streams group protocol");
}
- validateMemberEpoch(memberEpoch, member.memberEpoch());
+ if (member.revocationEpoch() >= 0) {
+ validateMemberEpochWithRevocationEpoch(memberEpoch, member.revocationEpoch(), member.memberEpoch());
+ } else {
+ // If the member was read from a legacy record without a revocation epoch,
Review Comment:
Trailing whitespace at the end of line 709.
```suggestion
// If the member was read from a legacy record without a revocation epoch,
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -827,6 +833,31 @@ private void validateMemberEpoch(
}
}
+
+ /**
+ * Throws a StaleMemberEpochException if the received member epoch does not match the range of allowed member epochs.
+ *
+ * @param receivedMemberEpoch The received member epoch.
+ * @param revocationEpoch The revocation epoch.
+ * @param brokerSideMemberEpoch The broker-side member epoch.
+ * @throws StaleMemberEpochException If the received member epoch does not match the range of allowed member epochs.
+ */
+ private void validateMemberEpochWithRevocationEpoch(
+ int receivedMemberEpoch,
+ int revocationEpoch,
+ int brokerSideMemberEpoch
+ ) throws StaleMemberEpochException {
+ if (receivedMemberEpoch <= revocationEpoch || receivedMemberEpoch > brokerSideMemberEpoch) {
+ throw new StaleMemberEpochException(String.format("The received member epoch %d does not match "
+ + "the range of allowed member epochs from %d to %d.",
+ receivedMemberEpoch, revocationEpoch + 1, brokerSideMemberEpoch));
+ }
+ if (receivedMemberEpoch != brokerSideMemberEpoch) {
+ log.trace("Permitting stale member epoch {} sent by the client (broker-side member epoch is {}), since it's larger than revocation epoch {}",
+ receivedMemberEpoch, brokerSideMemberEpoch, revocationEpoch);
Review Comment:
The continuation line holding the log arguments is not indented consistently with the `log.trace(` call it belongs to.
```suggestion
receivedMemberEpoch, brokerSideMemberEpoch, revocationEpoch);
```
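For context on the hunk above, the range rule in `validateMemberEpochWithRevocationEpoch` can be sketched in isolation. This is a minimal illustration, not the PR's code: the class `EpochRangeSketch` and the method `isAllowed` are hypothetical names, and the sketch only mirrors the condition visible in the diff (accept a received epoch that is strictly greater than the revocation epoch and no greater than the broker-side epoch). The legacy path, where the revocation epoch is negative, is not modeled because its behavior is not shown here.

```java
// Hypothetical standalone sketch; names are not from the Kafka code base.
final class EpochRangeSketch {

    private EpochRangeSketch() { }

    /**
     * Mirrors the condition in the diff: the received epoch is accepted
     * when revocationEpoch < receivedMemberEpoch <= brokerSideMemberEpoch.
     * The real method throws StaleMemberEpochException instead of
     * returning false.
     */
    static boolean isAllowed(
        int receivedMemberEpoch,
        int revocationEpoch,
        int brokerSideMemberEpoch
    ) {
        return receivedMemberEpoch > revocationEpoch
            && receivedMemberEpoch <= brokerSideMemberEpoch;
    }
}
```

With a revocation epoch of 3 and a broker-side epoch of 5, received epochs 4 and 5 are accepted under this rule, while 3 and 6 are rejected.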