Re: [PR] KAFKA-16832: LeaveGroup API for upgrading ConsumerGroup [kafka]

2024-05-28 Thread via GitHub


dajac merged PR #16057:
URL: https://github.com/apache/kafka/pull/16057


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16832: LeaveGroup API for upgrading ConsumerGroup [kafka]

2024-05-28 Thread via GitHub


jeffkbkim commented on code in PR #16057:
URL: https://github.com/apache/kafka/pull/16057#discussion_r1617796172


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -8695,7 +8691,7 @@ public void testLeaveGroupInvalidGroup() {
 .build();
 context.createClassicGroup("group-id");
 
-assertThrows(UnknownMemberIdException.class, () -> 
context.sendClassicGroupLeave(
+assertThrows(GroupIdNotFoundException.class, () -> 
context.sendClassicGroupLeave(

Review Comment:
   I also thought we would convert it in GroupCoordinatorService.
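
One way such a conversion could sit in the service layer is sketched below, under stated assumptions. `GroupNotFoundSketchException`, `SketchError`, `managerLeave`, and `serviceLeave` are hypothetical stand-ins and not the Kafka classes, and the use of `CompletableFuture.exceptionally` is an assumption about how the service would map a failure to an error code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-in for the internal "group not found" exception.
class GroupNotFoundSketchException extends RuntimeException {
    GroupNotFoundSketchException(String message) {
        super(message);
    }
}

class ServiceConversionSketch {
    // Hypothetical error codes used only by this sketch.
    enum SketchError { NONE, UNKNOWN_MEMBER_ID, UNKNOWN_SERVER_ERROR }

    // Stand-in for the manager-level operation: it fails with the internal
    // exception when the group does not exist.
    static CompletableFuture<SketchError> managerLeave(boolean groupExists) {
        CompletableFuture<SketchError> future = new CompletableFuture<>();
        if (groupExists) {
            future.complete(SketchError.NONE);
        } else {
            future.completeExceptionally(new GroupNotFoundSketchException("Group not found."));
        }
        return future;
    }

    // Stand-in for the service-level operation: the conversion to the error
    // that classic clients expect happens here, in one place.
    static CompletableFuture<SketchError> serviceLeave(boolean groupExists) {
        return managerLeave(groupExists).exceptionally(exception -> {
            // Dependent stages may wrap the failure in a CompletionException.
            Throwable cause = exception instanceof CompletionException && exception.getCause() != null
                ? exception.getCause()
                : exception;
            if (cause instanceof GroupNotFoundSketchException) {
                return SketchError.UNKNOWN_MEMBER_ID;
            }
            return SketchError.UNKNOWN_SERVER_ERROR;
        });
    }

    public static void main(String[] args) {
        System.out.println(serviceLeave(true).join());
        System.out.println(serviceLeave(false).join());
    }
}
```

With this shape the manager can keep throwing the internal exception, and only the service decides which error is returned to the client.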






Re: [PR] KAFKA-16832: LeaveGroup API for upgrading ConsumerGroup [kafka]

2024-05-28 Thread via GitHub


dongnuo123 commented on code in PR #16057:
URL: https://github.com/apache/kafka/pull/16057#discussion_r1617763169


##
checkstyle/suppressions.xml:
##
@@ -347,7 +347,7 @@
 
 
+  files="(GroupMetadataManager|GroupMetadataManagerTest).java"/>

Review Comment:
   Yeah, it's still needed.






Re: [PR] KAFKA-16832: LeaveGroup API for upgrading ConsumerGroup [kafka]

2024-05-28 Thread via GitHub


dajac commented on code in PR #16057:
URL: https://github.com/apache/kafka/pull/16057#discussion_r1617618237


##
checkstyle/suppressions.xml:
##
@@ -347,7 +347,7 @@
 
 
+  files="(GroupMetadataManager|GroupMetadataManagerTest).java"/>

Review Comment:
   Do we still need this exception?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -1076,6 +1076,29 @@ public Map computeSubscribedTopicNames(
 return subscribedTopicNames;
 }
 
+/**
+ * Updates the subscription count with a set of members removed.
+ *
+ * @param removedMembers The set of removed members.
+ *
+ * @return Copy of the map of topics to the count of number of subscribers.
+ */
+public Map computeSubscribedTopicNames(

Review Comment:
   nit: Should we add a unit test for this new method in ConsumerGroupTest?






Re: [PR] KAFKA-16832: LeaveGroup API for upgrading ConsumerGroup [kafka]

2024-05-28 Thread via GitHub


dongnuo123 commented on code in PR #16057:
URL: https://github.com/apache/kafka/pull/16057#discussion_r1617332456


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -8695,7 +8691,7 @@ public void testLeaveGroupInvalidGroup() {
 .build();
 context.createClassicGroup("group-id");
 
-assertThrows(UnknownMemberIdException.class, () -> 
context.sendClassicGroupLeave(
+assertThrows(GroupIdNotFoundException.class, () -> 
context.sendClassicGroupLeave(

Review Comment:
   Yes, it makes sense. I thought we wanted to address these GroupIdNotFound
cases together somewhere else, but that might not be possible.






Re: [PR] KAFKA-16832: LeaveGroup API for upgrading ConsumerGroup [kafka]

2024-05-24 Thread via GitHub


dajac commented on code in PR #16057:
URL: https://github.com/apache/kafka/pull/16057#discussion_r1613991045


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4424,14 +4419,124 @@ private ConsumerGroupMember validateConsumerGroupMember(
  * @param context The request context.
  * @param request The actual LeaveGroup request.
  *
+ * @return The LeaveGroup response and the records to append.
+ */
+public CoordinatorResult 
classicGroupLeave(
+RequestContext context,
+LeaveGroupRequestData request
+) throws UnknownMemberIdException, GroupIdNotFoundException {
+Group group = group(request.groupId());

Review Comment:
   ditto.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4424,14 +4419,124 @@ private ConsumerGroupMember validateConsumerGroupMember(
  * @param context The request context.
  * @param request The actual LeaveGroup request.
  *
+ * @return The LeaveGroup response and the records to append.
+ */
+public CoordinatorResult 
classicGroupLeave(
+RequestContext context,
+LeaveGroupRequestData request
+) throws UnknownMemberIdException, GroupIdNotFoundException {
+Group group = group(request.groupId());
+
+if (group.type() == CLASSIC) {
+return classicGroupLeaveToClassicGroup((ClassicGroup) group, 
context, request);
+} else {
+return classicGroupLeaveToConsumerGroup((ConsumerGroup) group, 
context, request);
+}
+}
+
+/**
+ * Handle a classic LeaveGroupRequest to a ConsumerGroup.
+ *
+ * @param group   The ConsumerGroup.
+ * @param context The request context.
+ * @param request The actual LeaveGroup request.
+ *
+ * @return The LeaveGroup response and the records to append.
+ */
+private CoordinatorResult 
classicGroupLeaveToConsumerGroup(
+ConsumerGroup group,
+RequestContext context,
+LeaveGroupRequestData request
+) throws UnknownMemberIdException {
+String groupId = group.groupId();
+List memberResponses = new ArrayList<>();
+Set validLeaveGroupMembers = new HashSet<>();
+List records = new ArrayList<>();
+
+for (MemberIdentity memberIdentity : request.members()) {
+String memberId = memberIdentity.memberId();
+String instanceId = memberIdentity.groupInstanceId();
+String reason = memberIdentity.reason() != null ? 
memberIdentity.reason() : "not provided";
+
+ConsumerGroupMember member;
+try {
+if (instanceId == null) {
+member = group.getOrMaybeCreateMember(memberId, false);
+throwIfMemberDoesNotUseClassicProtocol(member);
+
+log.info("[Group {}] Dynamic Member {} has left group " +
+"through explicit `LeaveGroup` request; client 
reason: {}",
+groupId, memberId, reason);
+} else {
+member = group.staticMember(instanceId);
+throwIfStaticMemberIsUnknown(member, instanceId);
+// The LeaveGroup API allows administrative removal of 
members by GroupInstanceId
+// in which case we expect the MemberId to be undefined.
+if (!UNKNOWN_MEMBER_ID.equals(memberId)) {
+throwIfInstanceIdIsFenced(member, groupId, memberId, 
instanceId);
+}
+throwIfMemberDoesNotUseClassicProtocol(member);
+
+memberId = member.memberId();
+log.info("[Group {}] Static Member {} with instance id {} 
has left group " +
+"through explicit `LeaveGroup` request; client 
reason: {}",
+groupId, memberId, instanceId, reason);
+}
+
+removeMember(records, groupId, memberId);
+cancelTimers(groupId, memberId);
+memberResponses.add(
+new MemberResponse()
+.setMemberId(memberId)
+.setGroupInstanceId(instanceId)
+);
+validLeaveGroupMembers.add(member);
+} catch (KafkaException e) {
+memberResponses.add(
+new MemberResponse()
+.setMemberId(memberId)
+.setGroupInstanceId(instanceId)
+.setErrorCode(Errors.forException(e).code())
+);
+}
+}
+
+if (!records.isEmpty()) {
+// Maybe update the subscription metadata.
+Map subscriptionMetadata = 
group.computeSubscriptionMetadata(
+group.

Re: [PR] KAFKA-16832: LeaveGroup API for upgrading ConsumerGroup [kafka]

2024-05-24 Thread via GitHub


dajac commented on code in PR #16057:
URL: https://github.com/apache/kafka/pull/16057#discussion_r1613943967


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -8695,7 +8691,7 @@ public void testLeaveGroupInvalidGroup() {
 .build();
 context.createClassicGroup("group-id");
 
-assertThrows(UnknownMemberIdException.class, () -> 
context.sendClassicGroupLeave(
+assertThrows(GroupIdNotFoundException.class, () -> 
context.sendClassicGroupLeave(

Review Comment:
   This is incorrect. We need to keep the previous exception in all these cases.






Re: [PR] KAFKA-16832: LeaveGroup API for upgrading ConsumerGroup [kafka]

2024-05-24 Thread via GitHub


dajac commented on code in PR #16057:
URL: https://github.com/apache/kafka/pull/16057#discussion_r1613943409


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4249,13 +4249,7 @@ public CoordinatorResult classicGroupH
 RequestContext context,
 HeartbeatRequestData request
 ) {
-Group group = groups.get(request.groupId(), Long.MAX_VALUE);
-
-if (group == null) {
-throw new UnknownMemberIdException(
-String.format("Group %s not found.", request.groupId())
-);
-}

Review Comment:
   Do we need to catch the exception and rethrow it as an unknown member exception?






Re: [PR] KAFKA-16832: LeaveGroup API for upgrading ConsumerGroup [kafka]

2024-05-24 Thread via GitHub


dongnuo123 commented on code in PR #16057:
URL: https://github.com/apache/kafka/pull/16057#discussion_r1613887650


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4424,14 +4425,128 @@ private ConsumerGroupMember validateConsumerGroupMember(
  * @param context The request context.
  * @param request The actual LeaveGroup request.
  *
+ * @return The LeaveGroup response and the records to append.
+ */
+public CoordinatorResult 
classicGroupLeave(
+RequestContext context,
+LeaveGroupRequestData request
+) throws UnknownMemberIdException, GroupIdNotFoundException {
+Group group = groups.get(request.groupId(), Long.MAX_VALUE);
+
+if (group == null) {
+throw new UnknownMemberIdException(String.format("Group %s not 
found.", request.groupId()));
+}
+
+if (group.type() == CLASSIC) {
+return classicGroupLeaveToClassicGroup((ClassicGroup) group, 
context, request);
+} else {
+return classicGroupLeaveToConsumerGroup((ConsumerGroup) group, 
context, request);
+}
+}
+
+/**
+ * Handle a classic LeaveGroupRequest to a ConsumerGroup.
+ *
+ * @param group   The ConsumerGroup.
+ * @param context The request context.
+ * @param request The actual LeaveGroup request.
+ *
+ * @return The LeaveGroup response and the records to append.
+ */
+private CoordinatorResult 
classicGroupLeaveToConsumerGroup(
+ConsumerGroup group,
+RequestContext context,
+LeaveGroupRequestData request
+) throws UnknownMemberIdException {
+String groupId = group.groupId();
+List memberResponses = new ArrayList<>();
+Set validLeaveGroupMembers = new HashSet<>();
+List records = new ArrayList<>();
+
+for (MemberIdentity memberIdentity: request.members()) {
+String memberId = memberIdentity.memberId();
+String instanceId = memberIdentity.groupInstanceId();
+String reason = memberIdentity.reason() != null ? 
memberIdentity.reason() : "not provided";
+
+ConsumerGroupMember member;
+try {
+if (instanceId == null) {
+member = group.getOrMaybeCreateMember(memberId, false);
+throwIfMemberDoesNotUseClassicProtocol(member);
+
+log.info("[Group {}] Dynamic Member {} has left group " +
+"through explicit `LeaveGroup` request; client 
reason: {}",
+groupId, memberId, reason);
+} else {
+member = group.staticMember(instanceId);
+throwIfStaticMemberIsUnknown(member, instanceId);
+// The LeaveGroup API allows administrative removal of 
members by GroupInstanceId
+// in which case we expect the MemberId to be undefined.
+if (!UNKNOWN_MEMBER_ID.equals(memberId)) {
+throwIfInstanceIdIsFenced(member, groupId, memberId, 
instanceId);
+}
+throwIfMemberDoesNotUseClassicProtocol(member);
+
+memberId = member.memberId();
+log.info("[Group {}] Static Member {} with instance id {} 
has left group " +
+"through explicit `LeaveGroup` request; client 
reason: {}",
+groupId, memberId, instanceId, reason);
+}
+
+removeMember(records, groupId, memberId);
+cancelTimers(groupId, memberId);
+memberResponses.add(
+new MemberResponse()
+.setMemberId(memberId)
+.setGroupInstanceId(instanceId)
+);
+validLeaveGroupMembers.add(member);
+} catch (KafkaException e) {
+memberResponses.add(
+new MemberResponse()
+.setMemberId(memberId)
+.setGroupInstanceId(instanceId)
+.setErrorCode(Errors.forException(e).code())
+);
+}
+}
+
+if (!records.isEmpty()) {
+// Maybe update the subscription metadata.
+Map subscriptionMetadata = 
group.computeSubscriptionMetadata(
+group.computeSubscribedTopicNames(new 
ArrayList<>(validLeaveGroupMembers)),

Review Comment:
   It was because `validLeaveGroupMembers` is a set, but I think it would be
better to make `computeSubscribedTopicNames` take a set directly.




Re: [PR] KAFKA-16832: LeaveGroup API for upgrading ConsumerGroup [kafka]

2024-05-24 Thread via GitHub


jeffkbkim commented on code in PR #16057:
URL: https://github.com/apache/kafka/pull/16057#discussion_r1613845801


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -12801,6 +12801,365 @@ public void 
testConsumerGroupMemberUsingClassicProtocolFencedWhenJoinTimeout() {
 );
 }
 
+@Test
+public void testConsumerGroupMemberUsingClassicProtocolBatchLeaveGroup() {
+String groupId = "group-id";
+String memberId1 = Uuid.randomUuid().toString();
+String memberId2 = Uuid.randomUuid().toString();
+String memberId3 = Uuid.randomUuid().toString();
+String instanceId2 = "instance-id-2";
+String instanceId3 = "instance-id-3";
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+Uuid barTopicId = Uuid.randomUuid();
+String barTopicName = "bar";
+
+List protocol1 = 
Collections.singletonList(
+new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+.setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+Arrays.asList(fooTopicName, barTopicName),
+null,
+Collections.singletonList(new TopicPartition(fooTopicName, 
0))
+
+);
+List protocol2 = 
Collections.singletonList(
+new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+.setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+Arrays.asList(fooTopicName, barTopicName),
+null,
+Collections.singletonList(new TopicPartition(fooTopicName, 
1))
+
+);
+
+ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(9)
+.setClientId("client")
+.setClientHost("localhost/127.0.0.1")
+.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+.setServerAssignorName("range")
+.setRebalanceTimeoutMs(45000)
+.setClassicMemberMetadata(
+new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+.setSessionTimeoutMs(5000)
+.setSupportedProtocols(protocol1)
+)
+.setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0)))
+.build();
+ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+.setInstanceId(instanceId2)
+.setState(MemberState.STABLE)
+.setMemberEpoch(9)
+.setPreviousMemberEpoch(8)
+.setClientId("client")
+.setClientHost("localhost/127.0.0.1")
+.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+.setServerAssignorName("range")
+.setRebalanceTimeoutMs(45000)
+.setClassicMemberMetadata(
+new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+.setSessionTimeoutMs(5000)
+.setSupportedProtocols(protocol2)
+)
+.setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
1)))
+.build();
+ConsumerGroupMember member3 = new 
ConsumerGroupMember.Builder(memberId3)
+.setInstanceId(instanceId3)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(9)
+.setClientId("client")
+.setClientHost("localhost/127.0.0.1")
+.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+.setServerAssignorName("range")
+.setRebalanceTimeoutMs(45000)
+.setAssignedPartitions(mkAssignment(mkTopicAssignment(barTopicId, 
0)))
+.build();
+
+// Consumer group with three members.
+// Dynamic member 1 uses the classic protocol;
+// static member 2 uses the classic protocol;

Review Comment:
   nit: can we change the ";"s to "."s?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4424,14 +4425,128 @@ private ConsumerGroupMember validateConsumerGroupMember(
  * @param context The request context.
  * @param request The actual LeaveGroup request.
  *
+ * @return The LeaveGroup response and the records to append.
+ */
+public CoordinatorResult 
classicGroupLeave(
+RequestContext context,
+LeaveGroupRequestData request
+) throws UnknownMemberIdException, GroupIdNotFoundException {
+Group group = groups.get(request.groupId(), Long.MAX_VALU

Re: [PR] KAFKA-16832: LeaveGroup API for upgrading ConsumerGroup [kafka]

2024-05-24 Thread via GitHub


dajac commented on code in PR #16057:
URL: https://github.com/apache/kafka/pull/16057#discussion_r1613854091


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4424,14 +4441,113 @@ private ConsumerGroupMember validateConsumerGroupMember(
  * @param context The request context.
  * @param request The actual LeaveGroup request.
  *
+ * @return The LeaveGroup response and the records to append.
+ */
+public CoordinatorResult 
classicGroupLeave(
+RequestContext context,
+LeaveGroupRequestData request
+) throws UnknownMemberIdException, GroupIdNotFoundException {
+Group group = groups.get(request.groupId(), Long.MAX_VALUE);
+
+if (group == null) {
+throw new UnknownMemberIdException(String.format("Group %s not 
found.", request.groupId()));
+}

Review Comment:
   Yes. I will update it. @dongnuo123 Could you please change it here?






Re: [PR] KAFKA-16832: LeaveGroup API for upgrading ConsumerGroup [kafka]

2024-05-24 Thread via GitHub


jeffkbkim commented on code in PR #16057:
URL: https://github.com/apache/kafka/pull/16057#discussion_r1613835643


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4424,14 +4441,113 @@ private ConsumerGroupMember validateConsumerGroupMember(
  * @param context The request context.
  * @param request The actual LeaveGroup request.
  *
+ * @return The LeaveGroup response and the records to append.
+ */
+public CoordinatorResult 
classicGroupLeave(
+RequestContext context,
+LeaveGroupRequestData request
+) throws UnknownMemberIdException, GroupIdNotFoundException {
+Group group = groups.get(request.groupId(), Long.MAX_VALUE);
+
+if (group == null) {
+throw new UnknownMemberIdException(String.format("Group %s not 
found.", request.groupId()));
+}

Review Comment:
   Should we hold onto https://github.com/apache/kafka/pull/16073 then?






Re: [PR] KAFKA-16832: LeaveGroup API for upgrading ConsumerGroup [kafka]

2024-05-24 Thread via GitHub


jeffkbkim commented on code in PR #16057:
URL: https://github.com/apache/kafka/pull/16057#discussion_r1613834993


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4424,14 +4441,113 @@ private ConsumerGroupMember validateConsumerGroupMember(
  * @param context The request context.
  * @param request The actual LeaveGroup request.
  *
+ * @return The LeaveGroup response and the records to append.
+ */
+public CoordinatorResult 
classicGroupLeave(
+RequestContext context,
+LeaveGroupRequestData request
+) throws UnknownMemberIdException, GroupIdNotFoundException {
+Group group = groups.get(request.groupId(), Long.MAX_VALUE);
+
+if (group == null) {
+throw new UnknownMemberIdException(String.format("Group %s not 
found.", request.groupId()));
+}

Review Comment:
   I like that approach. It's even more confusing now since we have mixed groups.






Re: [PR] KAFKA-16832: LeaveGroup API for upgrading ConsumerGroup [kafka]

2024-05-24 Thread via GitHub


dajac commented on code in PR #16057:
URL: https://github.com/apache/kafka/pull/16057#discussion_r1613785754


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4424,14 +4441,113 @@ private ConsumerGroupMember validateConsumerGroupMember(
  * @param context The request context.
  * @param request The actual LeaveGroup request.
  *
+ * @return The LeaveGroup response and the records to append.
+ */
+public CoordinatorResult 
classicGroupLeave(
+RequestContext context,
+LeaveGroupRequestData request
+) throws UnknownMemberIdException, GroupIdNotFoundException {
+Group group = groups.get(request.groupId(), Long.MAX_VALUE);
+
+if (group == null) {
+throw new UnknownMemberIdException(String.format("Group %s not 
found.", request.groupId()));
+}

Review Comment:
   Perhaps a better strategy overall would be to always use
`GroupIdNotFoundException` internally and to translate it to
`UnknownMemberIdException` where needed (e.g., by catching and rethrowing),
with a comment explaining why.
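
A minimal sketch of this strategy, assuming simplified stand-ins: the two exception classes below only mirror the names used in the PR, and `LeaveGroupTranslationSketch` with its `groups` map is hypothetical rather than the Kafka implementation. The internal lookup always throws `GroupIdNotFoundException`, and the request handler catches it and rethrows `UnknownMemberIdException`, with a comment recording the reason.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins; the real exception types live in
// org.apache.kafka.common.errors.
class GroupIdNotFoundException extends RuntimeException {
    GroupIdNotFoundException(String message) {
        super(message);
    }
}

class UnknownMemberIdException extends RuntimeException {
    UnknownMemberIdException(String message) {
        super(message);
    }
}

class LeaveGroupTranslationSketch {
    // Group id -> group type; a simplified replacement for the group store.
    private final Map<String, String> groups = new HashMap<>();

    void createGroup(String groupId) {
        groups.put(groupId, "classic");
    }

    // Internal lookup: a missing group is always a GroupIdNotFoundException.
    String group(String groupId) {
        String group = groups.get(groupId);
        if (group == null) {
            throw new GroupIdNotFoundException(String.format("Group %s not found.", groupId));
        }
        return group;
    }

    // Request handler: translate only at the boundary where it is needed.
    String classicGroupLeave(String groupId) {
        try {
            return group(groupId);
        } catch (GroupIdNotFoundException e) {
            // The old coordinator reported a missing group as an unknown
            // member id for this API, so the same error is kept here.
            throw new UnknownMemberIdException(e.getMessage());
        }
    }

    public static void main(String[] args) {
        LeaveGroupTranslationSketch sketch = new LeaveGroupTranslationSketch();
        sketch.createGroup("group-id");
        System.out.println(sketch.classicGroupLeave("group-id"));
        try {
            sketch.classicGroupLeave("missing-group");
        } catch (UnknownMemberIdException e) {
            System.out.println("Translated: " + e.getMessage());
        }
    }
}
```

Keeping the translation in the handler means other callers of the lookup see one consistent exception, and the compatibility quirk is documented where it applies.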






Re: [PR] KAFKA-16832: LeaveGroup API for upgrading ConsumerGroup [kafka]

2024-05-24 Thread via GitHub


dongnuo123 commented on code in PR #16057:
URL: https://github.com/apache/kafka/pull/16057#discussion_r1613748822


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4424,14 +4441,113 @@ private ConsumerGroupMember validateConsumerGroupMember(
  * @param context The request context.
  * @param request The actual LeaveGroup request.
  *
+ * @return The LeaveGroup response and the records to append.
+ */
+public CoordinatorResult 
classicGroupLeave(
+RequestContext context,
+LeaveGroupRequestData request
+) throws UnknownMemberIdException, GroupIdNotFoundException {
+Group group = groups.get(request.groupId(), Long.MAX_VALUE);
+
+if (group == null) {
+throw new UnknownMemberIdException(String.format("Group %s not 
found.", request.groupId()));
+}

Review Comment:
   Yeah, it's counterintuitive.






Re: [PR] KAFKA-16832: LeaveGroup API for upgrading ConsumerGroup [kafka]

2024-05-24 Thread via GitHub


dongnuo123 commented on code in PR #16057:
URL: https://github.com/apache/kafka/pull/16057#discussion_r1613724622


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4424,14 +4441,113 @@ private ConsumerGroupMember validateConsumerGroupMember(
  * @param context The request context.
  * @param request The actual LeaveGroup request.
  *
+ * @return The LeaveGroup response and the records to append.
+ */
+public CoordinatorResult 
classicGroupLeave(
+RequestContext context,
+LeaveGroupRequestData request
+) throws UnknownMemberIdException, GroupIdNotFoundException {
+Group group = groups.get(request.groupId(), Long.MAX_VALUE);
+
+if (group == null) {
+throw new UnknownMemberIdException(String.format("Group %s not 
found.", request.groupId()));
+}
+
+if (group.type() == CLASSIC) {
+return classicGroupLeaveToClassicGroup((ClassicGroup) group, 
context, request);
+} else {
+return classicGroupLeaveToConsumerGroup((ConsumerGroup) group, 
context, request);
+}
+}
+
+/**
+ * Handle a classic LeaveGroupRequest to a ConsumerGroup.
+ *
+ * @param group   The ConsumerGroup.
+ * @param context The request context.
+ * @param request The actual LeaveGroup request.
+ *
+ * @return The LeaveGroup response and the records to append.
+ */
+private CoordinatorResult 
classicGroupLeaveToConsumerGroup(
+ConsumerGroup group,
+RequestContext context,
+LeaveGroupRequestData request
+) throws UnknownMemberIdException, GroupIdNotFoundException {
+List memberResponses = new ArrayList<>();
+List records = new ArrayList<>();
+boolean hasValidLeaveGroupMember = false;
+
+for (MemberIdentity memberIdentity: request.members()) {
+String memberId = memberIdentity.memberId();
+String instanceId = memberIdentity.groupInstanceId();
+String reason = memberIdentity.reason() != null ? 
memberIdentity.reason() : "not provided";
+
+ConsumerGroupMember member;
+try {
+if (instanceId == null) {
+member = group.getOrMaybeCreateMember(memberId, false);
+throwIfMemberDoesNotUseClassicProtocol(member);
+
+log.info("[Group {}] Static Member {} has left group " +
+"through explicit `LeaveGroup` request; client 
reason: {}",
+group.groupId(), memberId, reason);
+} else {
+member = group.staticMember(instanceId);
+throwIfStaticMemberIsUnknown(member, memberId);

Review Comment:
   Oh yes, thanks for the catch!






Re: [PR] KAFKA-16832: LeaveGroup API for upgrading ConsumerGroup [kafka]

2024-05-24 Thread via GitHub


jeffkbkim commented on code in PR #16057:
URL: https://github.com/apache/kafka/pull/16057#discussion_r1613555271


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4424,14 +4441,113 @@ private ConsumerGroupMember validateConsumerGroupMember(
  * @param context The request context.
  * @param request The actual LeaveGroup request.
  *
+ * @return The LeaveGroup response and the records to append.
+ */
+public CoordinatorResult 
classicGroupLeave(
+RequestContext context,
+LeaveGroupRequestData request
+) throws UnknownMemberIdException, GroupIdNotFoundException {
+Group group = groups.get(request.groupId(), Long.MAX_VALUE);
+
+if (group == null) {
+throw new UnknownMemberIdException(String.format("Group %s not 
found.", request.groupId()));
+}

Review Comment:
   I'm guessing we don't use
   ```
   Group group = group(request.groupId());
   ```
   because we throw unknown member ID for the old coordinator. Is this correct? It
seems very counterintuitive, unfortunately.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4424,14 +4441,113 @@ private ConsumerGroupMember validateConsumerGroupMember(
  * @param context The request context.
  * @param request The actual LeaveGroup request.
  *
+ * @return The LeaveGroup response and the records to append.
+ */
+public CoordinatorResult 
classicGroupLeave(
+RequestContext context,
+LeaveGroupRequestData request
+) throws UnknownMemberIdException, GroupIdNotFoundException {
+Group group = groups.get(request.groupId(), Long.MAX_VALUE);
+
+if (group == null) {
+throw new UnknownMemberIdException(String.format("Group %s not 
found.", request.groupId()));
+}
+
+if (group.type() == CLASSIC) {
+return classicGroupLeaveToClassicGroup((ClassicGroup) group, 
context, request);
+} else {
+return classicGroupLeaveToConsumerGroup((ConsumerGroup) group, 
context, request);
+}
+}
+
+/**
+ * Handle a classic LeaveGroupRequest to a ConsumerGroup.
+ *
+ * @param group   The ConsumerGroup.
+ * @param context The request context.
+ * @param request The actual LeaveGroup request.
+ *
+ * @return The LeaveGroup response and the records to append.
+ */
+private CoordinatorResult 
classicGroupLeaveToConsumerGroup(
+ConsumerGroup group,
+RequestContext context,
+LeaveGroupRequestData request
+) throws UnknownMemberIdException, GroupIdNotFoundException {
+List memberResponses = new ArrayList<>();
+List records = new ArrayList<>();
+boolean hasValidLeaveGroupMember = false;
+
+for (MemberIdentity memberIdentity: request.members()) {
+String memberId = memberIdentity.memberId();
+String instanceId = memberIdentity.groupInstanceId();
+String reason = memberIdentity.reason() != null ? 
memberIdentity.reason() : "not provided";
+
+ConsumerGroupMember member;
+try {
+if (instanceId == null) {
+member = group.getOrMaybeCreateMember(memberId, false);
+throwIfMemberDoesNotUseClassicProtocol(member);
+
+log.info("[Group {}] Static Member {} has left group " +

Review Comment:
   nit: this should not be "Static Member"



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4424,14 +4441,113 @@ private ConsumerGroupMember validateConsumerGroupMember(
  * @param context The request context.
  * @param request The actual LeaveGroup request.
  *
+ * @return The LeaveGroup response and the records to append.
+ */
+public CoordinatorResult 
classicGroupLeave(
+RequestContext context,
+LeaveGroupRequestData request
+) throws UnknownMemberIdException, GroupIdNotFoundException {
+Group group = groups.get(request.groupId(), Long.MAX_VALUE);
+
+if (group == null) {
+throw new UnknownMemberIdException(String.format("Group %s not 
found.", request.groupId()));
+}
+
+if (group.type() == CLASSIC) {
+return classicGroupLeaveToClassicGroup((ClassicGroup) group, 
context, request);
+} else {
+return classicGroupLeaveToConsumerGroup((ConsumerGroup) group, 
context, request);
+}
+}
+
+/**
+ * Handle a classic LeaveGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The ConsumerGroup.
+ * @param contextThe request context.
+ * @param requestThe actual LeaveGroup request.
+ *
+ * @return The LeaveGroup response and the records

Re: [PR] KAFKA-16832: LeaveGroup API for upgrading ConsumerGroup [kafka]

2024-05-24 Thread via GitHub


dajac commented on code in PR #16057:
URL: https://github.com/apache/kafka/pull/16057#discussion_r1613089003


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2041,6 +2027,36 @@ private CoordinatorResult consumerGroupFenceMember(
         }
     }
 
+    /**
+     * Remove the member and maybe update the subscription metadata without the removed member.
+     *
+     * @param group  The ConsumerGroup.
+     * @param member The ConsumerGroupMember.
+     * @return The list of CoordinatorRecord.
+     */
+    private List removeMemberAndMaybeUpdateSubscriptionMetadata(
+        ConsumerGroup group,
+        ConsumerGroupMember member
+    ) {
+        List records = new ArrayList<>();
+        removeMember(records, group.groupId(), member.memberId());
+
+        // We update the subscription metadata without the leaving member.
+        Map subscriptionMetadata = group.computeSubscriptionMetadata(
+            group.computeSubscribedTopicNames(member, null),
+            metadataImage.topics(),
+            metadataImage.cluster()
+        );
+
+        if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+            log.info("[GroupId {}] Computed new subscription metadata: {}.",
+                group.groupId(), subscriptionMetadata);
+            records.add(newGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata));
+        }

Review Comment:
   I think we should do this once, after all the members have been 
processed. We could add a method like `computeSubscribedTopicNames` 
that takes a list of members to remove.
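
A minimal sketch of the batched variant suggested above, assuming a simplified stand-in for the group state. `SubscriptionSketch` and `subscribedTopicNamesWithout` are hypothetical names used only for illustration; they are not part of `ConsumerGroup` or the coordinator code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified stand-in for the group state; not Kafka's ConsumerGroup.
final class SubscriptionSketch {
    // memberId -> topics that member subscribes to.
    private final Map<String, Set<String>> subscriptionsByMember = new HashMap<>();

    void subscribe(String memberId, Set<String> topics) {
        subscriptionsByMember.put(memberId, topics);
    }

    // Computes the topics still subscribed to once every member in
    // removedMemberIds has left, in a single pass over the group.
    Set<String> subscribedTopicNamesWithout(Set<String> removedMemberIds) {
        Set<String> topics = new TreeSet<>();
        subscriptionsByMember.forEach((memberId, memberTopics) -> {
            if (!removedMemberIds.contains(memberId)) {
                topics.addAll(memberTopics);
            }
        });
        return topics;
    }
}
```

With a helper of this shape, the leave loop would only collect the ids of the leaving members; the subscription metadata would then be recomputed and compared once after the loop, so at most one metadata record is appended per request.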



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4424,14 +4441,113 @@ private ConsumerGroupMember validateConsumerGroupMember(
      * @param context The request context.
      * @param request The actual LeaveGroup request.
      *
+     * @return The LeaveGroup response and the records to append.
+     */
+    public CoordinatorResult classicGroupLeave(
+        RequestContext context,
+        LeaveGroupRequestData request
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        Group group = groups.get(request.groupId(), Long.MAX_VALUE);
+
+        if (group == null) {
+            throw new UnknownMemberIdException(String.format("Group %s not found.", request.groupId()));
+        }
+
+        if (group.type() == CLASSIC) {
+            return classicGroupLeaveToClassicGroup((ClassicGroup) group, context, request);
+        } else {
+            return classicGroupLeaveToConsumerGroup((ConsumerGroup) group, context, request);
+        }
+    }
+
+    /**
+     * Handle a classic LeaveGroupRequest to a ConsumerGroup.
+     *
+     * @param group   The ConsumerGroup.
+     * @param context The request context.
+     * @param request The actual LeaveGroup request.
+     *
+     * @return The LeaveGroup response and the records to append.
+     */
+    private CoordinatorResult classicGroupLeaveToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        LeaveGroupRequestData request
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        List memberResponses = new ArrayList<>();
+        List records = new ArrayList<>();
+        boolean hasValidLeaveGroupMember = false;
+
+        for (MemberIdentity memberIdentity: request.members()) {
+            String memberId = memberIdentity.memberId();
+            String instanceId = memberIdentity.groupInstanceId();
+            String reason = memberIdentity.reason() != null ? memberIdentity.reason() : "not provided";
+
+            ConsumerGroupMember member;
+            try {
+                if (instanceId == null) {
+                    member = group.getOrMaybeCreateMember(memberId, false);
+                    throwIfMemberDoesNotUseClassicProtocol(member);
+
+                    log.info("[Group {}] Static Member {} has left group " +
+                        "through explicit `LeaveGroup` request; client reason: {}",
+                        group.groupId(), memberId, reason);
+                } else {
+                    member = group.staticMember(instanceId);
+                    throwIfStaticMemberIsUnknown(member, memberId);
+                    // The LeaveGroup API allows administrative removal of members by GroupInstanceId
+                    // in which case we expect the MemberId to be undefined.
+                    if (!UNKNOWN_MEMBER_ID.equals(memberId)) {
+                        throwIfInstanceIdIsFenced(member, group.groupId(), memberId, instanceId);
+                    }
+                    throwIfMemberDoesNotUseClassicProtocol(member);
+
+                    log.info("[Group {}] Static Member {} with instance id {} has left group " +
+                        "through explicit `Leav

Re: [PR] KAFKA-16832: LeaveGroup API for upgrading ConsumerGroup [kafka]

2024-05-23 Thread via GitHub


dongnuo123 commented on code in PR #16057:
URL: https://github.com/apache/kafka/pull/16057#discussion_r1612296899


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4424,14 +4441,113 @@ private ConsumerGroupMember validateConsumerGroupMember(
      * @param context The request context.
      * @param request The actual LeaveGroup request.
      *
+     * @return The LeaveGroup response and the records to append.
+     */
+    public CoordinatorResult classicGroupLeave(
+        RequestContext context,
+        LeaveGroupRequestData request
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        Group group = groups.get(request.groupId(), Long.MAX_VALUE);
+
+        if (group == null) {
+            throw new UnknownMemberIdException(String.format("Group %s not found.", request.groupId()));
+        }
+
+        if (group.type() == CLASSIC) {
+            return classicGroupLeaveToClassicGroup((ClassicGroup) group, context, request);
+        } else {
+            return classicGroupLeaveToConsumerGroup((ConsumerGroup) group, context, request);
+        }
+    }
+
+    /**
+     * Handle a classic LeaveGroupRequest to a ConsumerGroup.
+     *
+     * @param group   The ConsumerGroup.
+     * @param context The request context.
+     * @param request The actual LeaveGroup request.
+     *
+     * @return The LeaveGroup response and the records to append.
+     */
+    private CoordinatorResult classicGroupLeaveToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        LeaveGroupRequestData request
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        List memberResponses = new ArrayList<>();
+        List records = new ArrayList<>();
+        boolean hasValidLeaveGroupMember = false;
+
+        for (MemberIdentity memberIdentity: request.members()) {
+            String memberId = memberIdentity.memberId();
+            String instanceId = memberIdentity.groupInstanceId();
+            String reason = memberIdentity.reason() != null ? memberIdentity.reason() : "not provided";
+
+            ConsumerGroupMember member;
+            try {
+                if (instanceId == null) {
+                    member = group.getOrMaybeCreateMember(memberId, false);
+                    throwIfMemberDoesNotUseClassicProtocol(member);
+
+                    log.info("[Group {}] Static Member {} has left group " +
+                        "through explicit `LeaveGroup` request; client reason: {}",
+                        group.groupId(), memberId, reason);
+                } else {
+                    member = group.staticMember(instanceId);
+                    throwIfStaticMemberIsUnknown(member, memberId);
+                    // The LeaveGroup API allows administrative removal of members by GroupInstanceId
+                    // in which case we expect the MemberId to be undefined.
+                    if (!UNKNOWN_MEMBER_ID.equals(memberId)) {
+                        throwIfInstanceIdIsFenced(member, group.groupId(), memberId, instanceId);
+                    }
+                    throwIfMemberDoesNotUseClassicProtocol(member);
+
+                    log.info("[Group {}] Static Member {} with instance id {} has left group " +
+                        "through explicit `LeaveGroup` request; client reason: {}",
+                        group.groupId(), memberId, instanceId, reason);
+                }
+
+                records.addAll(removeMemberAndMaybeUpdateSubscriptionMetadata(group, member));
+                cancelTimers(group.groupId(), member.memberId());

Review Comment:
   The timer cancellation will not be reverted if the replay/persistence fails. 
Is this something we want to address with the snapshottable timer?
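
To illustrate the concern, here is a minimal sketch of one way to avoid the rollback problem: defer the cancellation until the write has committed. This is not the snapshottable timer mentioned above, and it is not how the coordinator runtime works; `DeferredTimerCancellation`, `cancelOnCommit` and the use of `CompletableFuture` to model the write are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; these names are illustrative and not part of Kafka.
// Assumes all calls and future completions happen on a single thread.
final class DeferredTimerCancellation {
    private final Set<String> activeTimers = new HashSet<>();

    void schedule(String key) {
        activeTimers.add(key);
    }

    boolean isScheduled(String key) {
        return activeTimers.contains(key);
    }

    // Cancels the given timers only if the write completes successfully.
    // If the write fails, the timers stay scheduled, so nothing has to be reverted.
    void cancelOnCommit(List<String> keys, CompletableFuture<Void> write) {
        List<String> toCancel = new ArrayList<>(keys);
        write.thenRun(() -> toCancel.forEach(activeTimers::remove));
    }
}
```

The trade-off in this sketch is that a timer may still fire between the request being handled and the write committing, so whatever runs on expiry would need to tolerate a member that is already being removed.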






[PR] KAFKA-16832: LeaveGroup API for upgrading ConsumerGroup [kafka]

2024-05-23 Thread via GitHub


dongnuo123 opened a new pull request, #16057:
URL: https://github.com/apache/kafka/pull/16057

   This patch implements the LeaveGroup API for consumer groups that are in 
mixed mode.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

