chia7712 commented on a change in pull request #8657: URL: https://github.com/apache/kafka/pull/8657#discussion_r430253921
########## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ########## @@ -769,20 +813,25 @@ class GroupCoordinator(val brokerId: Int, // on heartbeat response to eventually notify the rebalance in progress signal to the consumer val member = group.get(memberId) completeAndScheduleNextHeartbeatExpiration(group, member) - groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback) + partitionsToComplete ++= groupManager.storeOffsets( + group = group, + consumerId = memberId, + offsetMetadata = offsetMetadata, + responseCallback = responseCallback, + completeDelayedRequests = false) case CompletingRebalance => // We should not receive a commit request if the group has not completed rebalance; // but since the consumer's member.id and generation is valid, it means it has received // the latest group generation information from the JoinResponse. // So let's return a REBALANCE_IN_PROGRESS to let consumer handle it gracefully. responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.REBALANCE_IN_PROGRESS }) - case _ => throw new RuntimeException(s"Logic error: unexpected group state ${group.currentState}") } } } + completeDelayedRequests(partitionsToComplete) Review comment: @junrao Could you take a look? I'd like to address @hachikuji's comment, but doing so would require a big change to this PR. Hence, it would be better to have more reviews/suggestions before I start on it. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org