lucasbru commented on code in PR #22245:
URL: https://github.com/apache/kafka/pull/22245#discussion_r3281617653
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4485,6 +4723,42 @@ private void replaceMember(
));
}
+ /**
+ * Write records to replace the old member by the new member.
+ *
+ * @param records The list of records to append to.
+ * @param group The streams group.
+ * @param oldMember The old member.
+ * @param newMember The new member.
+ */
+ private void replaceStreamsMember(
+ List<CoordinatorRecord> records,
+ StreamsGroup group,
+ StreamsGroupMember oldMember,
+ StreamsGroupMember newMember
+ ) {
+ String groupId = group.groupId();
+
+ // Remove the member without canceling its timers in case the change
is reverted. If the
+ // change is not reverted, the group validation will fail and the
timer will do nothing.
+ records.addAll(removeStreamsMember(groupId, oldMember.memberId()));
+
+ // Generate records.
+
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(
+ groupId,
+ newMember
+ ));
+
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(
+ groupId,
+ newMember.memberId(),
+ group.targetAssignment(oldMember.memberId(),
oldMember.instanceId())
Review Comment:
Yes. I think you are right
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]