guozhangwang commented on a change in pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#discussion_r476950994

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -483,12 +492,7 @@ private synchronized void resetStateAndRejoin() {
  // rebalance in the call to poll below. This ensures that we do not mistakenly attempt
  // to rejoin before the pending rebalance has completed.
  if (joinFuture == null) {
- // fence off the heartbeat thread explicitly so that it cannot interfere with the join group.
- // Note that this must come after the call to onJoinPrepare since we must be able to continue
- // sending heartbeats if that callback takes some time.
- disableHeartbeatThread();

Review comment:
We do not need to disable the heartbeat thread explicitly: once the state transitions to PREPARING_REBALANCE, the thread disables itself on its next iteration.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -326,8 +331,9 @@ protected synchronized void pollHeartbeat(long now) {
  }
  protected synchronized long timeToNextHeartbeat(long now) {
- // if we have not joined the group, we don't need to send heartbeats
- if (state == MemberState.UNJOINED)
+ // if we have not joined the group or we are preparing rebalance,

Review comment:
This is major fix 2) in the description.
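
To make fix 2) concrete, here is a minimal sketch assuming a simplified four-state member model. `UNJOINED`, `PREPARING_REBALANCE`, `STABLE`, `timeToNextHeartbeat`, and `hasNotJoinedGroup()` appear in the diffs in this thread; the `COMPLETING_REBALANCE` state and the `HeartbeatTimer` class with its `transitionTo` and `recordHeartbeat` methods are hypothetical, and this is not Kafka's actual implementation. In the sketch, a member that has not joined reports that no heartbeat is ever due (`Long.MAX_VALUE`), so a caller that bounds its wait by this value can block instead of waking up for a heartbeat it would not send.

```java
// Hypothetical, simplified model of the consumer member states; the names
// mirror this review thread, but this is not Kafka's implementation.
enum MemberState {
    UNJOINED,             // not part of any group
    PREPARING_REBALANCE,  // JoinGroup sent, response not yet received
    COMPLETING_REBALANCE, // assumed state: JoinGroup answered, SyncGroup in flight
    STABLE;               // SyncGroup answered, assignment received

    // A member has not joined until its JoinGroup response arrives.
    boolean hasNotJoinedGroup() {
        return this == UNJOINED || this == PREPARING_REBALANCE;
    }
}

// Hypothetical helper that tracks when the next heartbeat is due.
class HeartbeatTimer {
    private final long intervalMs;
    private long lastHeartbeatMs;
    private MemberState state = MemberState.UNJOINED;

    HeartbeatTimer(long intervalMs, long nowMs) {
        this.intervalMs = intervalMs;
        this.lastHeartbeatMs = nowMs;
    }

    synchronized void transitionTo(MemberState newState) {
        state = newState;
    }

    synchronized void recordHeartbeat(long nowMs) {
        lastHeartbeatMs = nowMs;
    }

    // Fix 2) in sketch form: before joining (including while preparing a
    // rebalance) no heartbeat is ever due; otherwise report the time left
    // until the next one, or 0 if it is already overdue.
    synchronized long timeToNextHeartbeat(long nowMs) {
        if (state.hasNotJoinedGroup())
            return Long.MAX_VALUE;
        return Math.max(0L, lastHeartbeatMs + intervalMs - nowMs);
    }
}
```

For example, with a 3000 ms interval started at time 0, the timer reports `Long.MAX_VALUE` while the member is `PREPARING_REBALANCE`, and 2000 ms at time 1000 once it reaches `COMPLETING_REBALANCE`.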

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -917,17 +938,14 @@ private synchronized void resetGeneration() {
  synchronized void resetGenerationOnResponseError(ApiKeys api, Errors error) {
  log.debug("Resetting generation after encountering {} from {} response and requesting re-join", error, api);
- // only reset the state to un-joined when it is not already in rebalancing

Review comment:
We no longer need this check: we now reset the generation only when we see an illegal generation or unknown member id error, and in either case we should stop sending heartbeats.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -497,40 +501,18 @@ private synchronized void resetStateAndRejoin() {
  joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
  @Override
  public void onSuccess(ByteBuffer value) {
- // handle join completion in the callback so that the callback will be invoked

Review comment:
Moved this into the sync-group handler for readability.

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
##########
@@ -1098,44 +1136,6 @@ public void testWakeupAfterJoinGroupReceivedExternalCompletion() throws Exceptio
  awaitFirstHeartbeat(heartbeatReceived);
  }
- @Test
- public void testWakeupAfterSyncGroupSent() throws Exception {

Review comment:
This test is now redundant.
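
The reasoning behind dropping the "not already in rebalancing" check can be sketched as follows, assuming a simplified tracker. `ILLEGAL_GENERATION`, `UNKNOWN_MEMBER_ID`, and `REBALANCE_IN_PROGRESS` are real Kafka error codes, but `GenerationTracker`, its nested enums, the sentinel values, and the choice to only request a rejoin on other errors are hypothetical, for illustration only. Because only the two errors meaning "this generation is no longer valid" reset the generation, the reset can move the member to `UNJOINED` unconditionally, which stops heartbeats whatever state it was in.

```java
// Hypothetical sketch, not Kafka's AbstractCoordinator.
final class GenerationTracker {
    enum State { UNJOINED, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE }
    enum ErrorCode { ILLEGAL_GENERATION, UNKNOWN_MEMBER_ID, REBALANCE_IN_PROGRESS }

    // Assumed sentinels for "no generation" and "no member id".
    static final int NO_GENERATION = -1;
    static final String NO_MEMBER_ID = "";

    private int generationId;
    private String memberId;
    private State state;
    private boolean rejoinNeeded = false;

    GenerationTracker(int generationId, String memberId, State state) {
        this.generationId = generationId;
        this.memberId = memberId;
        this.state = state;
    }

    synchronized void onResponseError(ErrorCode error) {
        switch (error) {
            case ILLEGAL_GENERATION:
            case UNKNOWN_MEMBER_ID:
                // Our generation is no longer valid: reset it and go back to
                // UNJOINED whatever state we were in; no "already rebalancing" check.
                generationId = NO_GENERATION;
                memberId = NO_MEMBER_ID;
                state = State.UNJOINED;
                rejoinNeeded = true;
                break;
            default:
                // Sketch assumption: other errors keep the generation and only ask to rejoin.
                rejoinNeeded = true;
        }
    }

    // Heartbeats only make sense once the JoinGroup response has arrived.
    synchronized boolean shouldHeartbeat() {
        return state == State.COMPLETING_REBALANCE || state == State.STABLE;
    }

    synchronized int generationId() { return generationId; }
    synchronized String memberId() { return memberId; }
    synchronized boolean rejoinNeeded() { return rejoinNeeded; }
}
```

A member that is mid-rebalance (`COMPLETING_REBALANCE`) and receives `UNKNOWN_MEMBER_ID` ends up with no generation, no member id, and no heartbeats, exactly as a `STABLE` member would.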

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -639,7 +641,11 @@ class GroupCoordinator(val brokerId: Int,
  responseCallback(Errors.UNKNOWN_MEMBER_ID)
  case CompletingRebalance =>
- responseCallback(Errors.REBALANCE_IN_PROGRESS)
+ // consumers may start sending heartbeat after join-group response, in which case
+ // we should treat them as normal hb request and reset the timer
+ val member = group.get(memberId)

Review comment:
This is the only logical change, 3) in the description. All the other changes are logging changes.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -1311,9 +1324,10 @@ public void run() {
  continue;
  }
- if (state != MemberState.STABLE) {
- // the group is not stable (perhaps because we left the group or because the coordinator
- // kicked us out), so disable heartbeats and wait for the main thread to rejoin.
+ // we do not need to heartbeat we are not part of a group yet;
+ // also if we already have fatal error, the client will be
+ // crashed soon, hence we do not need to continue heartbeating either
+ if (state.hasNotJoinedGroup() || hasFailed()) {

Review comment:
This is major fix 1).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
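
Fix 1) changes the guard at the top of the heartbeat thread's loop from `state != MemberState.STABLE` to `state.hasNotJoinedGroup() || hasFailed()`. The sketch below contrasts the two guards on a simplified, hypothetical four-state model; the `COMPLETING_REBALANCE` state and the `HeartbeatLoopSketch` class are assumptions, while `hasNotJoinedGroup()` and `hasFailed()` appear in the diff (here the failure flag is passed in as a plain boolean). Under the old guard the thread stopped heartbeating in every state except `STABLE`; under the new one it keeps heartbeating after the JoinGroup response, which is the case change 3) makes the broker treat as a normal heartbeat that resets the member's timer instead of answering `REBALANCE_IN_PROGRESS`.

```java
// Hypothetical sketch contrasting the old and new heartbeat-thread guards;
// simplified, not Kafka's HeartbeatThread.
final class HeartbeatLoopSketch {
    enum State {
        UNJOINED, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE;

        boolean hasNotJoinedGroup() {
            return this == UNJOINED || this == PREPARING_REBALANCE;
        }
    }

    // Old guard: anything other than STABLE disabled heartbeats, so the
    // thread went quiet while the SyncGroup round trip was in flight.
    static boolean shouldHeartbeatBefore(State state) {
        return state == State.STABLE;
    }

    // New guard (fix 1): skip only if we have not joined yet, or if a fatal
    // error was already recorded and the client is about to fail anyway.
    static boolean shouldHeartbeatAfter(State state, boolean hasFailed) {
        return !(state.hasNotJoinedGroup() || hasFailed);
    }
}
```

The two guards differ only for `COMPLETING_REBALANCE` (old: no heartbeat, new: heartbeat) and for a `STABLE` member that has already failed (old: heartbeat, new: skip); `UNJOINED` and `PREPARING_REBALANCE` skip heartbeats under both.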