dajac commented on code in PR #16371:
URL: https://github.com/apache/kafka/pull/16371#discussion_r1657083140
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1867,11 +1936,12 @@ private Assignment updateTargetAssignment(
.withTopicsImage(metadataImage.topics())
.addOrUpdateMember(updatedMember.memberId(),
updatedMember);
- if (staticMemberReplaced) {
+ String previousMemberId =
group.staticMemberId(updatedMember.instanceId());
Review Comment:
We check the member id associated to the instance id. If it is different
from the member id of the current member, it means that the member was
replaced. Using `member.memberId` does not work because it is the idea of the
current member.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -1133,14 +1148,19 @@ public void
testStaticMemberGetsBackAssignmentUponRejoin() {
.build();
List<CoordinatorRecord> expectedRecordsAfterRejoin = Arrays.asList(
+ // The previous member is deleted.
CoordinatorRecordHelpers.newCurrentAssignmentTombstoneRecord(groupId,
memberId2),
CoordinatorRecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId2),
CoordinatorRecordHelpers.newMemberSubscriptionTombstoneRecord(groupId,
memberId2),
- CoordinatorRecordHelpers.newMemberSubscriptionRecord(groupId,
expectedRejoinedMember),
+
+ // The previous member is replaced by the new one.
Review Comment:
Correct. These are the records generated by `replaceMember`. There are a
copy of the previous member but with a new member id, and new member epoch and
a new previous member epoch.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1867,11 +1936,12 @@ private Assignment updateTargetAssignment(
.withTopicsImage(metadataImage.topics())
.addOrUpdateMember(updatedMember.memberId(),
updatedMember);
- if (staticMemberReplaced) {
+ String previousMemberId =
group.staticMemberId(updatedMember.instanceId());
Review Comment:
We check the member id associated to the instance id. If it is different
from the member id of the current member, it means that the member was
replaced. Using `member.memberId` does not work because it is the id of the
current member.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1867,11 +1936,12 @@ private Assignment updateTargetAssignment(
.withTopicsImage(metadataImage.topics())
.addOrUpdateMember(updatedMember.memberId(),
updatedMember);
- if (staticMemberReplaced) {
+ String previousMemberId =
group.staticMemberId(updatedMember.instanceId());
Review Comment:
Added a comment.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]