dongnuo123 commented on code in PR #22510:
URL: https://github.com/apache/kafka/pull/22510#discussion_r3391570199
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3900,24 +3901,23 @@ private UpdateTargetAssignmentResult<Assignment>
maybeUpdateTargetAssignment(
updatedMember
).orElse(defaultConsumerGroupAssignor.name());
try {
+ UpdatedMembersAndTargetAssignmentView<ConsumerGroupMember,
Assignment> updatedMembersAndTargetAssignment =
+ new UpdatedMembersAndTargetAssignmentView<>(
+ group.members(),
+ group.staticMembers(),
+ group.targetAssignment()
+ );
+
updatedMembersAndTargetAssignment.addOrUpdateMember(updatedMember.memberId(),
updatedMember.instanceId(), updatedMember);
+
TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder
assignmentResultBuilder =
new
TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder(group.groupId(),
groupEpoch, consumerGroupAssignors.get(preferredServerAssignor))
.withTime(time)
- .withMembers(group.members())
- .withStaticMembers(group.staticMembers())
+ .withMembers(updatedMembersAndTargetAssignment.members())
.withSubscriptionType(subscriptionType)
- .withTargetAssignment(group.targetAssignment())
+
.withTargetAssignment(updatedMembersAndTargetAssignment.targetAssignment())
Review Comment:
wondering if we have an existing unit test in GMMTest covering static member
replaced and target assignment recomputed with an unchanged assignment?
Previously the target assignment record is written twice because the builder
reads from the old `group.targetAssignment()`. In this case we shouldn't see
two target assignment records for the new member with the view already did the
replacement?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]