squah-confluent commented on code in PR #18224:
URL: https://github.com/apache/kafka/pull/18224#discussion_r1909869186
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3001,17 +3010,52 @@ private <T> CoordinatorResult<T, CoordinatorRecord>
consumerGroupFenceMember(
ConsumerGroupMember member,
T response
) {
+ return consumerGroupFenceMembers(group, Set.of(member), response);
+ }
+
+ /**
+ * Fences members from a consumer group and maybe downgrade the consumer
group to a classic group.
+ *
+ * @param group The group.
+ * @param members The members.
+ * @param response The response of the CoordinatorResult.
+ *
+ * @return The CoordinatorResult to be applied.
+ */
+ private <T> CoordinatorResult<T, CoordinatorRecord>
consumerGroupFenceMembers(
+ ConsumerGroup group,
+ Set<ConsumerGroupMember> members,
+ T response
+ ) {
+ if (members.isEmpty()) {
+ // No members to fence. Don't bump the group epoch.
+ return new CoordinatorResult<>(Collections.emptyList(), response);
+ }
+
+ Set<String> memberIds = new HashSet<String>();
+ for (ConsumerGroupMember member : members) {
+ memberIds.add(member.memberId());
+ }
Review Comment:
Also gave this a try. For consistency I applied the same change to
`validateOnlineDowngradeWithReplacedMemberId`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]