Re: [PR] MINOR: Add type check to classic group timeout operations [kafka]
dajac merged PR #15587: URL: https://github.com/apache/kafka/pull/15587 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Add type check to classic group timeout operations [kafka]
dajac commented on code in PR #15587:
URL: https://github.com/apache/kafka/pull/15587#discussion_r1558986456

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -2415,6 +2415,22 @@ private CoordinatorResult classicGroupJoinExistingMember(
         return EMPTY_RESULT;
     }
+    /**
+     * An overload of {@link GroupMetadataManager#completeClassicGroupJoin(ClassicGroup)} used as
+     * timeout operation. It additionally looks up the group by the id and checks the group type.
+     * completeClassicGroupJoin will only be called if the group is CLASSIC.
+     */
+    private CoordinatorResult completeClassicGroupJoin(String groupId) {
+        ClassicGroup group;
+        try {
+            group = getOrMaybeCreateClassicGroup(groupId, false);
+        } catch (UnknownMemberIdException | GroupIdNotFoundException exception) {

Review Comment:
Filed https://issues.apache.org/jira/browse/KAFKA-16503.
Re: [PR] MINOR: Add type check to classic group timeout operations [kafka]
dajac commented on code in PR #15587:
URL: https://github.com/apache/kafka/pull/15587#discussion_r1558005191

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -2415,6 +2415,22 @@ private CoordinatorResult classicGroupJoinExistingMember(
         return EMPTY_RESULT;
     }
+    /**
+     * An overload of {@link GroupMetadataManager#completeClassicGroupJoin(ClassicGroup)} used as
+     * timeout operation. It additionally looks up the group by the id and checks the group type.
+     * completeClassicGroupJoin will only be called if the group is CLASSIC.
+     */
+    private CoordinatorResult completeClassicGroupJoin(String groupId) {
+        ClassicGroup group;
+        try {
+            group = getOrMaybeCreateClassicGroup(groupId, false);
+        } catch (UnknownMemberIdException | GroupIdNotFoundException exception) {

Review Comment:
This makes sense. My concern is more on the join-group/sync-group paths. We use `getOrMaybeCreateClassicGroup` there and don't seem to convert `GroupIdNotFoundException`. Do we need to?
Re: [PR] MINOR: Add type check to classic group timeout operations [kafka]
jeffkbkim commented on code in PR #15587:
URL: https://github.com/apache/kafka/pull/15587#discussion_r1557990553

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -2415,6 +2415,22 @@ private CoordinatorResult classicGroupJoinExistingMember(
         return EMPTY_RESULT;
     }
+    /**
+     * An overload of {@link GroupMetadataManager#completeClassicGroupJoin(ClassicGroup)} used as
+     * timeout operation. It additionally looks up the group by the id and checks the group type.
+     * completeClassicGroupJoin will only be called if the group is CLASSIC.
+     */
+    private CoordinatorResult completeClassicGroupJoin(String groupId) {
+        ClassicGroup group;
+        try {
+            group = getOrMaybeCreateClassicGroup(groupId, false);
+        } catch (UnknownMemberIdException | GroupIdNotFoundException exception) {

Review Comment:
The existing coordinator completes responses with UNKNOWN_MEMBER_ID instead of GROUP_ID_NOT_FOUND for join and sync. DeleteGroup and DeleteOffset requests are completed with GROUP_ID_NOT_FOUND if the group does not exist. So just returning null and handling them separately for each API might be correct.
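The per-API handling suggested in the comment above could look roughly like the sketch below. It is only an illustration under stated assumptions, not the coordinator's code: `ErrorMappingSketch`, `Api`, `ErrorCode`, `addGroup`, `lookup`, and `handle` are hypothetical names, and the only facts taken from the thread are which error each request type is completed with when the group does not exist.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a coordinator; none of these names come from the Kafka code base. */
final class ErrorMappingSketch {
    /** The request types mentioned in the review comment. */
    enum Api { JOIN_GROUP, SYNC_GROUP, DELETE_GROUP, DELETE_OFFSET }

    /** The error codes mentioned in the review comment, plus a success value. */
    enum ErrorCode { NONE, UNKNOWN_MEMBER_ID, GROUP_ID_NOT_FOUND }

    private final Map<String, Object> groups = new HashMap<>();

    void addGroup(String groupId) {
        groups.put(groupId, new Object());
    }

    /** Returns the group, or null when it does not exist. It never throws. */
    Object lookup(String groupId) {
        return groups.get(groupId);
    }

    /** Each API decides for itself how a missing group is reported. */
    ErrorCode handle(Api api, String groupId) {
        if (lookup(groupId) != null) {
            return ErrorCode.NONE;
        }
        switch (api) {
            case JOIN_GROUP:
            case SYNC_GROUP:
                // Per the comment, join and sync are completed with UNKNOWN_MEMBER_ID.
                return ErrorCode.UNKNOWN_MEMBER_ID;
            default:
                // DeleteGroup and DeleteOffset are completed with GROUP_ID_NOT_FOUND.
                return ErrorCode.GROUP_ID_NOT_FOUND;
        }
    }

    public static void main(String[] args) {
        ErrorMappingSketch sketch = new ErrorMappingSketch();
        sketch.addGroup("existing");
        for (Api api : Api.values()) {
            System.out.println(api + " on a missing group: " + sketch.handle(api, "missing"));
        }
    }
}
```

With a null-returning lookup, the choice of error code stays next to each API handler instead of being fixed by whichever exception the shared lookup happens to throw.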
Re: [PR] MINOR: Add type check to classic group timeout operations [kafka]
dajac commented on code in PR #15587:
URL: https://github.com/apache/kafka/pull/15587#discussion_r1557570573

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -2415,6 +2415,22 @@ private CoordinatorResult classicGroupJoinExistingMember(
         return EMPTY_RESULT;
     }
+    /**
+     * An overload of {@link GroupMetadataManager#completeClassicGroupJoin(ClassicGroup)} used as
+     * timeout operation. It additionally looks up the group by the id and checks the group type.
+     * completeClassicGroupJoin will only be called if the group is CLASSIC.
+     */
+    private CoordinatorResult completeClassicGroupJoin(String groupId) {
+        ClassicGroup group;
+        try {
+            group = getOrMaybeCreateClassicGroup(groupId, false);
+        } catch (UnknownMemberIdException | GroupIdNotFoundException exception) {

Review Comment:
I actually wonder whether raising `GroupIdNotFoundException` in `getOrMaybeCreateClassicGroup` is correct. It is likely not expected on the join-group and the sync-group paths. @jeffkbkim What do you think? We don't have to address this in this PR.
Re: [PR] MINOR: Add type check to classic group timeout operations [kafka]
dongnuo123 commented on code in PR #15587:
URL: https://github.com/apache/kafka/pull/15587#discussion_r1555019741

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -2415,6 +2415,20 @@ private CoordinatorResult classicGroupJoinExistingMember(
         return EMPTY_RESULT;
     }
+    /**
+     * An overload of {@link GroupMetadataManager#completeClassicGroupJoin(ClassicGroup)} used as
+     * timeout operation. It additionally looks up the group by the id and checks the group type.
+     * completeClassicGroupJoin will only be called if the group is CLASSIC.
+     */
+    private CoordinatorResult completeClassicGroupJoin(String groupId) {
+        if (containsClassicGroup(groupId)) {
+            return completeClassicGroupJoin(getOrMaybeCreateClassicGroup(groupId, false));

Review Comment:
Yes I agree. Let me use the try catch
Re: [PR] MINOR: Add type check to classic group timeout operations [kafka]
dajac commented on code in PR #15587:
URL: https://github.com/apache/kafka/pull/15587#discussion_r1553499653

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -2415,6 +2415,20 @@ private CoordinatorResult classicGroupJoinExistingMember(
         return EMPTY_RESULT;
     }
+    /**
+     * An overload of {@link GroupMetadataManager#completeClassicGroupJoin(ClassicGroup)} used as
+     * timeout operation. It additionally looks up the group by the id and checks the group type.
+     * completeClassicGroupJoin will only be called if the group is CLASSIC.
+     */
+    private CoordinatorResult completeClassicGroupJoin(String groupId) {
+        if (containsClassicGroup(groupId)) {
+            return completeClassicGroupJoin(getOrMaybeCreateClassicGroup(groupId, false));

Review Comment:
I am not a fan of this pattern because you effectively have to look up the group twice. One option would be to use a try..catch to catch the exception thrown by getOrMaybeCreateClassicGroup. Another option would be to 1) do the lookup, 2) verify non-null and group type and return if it fails.

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -2415,6 +2415,20 @@ private CoordinatorResult classicGroupJoinExistingMember(
         return EMPTY_RESULT;
     }
+    /**
+     * An overload of {@link GroupMetadataManager#completeClassicGroupJoin(ClassicGroup)} used as
+     * timeout operation. It additionally looks up the group by the id and checks the group type.
+     * completeClassicGroupJoin will only be called if the group is CLASSIC.
+     */
+    private CoordinatorResult completeClassicGroupJoin(String groupId) {
+        if (containsClassicGroup(groupId)) {
+            return completeClassicGroupJoin(getOrMaybeCreateClassicGroup(groupId, false));
+        } else {
+            log.info("Group {} is null or not a classic group, skipping rebalance stage.", groupId);

Review Comment:
I wonder if we could use `debug` here.
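The two single-lookup options from the first comment above can be sketched side by side. This is a minimal, self-contained illustration and not the PR's code: `SingleLookupSketch`, `Group`, `GroupNotFoundException`, `getClassicGroupOrThrow`, the string results, and the `lookups` counter are all hypothetical stand-ins, introduced only to show that each variant touches the group map once.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a group manager; names are not taken from the Kafka code base. */
final class SingleLookupSketch {
    enum GroupType { CLASSIC, CONSUMER }

    static final class Group {
        final String groupId;
        final GroupType type;

        Group(String groupId, GroupType type) {
            this.groupId = groupId;
            this.type = type;
        }
    }

    /** Hypothetical exception standing in for whatever the real lookup throws. */
    static final class GroupNotFoundException extends RuntimeException {
        GroupNotFoundException(String groupId) {
            super("No classic group with id " + groupId);
        }
    }

    private final Map<String, Group> groups = new HashMap<>();
    int lookups = 0; // counts map accesses, for illustration only

    void put(String groupId, GroupType type) {
        groups.put(groupId, new Group(groupId, type));
    }

    /** Throwing lookup: fails when the group is missing or not CLASSIC. */
    Group getClassicGroupOrThrow(String groupId) {
        lookups++;
        Group group = groups.get(groupId);
        if (group == null || group.type != GroupType.CLASSIC) {
            throw new GroupNotFoundException(groupId);
        }
        return group;
    }

    /** Option 1: call the throwing lookup once and catch its exception. */
    String completeJoinWithTryCatch(String groupId) {
        Group group;
        try {
            group = getClassicGroupOrThrow(groupId);
        } catch (GroupNotFoundException e) {
            return "skipped";
        }
        return "completed:" + group.groupId;
    }

    /** Option 2: look up once, then verify non-null and group type. */
    String completeJoinWithNullCheck(String groupId) {
        lookups++;
        Group group = groups.get(groupId);
        if (group == null || group.type != GroupType.CLASSIC) {
            return "skipped";
        }
        return "completed:" + group.groupId;
    }

    public static void main(String[] args) {
        SingleLookupSketch sketch = new SingleLookupSketch();
        sketch.put("classic", GroupType.CLASSIC);
        sketch.put("consumer", GroupType.CONSUMER);
        System.out.println(sketch.completeJoinWithTryCatch("classic"));
        System.out.println(sketch.completeJoinWithNullCheck("consumer"));
        System.out.println(sketch.completeJoinWithTryCatch("missing"));
    }
}
```

Either variant takes only the group id, so a timer callback built on it would re-resolve and type-check the group when the timeout fires rather than relying on a reference captured at scheduling time.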
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -2805,31 +2826,36 @@ private CoordinatorResult maybeCompleteJoinElseSchedule(
      * Try to complete the join phase of the initial rebalance.
      * Otherwise, extend the rebalance.
      *
-     * @param group The group under initial rebalance.
+     * @param groupId The group under initial rebalance.
      *
      * @return The coordinator result that will be appended to the log.
      */
     private CoordinatorResult tryCompleteInitialRebalanceElseSchedule(
-        ClassicGroup group,
+        String groupId,
         int delayMs,
         int remainingMs
     ) {
-        if (group.newMemberAdded() && remainingMs != 0) {
-            // A new member was added. Extend the delay.
-            group.setNewMemberAdded(false);
-            int newDelayMs = Math.min(classicGroupInitialRebalanceDelayMs, remainingMs);
-            int newRemainingMs = Math.max(remainingMs - delayMs, 0);
-
-            timer.schedule(
-                classicGroupJoinKey(group.groupId()),
-                newDelayMs,
-                TimeUnit.MILLISECONDS,
-                false,
-                () -> tryCompleteInitialRebalanceElseSchedule(group, newDelayMs, newRemainingMs)
-            );
+        if (containsClassicGroup(groupId)) {

Review Comment:
ditto.

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -2533,45 +2547,52 @@ private void schedulePendingSync(ClassicGroup group) {
             group.rebalanceTimeoutMs(),
             TimeUnit.MILLISECONDS,
             false,
-            () -> expirePendingSync(group, group.generationId()));
+            () -> expirePendingSync(group.groupId(), group.generationId()));
     }
     /**
      * Invoked when the heartbeat operation is expired from the timer. Possibly remove the member and
      * try complete the join phase.
      *
-     * @param group The group.
+     * @param groupId The group id.
      * @param memberId The member id.
      *
      * @return The coordinator result that will be appended to the log.
      */
     private CoordinatorResult expireClassicGroupMemberHeartbeat(
-        ClassicGroup group,
+        String groupId,
         String memberId
     ) {
-        if (group.isInState(DEAD)) {
-            log.info("Received notification of heartbeat expiration for member {} after group {} " +
-                "had already been unloaded or deleted.",
-                memberId, group.groupId());
-        } else if (group.isPendingMember(memberId)) {
-            log.info("Pending member {} in group {} has been removed after session timeout expiration.",
-                memberId, group.groupId());
-
-            return removePendingMemberAndUpdateClassicGroup(group,