dongnuo123 commented on code in PR #22510:
URL: https://github.com/apache/kafka/pull/22510#discussion_r3391570199
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3900,24 +3901,23 @@ private UpdateTargetAssignmentResult<Assignment>
maybeUpdateTargetAssignment(
updatedMember
).orElse(defaultConsumerGroupAssignor.name());
try {
+ UpdatedMembersAndTargetAssignmentView<ConsumerGroupMember,
Assignment> updatedMembersAndTargetAssignment =
+ new UpdatedMembersAndTargetAssignmentView<>(
+ group.members(),
+ group.staticMembers(),
+ group.targetAssignment()
+ );
+
updatedMembersAndTargetAssignment.addOrUpdateMember(updatedMember.memberId(),
updatedMember.instanceId(), updatedMember);
+
TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder
assignmentResultBuilder =
new
TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder(group.groupId(),
groupEpoch, consumerGroupAssignors.get(preferredServerAssignor))
.withTime(time)
- .withMembers(group.members())
- .withStaticMembers(group.staticMembers())
+ .withMembers(updatedMembersAndTargetAssignment.members())
.withSubscriptionType(subscriptionType)
- .withTargetAssignment(group.targetAssignment())
+
.withTargetAssignment(updatedMembersAndTargetAssignment.targetAssignment())
Review Comment:
wondering if we have an existing unit test in GMMTest covering static member
replaced and target assignment recomputed with an unchanged assignment?
Previously the target assignment record is written twice (one in
`replaceMember` and the other here) because the builder reads from the old
`group.targetAssignment()`. In this case we shouldn't see two target assignment
records for the new member with the view already did the replacement?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]