dajac commented on code in PR #18224:
URL: https://github.com/apache/kafka/pull/18224#discussion_r1907259272
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3002,8 +3008,9 @@ private <T> CoordinatorResult<T, CoordinatorRecord> consumerGroupFenceMember(
T response
) {
List<CoordinatorRecord> records = new ArrayList<>();
- if (validateOnlineDowngradeWithFencedMember(group, member.memberId())) {
- convertToClassicGroup(group, member.memberId(), null, records);
+ Set<String> fencedMemberIds = Collections.singleton(member.memberId());
+ if (validateOnlineDowngradeWithFencedMembers(group, fencedMemberIds)) {
Review Comment:
nit: We could use `List.of(member.memberId())` and remove `fencedMemberIds`.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -6091,30 +6096,41 @@ private CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> classicGrou
}
}
- if (!records.isEmpty()) {
- // Check whether resolved regular expressions could be deleted.
- Set<String> deletedRegexes = maybeDeleteResolvedRegularExpressions(
- records,
- group,
- validLeaveGroupMembers
- );
+ List<CoordinatorRecord> records = new ArrayList<>();
+ if (!validLeaveGroupMembers.isEmpty()) {
+ if (validateOnlineDowngradeWithFencedMembers(group, validLeaveGroupMemberIds)) {
Review Comment:
I wonder if we could extend `consumerGroupFenceMember` to take a list of
members instead of a single member. It would allow us to reuse the same
code for removing members. I did not look into how feasible that is. Could you
check?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java:
##########
@@ -1336,21 +1336,21 @@ public Map<String, byte[]> groupAssignment() {
/**
* Convert the given ConsumerGroup to a corresponding ClassicGroup.
- * The member with leavingMemberId will not be converted to the new ClassicGroup as it's the last
- * member using new consumer protocol that left and triggered the downgrade.
*
* @param consumerGroup The converted ConsumerGroup.
- * @param leavingMemberId The member that will not be converted in the ClassicGroup.
- * @param joiningMember The member that needs to be converted and added to the ClassicGroup.
+ * @param leavingMemberIds The members that will not be converted in the ClassicGroup.
+ * @param replacedMemberId The member that will be replaced by replacingMember in the ClassicGroup.
Review Comment:
I wonder if we really need `replacedMemberId`. Would it be possible to
remove it?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -1263,6 +1263,21 @@ public boolean allMembersUseClassicProtocolExcept(String memberId) {
!getOrMaybeCreateMember(memberId, false).useClassicProtocol();
}
+ /**
+ * Checks whether all the members use the classic protocol except the given members.
+ *
+ * @param memberIds The members to remove.
+ * @return A boolean indicating whether all the members use the classic protocol.
+ */
+ public boolean allMembersUseClassicProtocolExcept(Set<String> memberIds) {
+ for (ConsumerGroupMember member : members().values()) {
+ if (!memberIds.contains(member.memberId()) && !member.useClassicProtocol()) {
+ return false;
+ }
+ }
Review Comment:
I assume that `memberIds` will usually be smaller than `members`. I wonder
if we could check without going through all the `members`.
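
One possible shape for such a check, sketched on a simplified stand-in rather than the real `ConsumerGroup`: if the group keeps a running count of classic-protocol members, the check only needs to look up the ids in `memberIds`. The class `GroupSketch`, its `addMember`/`removeMember` methods, and the `numClassicProtocolMembers` field are all assumptions made for illustration; whether the actual class maintains such a counter would need to be confirmed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified stand-in for ConsumerGroup; not the Kafka class. */
final class GroupSketch {
    // memberId -> true if the member uses the classic protocol.
    private final Map<String, Boolean> members = new HashMap<>();
    // Assumed counter, kept in sync on every add/remove so the check never scans all members.
    private int numClassicProtocolMembers = 0;

    void addMember(String memberId, boolean useClassicProtocol) {
        Boolean previous = members.put(memberId, useClassicProtocol);
        if (Boolean.TRUE.equals(previous)) numClassicProtocolMembers--;
        if (useClassicProtocol) numClassicProtocolMembers++;
    }

    void removeMember(String memberId) {
        Boolean previous = members.remove(memberId);
        if (Boolean.TRUE.equals(previous)) numClassicProtocolMembers--;
    }

    /**
     * Same result as iterating over every member, but the cost is
     * proportional to memberIds.size() instead of members.size().
     */
    boolean allMembersUseClassicProtocolExcept(Set<String> memberIds) {
        int excluded = 0;
        int excludedClassic = 0;
        for (String memberId : memberIds) {
            Boolean useClassic = members.get(memberId);
            if (useClassic == null) continue; // Ids that are not in the group are ignored.
            excluded++;
            if (useClassic) excludedClassic++;
        }
        int remaining = members.size() - excluded;
        int remainingClassic = numClassicProtocolMembers - excludedClassic;
        // Every remaining member uses the classic protocol iff the two counts match.
        return remaining == remainingClassic;
    }
}
```

The trade-off is that the counter has to be updated wherever members are added, replaced, or removed, so this only pays off if that bookkeeping already exists or is cheap to add.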
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.