Re: [PR] MINOR: Add type check to classic group timeout operations [kafka]

2024-04-10 Thread via GitHub


dajac merged PR #15587:
URL: https://github.com/apache/kafka/pull/15587


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add type check to classic group timeout operations [kafka]

2024-04-10 Thread via GitHub


dajac commented on code in PR #15587:
URL: https://github.com/apache/kafka/pull/15587#discussion_r1558986456


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2415,6 +2415,22 @@ private CoordinatorResult classicGroupJoinExistingMember(
         return EMPTY_RESULT;
     }
 
+    /**
+     * An overload of {@link GroupMetadataManager#completeClassicGroupJoin(ClassicGroup)} used as
+     * timeout operation. It additionally looks up the group by the id and checks the group type.
+     * completeClassicGroupJoin will only be called if the group is CLASSIC.
+     */
+    private CoordinatorResult completeClassicGroupJoin(String groupId) {
+        ClassicGroup group;
+        try {
+            group = getOrMaybeCreateClassicGroup(groupId, false);
+        } catch (UnknownMemberIdException | GroupIdNotFoundException exception) {

Review Comment:
   Filed https://issues.apache.org/jira/browse/KAFKA-16503.
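   The Javadoc in the quoted hunk states the core idea of this PR: the timer callback keeps only the group id and re-resolves it when the timeout fires, so the completion logic runs only if the group still exists and is still a classic group. Below is a minimal, self-contained sketch of that pattern under simplifying assumptions. The `Group`, `ClassicGroup`, and `ConsumerGroup` types, the `groups` map, `scheduleJoinTimeout`, and the string results are hypothetical stand-ins, not Kafka's actual classes or signatures.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical, simplified model: these types are illustrative stand-ins,
// not the real org.apache.kafka.coordinator.group classes.
class TimeoutLookupSketch {
    interface Group {
        String groupId();
    }

    static final class ClassicGroup implements Group {
        private final String groupId;
        ClassicGroup(String groupId) { this.groupId = groupId; }
        @Override public String groupId() { return groupId; }
    }

    static final class ConsumerGroup implements Group {
        private final String groupId;
        ConsumerGroup(String groupId) { this.groupId = groupId; }
        @Override public String groupId() { return groupId; }
    }

    // Stand-in for the coordinator's in-memory group registry.
    static final Map<String, Group> groups = new HashMap<>();

    // Stand-in for the completion logic that requires a classic group.
    static String completeJoin(ClassicGroup group) {
        return "completed " + group.groupId();
    }

    // Overload used as the timeout operation: look the group up by id when the
    // timer fires and delegate only if it exists and is still a classic group.
    static String completeJoin(String groupId) {
        Group group = groups.get(groupId);
        if (group instanceof ClassicGroup) {
            return completeJoin((ClassicGroup) group);
        }
        return "skipped " + groupId; // missing or converted group: do nothing
    }

    // The scheduled operation captures only the id, never the group object.
    static Supplier<String> scheduleJoinTimeout(String groupId) {
        return () -> completeJoin(groupId);
    }
}
```

   For example, if `g1` is a `ClassicGroup` when the timeout is scheduled but has been replaced by a `ConsumerGroup` (or removed) before it fires, `scheduleJoinTimeout("g1").get()` returns `"skipped g1"` instead of operating on a stale object.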




Re: [PR] MINOR: Add type check to classic group timeout operations [kafka]

2024-04-09 Thread via GitHub


dajac commented on code in PR #15587:
URL: https://github.com/apache/kafka/pull/15587#discussion_r1558005191


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2415,6 +2415,22 @@ private CoordinatorResult classicGroupJoinExistingMember(
         return EMPTY_RESULT;
     }
 
+    /**
+     * An overload of {@link GroupMetadataManager#completeClassicGroupJoin(ClassicGroup)} used as
+     * timeout operation. It additionally looks up the group by the id and checks the group type.
+     * completeClassicGroupJoin will only be called if the group is CLASSIC.
+     */
+    private CoordinatorResult completeClassicGroupJoin(String groupId) {
+        ClassicGroup group;
+        try {
+            group = getOrMaybeCreateClassicGroup(groupId, false);
+        } catch (UnknownMemberIdException | GroupIdNotFoundException exception) {

Review Comment:
   This makes sense. My concern is more on the join-group/sync-group paths. We 
use `getOrMaybeCreateClassicGroup` there and don't seem to convert 
`GroupIdNotFoundException`. Do we need to?




Re: [PR] MINOR: Add type check to classic group timeout operations [kafka]

2024-04-09 Thread via GitHub


jeffkbkim commented on code in PR #15587:
URL: https://github.com/apache/kafka/pull/15587#discussion_r1557990553


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2415,6 +2415,22 @@ private CoordinatorResult classicGroupJoinExistingMember(
         return EMPTY_RESULT;
     }
 
+    /**
+     * An overload of {@link GroupMetadataManager#completeClassicGroupJoin(ClassicGroup)} used as
+     * timeout operation. It additionally looks up the group by the id and checks the group type.
+     * completeClassicGroupJoin will only be called if the group is CLASSIC.
+     */
+    private CoordinatorResult completeClassicGroupJoin(String groupId) {
+        ClassicGroup group;
+        try {
+            group = getOrMaybeCreateClassicGroup(groupId, false);
+        } catch (UnknownMemberIdException | GroupIdNotFoundException exception) {

Review Comment:
   The existing coordinator completes responses with UNKNOWN_MEMBER_ID instead 
of GROUP_ID_NOT_FOUND for join and sync. DeleteGroup and DeleteOffset requests 
are completed with GROUP_ID_NOT_FOUND if the group does not exist.
   
   So returning null and handling the missing group separately in each API 
might be the right approach.
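   The per-API handling described above could look roughly like the sketch below, assuming a lookup helper that returns `null` instead of throwing. The helper and method names are hypothetical. The local `ErrorCode` enum mirrors constants that exist in Kafka's `org.apache.kafka.common.protocol.Errors`, but it is defined here only to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: one null-returning lookup, with each API choosing the
// error it reports for a missing group. Names are illustrative only.
class PerApiErrorSketch {
    enum ErrorCode { NONE, UNKNOWN_MEMBER_ID, GROUP_ID_NOT_FOUND }

    // Placeholder for group state.
    static final class Group { }

    static final Map<String, Group> groups = new HashMap<>();

    // Returns null instead of throwing when the group does not exist.
    static Group lookupGroupOrNull(String groupId) {
        return groups.get(groupId);
    }

    // JoinGroup/SyncGroup: a missing group is reported as UNKNOWN_MEMBER_ID.
    static ErrorCode joinOrSyncError(String groupId) {
        return lookupGroupOrNull(groupId) == null ? ErrorCode.UNKNOWN_MEMBER_ID : ErrorCode.NONE;
    }

    // DeleteGroups/OffsetDelete: a missing group is reported as GROUP_ID_NOT_FOUND.
    static ErrorCode deleteError(String groupId) {
        return lookupGroupOrNull(groupId) == null ? ErrorCode.GROUP_ID_NOT_FOUND : ErrorCode.NONE;
    }
}
```

   Keeping the lookup neutral and mapping "not found" at each call site lets every API preserve the error code the existing coordinator returns.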




Re: [PR] MINOR: Add type check to classic group timeout operations [kafka]

2024-04-09 Thread via GitHub


dajac commented on code in PR #15587:
URL: https://github.com/apache/kafka/pull/15587#discussion_r1557570573


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2415,6 +2415,22 @@ private CoordinatorResult classicGroupJoinExistingMember(
         return EMPTY_RESULT;
     }
 
+    /**
+     * An overload of {@link GroupMetadataManager#completeClassicGroupJoin(ClassicGroup)} used as
+     * timeout operation. It additionally looks up the group by the id and checks the group type.
+     * completeClassicGroupJoin will only be called if the group is CLASSIC.
+     */
+    private CoordinatorResult completeClassicGroupJoin(String groupId) {
+        ClassicGroup group;
+        try {
+            group = getOrMaybeCreateClassicGroup(groupId, false);
+        } catch (UnknownMemberIdException | GroupIdNotFoundException exception) {

Review Comment:
   I actually wonder whether raising `GroupIdNotFoundException` in 
`getOrMaybeCreateClassicGroup` is correct. It is likely not expected on the 
join-group and the sync-group paths. @jeffkbkim What do you think? We don't 
have to address this in this PR.




Re: [PR] MINOR: Add type check to classic group timeout operations [kafka]

2024-04-07 Thread via GitHub


dongnuo123 commented on code in PR #15587:
URL: https://github.com/apache/kafka/pull/15587#discussion_r1555019741


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2415,6 +2415,20 @@ private CoordinatorResult classicGroupJoinExistingMember(
         return EMPTY_RESULT;
     }
 
+    /**
+     * An overload of {@link GroupMetadataManager#completeClassicGroupJoin(ClassicGroup)} used as
+     * timeout operation. It additionally looks up the group by the id and checks the group type.
+     * completeClassicGroupJoin will only be called if the group is CLASSIC.
+     */
+    private CoordinatorResult completeClassicGroupJoin(String groupId) {
+        if (containsClassicGroup(groupId)) {
+            return completeClassicGroupJoin(getOrMaybeCreateClassicGroup(groupId, false));

Review Comment:
   Yes, I agree. Let me use the try-catch.




Re: [PR] MINOR: Add type check to classic group timeout operations [kafka]

2024-04-05 Thread via GitHub


dajac commented on code in PR #15587:
URL: https://github.com/apache/kafka/pull/15587#discussion_r1553499653


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2415,6 +2415,20 @@ private CoordinatorResult classicGroupJoinExistingMember(
         return EMPTY_RESULT;
     }
 
+    /**
+     * An overload of {@link GroupMetadataManager#completeClassicGroupJoin(ClassicGroup)} used as
+     * timeout operation. It additionally looks up the group by the id and checks the group type.
+     * completeClassicGroupJoin will only be called if the group is CLASSIC.
+     */
+    private CoordinatorResult completeClassicGroupJoin(String groupId) {
+        if (containsClassicGroup(groupId)) {
+            return completeClassicGroupJoin(getOrMaybeCreateClassicGroup(groupId, false));

Review Comment:
   I am not a fan of this pattern because you effectively have to look up the 
group twice. One option would be to use a try/catch to catch the exception 
thrown by `getOrMaybeCreateClassicGroup`. Another option would be to 1) do the 
lookup once, and 2) verify that the group is non-null and of the classic type, 
returning early if either check fails.
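   The two single-lookup options can be sketched side by side as follows. This is a hypothetical model, not Kafka's code: `getClassicGroupOrThrow` is only loosely modeled on `getOrMaybeCreateClassicGroup(groupId, false)`, and `GroupNotFoundException`, the `groups` map, and the `lookups` counter are illustrative names introduced here to show that each option touches the map exactly once.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the two single-lookup options; types and method
// names are illustrative, not Kafka's real API.
class SingleLookupSketch {
    static class Group { }
    static final class ClassicGroup extends Group { }
    static final class ConsumerGroup extends Group { }

    static final class GroupNotFoundException extends RuntimeException {
        GroupNotFoundException(String message) { super(message); }
    }

    static final Map<String, Group> groups = new HashMap<>();
    static int lookups = 0; // counts map lookups to show each option does one

    // Throwing lookup: fails if the group is missing or not a classic group.
    static ClassicGroup getClassicGroupOrThrow(String groupId) {
        lookups++;
        Group group = groups.get(groupId);
        if (!(group instanceof ClassicGroup)) {
            throw new GroupNotFoundException("Group " + groupId + " is missing or not classic.");
        }
        return (ClassicGroup) group;
    }

    // Stand-in for the real join-completion logic.
    static String complete(ClassicGroup group) {
        return "completed";
    }

    // Option 1: wrap the throwing lookup in try/catch.
    static String completeWithTryCatch(String groupId) {
        ClassicGroup group;
        try {
            group = getClassicGroupOrThrow(groupId);
        } catch (GroupNotFoundException e) {
            return "skipped";
        }
        return complete(group);
    }

    // Option 2: do the lookup once, then check non-null and type.
    static String completeWithNullCheck(String groupId) {
        lookups++;
        Group group = groups.get(groupId);
        if (!(group instanceof ClassicGroup)) { // instanceof is also false for null
            return "skipped";
        }
        return complete((ClassicGroup) group);
    }
}
```

   Both options avoid the double lookup of the `containsClassicGroup` plus `getOrMaybeCreateClassicGroup` pattern; the try/catch form reuses the existing throwing helper, while the null-check form avoids exceptions for an expected outcome.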



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2415,6 +2415,20 @@ private CoordinatorResult classicGroupJoinExistingMember(
         return EMPTY_RESULT;
     }
 
+    /**
+     * An overload of {@link GroupMetadataManager#completeClassicGroupJoin(ClassicGroup)} used as
+     * timeout operation. It additionally looks up the group by the id and checks the group type.
+     * completeClassicGroupJoin will only be called if the group is CLASSIC.
+     */
+    private CoordinatorResult completeClassicGroupJoin(String groupId) {
+        if (containsClassicGroup(groupId)) {
+            return completeClassicGroupJoin(getOrMaybeCreateClassicGroup(groupId, false));
+        } else {
+            log.info("Group {} is null or not a classic group, skipping rebalance stage.", groupId);

Review Comment:
   I wonder if we could use `debug` here.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2805,31 +2826,36 @@ private CoordinatorResult maybeCompleteJoinElseSchedule(
      * Try to complete the join phase of the initial rebalance.
      * Otherwise, extend the rebalance.
      *
-     * @param group The group under initial rebalance.
+     * @param groupId The group under initial rebalance.
      *
      * @return The coordinator result that will be appended to the log.
      */
     private CoordinatorResult tryCompleteInitialRebalanceElseSchedule(
-        ClassicGroup group,
+        String groupId,
         int delayMs,
         int remainingMs
     ) {
-        if (group.newMemberAdded() && remainingMs != 0) {
-            // A new member was added. Extend the delay.
-            group.setNewMemberAdded(false);
-            int newDelayMs = Math.min(classicGroupInitialRebalanceDelayMs, remainingMs);
-            int newRemainingMs = Math.max(remainingMs - delayMs, 0);
-
-            timer.schedule(
-                classicGroupJoinKey(group.groupId()),
-                newDelayMs,
-                TimeUnit.MILLISECONDS,
-                false,
-                () -> tryCompleteInitialRebalanceElseSchedule(group, newDelayMs, newRemainingMs)
-            );
+        if (containsClassicGroup(groupId)) {
+if (containsClassicGroup(groupId)) {

Review Comment:
   ditto.
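   The removed lines in the hunk above also show how the initial-rebalance delay is extended when a new member joins: the next delay is `Math.min(classicGroupInitialRebalanceDelayMs, remainingMs)`, and the remaining budget shrinks by the delay that just elapsed. The sketch below replays only that arithmetic, under the hypothetical assumption that a new member joins during every delay window. The class and method names, the positive-delay guard, and the starting values in the example are illustrative, not taken from Kafka.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical replay of the delay arithmetic in
// tryCompleteInitialRebalanceElseSchedule, assuming a new member is added
// during every delay window so the rebalance keeps being extended.
class InitialRebalanceDelaySketch {
    static List<Integer> scheduledDelays(int initialRebalanceDelayMs, int delayMs, int remainingMs) {
        // Guard added for the sketch only: with a zero initial delay the loop
        // below would never shrink remainingMs.
        if (initialRebalanceDelayMs <= 0) {
            throw new IllegalArgumentException("initialRebalanceDelayMs must be positive");
        }
        List<Integer> delays = new ArrayList<>();
        delays.add(delayMs); // the delay that was scheduled first
        // Mirrors the "newMemberAdded() && remainingMs != 0" branch.
        while (remainingMs != 0) {
            int newDelayMs = Math.min(initialRebalanceDelayMs, remainingMs);
            int newRemainingMs = Math.max(remainingMs - delayMs, 0);
            delays.add(newDelayMs);
            delayMs = newDelayMs;
            remainingMs = newRemainingMs;
        }
        // Once remainingMs reaches 0, the real method stops extending and
        // completes the join phase instead.
        return delays;
    }
}
```

   With a 3000 ms initial delay and 7000 ms remaining, `scheduledDelays(3000, 3000, 7000)` yields delays of 3000, 3000, 3000, and 1000 ms, which in this example sum to the initial 3000 ms plus the 7000 ms remaining.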



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2533,45 +2547,52 @@ private void schedulePendingSync(ClassicGroup group) {
             group.rebalanceTimeoutMs(),
             TimeUnit.MILLISECONDS,
             false,
-            () -> expirePendingSync(group, group.generationId()));
+            () -> expirePendingSync(group.groupId(), group.generationId()));
     }
 
     /**
      * Invoked when the heartbeat operation is expired from the timer. Possibly remove the member and
      * try complete the join phase.
      *
-     * @param group The group.
+     * @param groupId   The group id.
      * @param memberId  The member id.
      *
      * @return The coordinator result that will be appended to the log.
      */
     private CoordinatorResult expireClassicGroupMemberHeartbeat(
-        ClassicGroup group,
+        String groupId,
         String memberId
     ) {
-        if (group.isInState(DEAD)) {
-            log.info("Received notification of heartbeat expiration for member {} after group {} " +
-                "had already been unloaded or deleted.",
-                memberId, group.groupId());
-        } else if (group.isPendingMember(memberId)) {
-            log.info("Pending member {} in group {} has been removed after session timeout expiration.",
-                memberId, group.groupId());
-
-            return removePendingMemberAndUpdateClassicGroup(group,