guozhangwang commented on a change in pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#discussion_r476950994



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -483,12 +492,7 @@ private synchronized void resetStateAndRejoin() {
         // rebalance in the call to poll below. This ensures that we do not 
mistakenly attempt
         // to rejoin before the pending rebalance has completed.
         if (joinFuture == null) {
-            // fence off the heartbeat thread explicitly so that it cannot 
interfere with the join group.
-            // Note that this must come after the call to onJoinPrepare since 
we must be able to continue
-            // sending heartbeats if that callback takes some time.
-            disableHeartbeatThread();

Review comment:
       We do not need to explicitly disable heartbeat thread since when the 
state is transited to PREPARING_REBALANCE, the thread would disable itself in 
the next iteration.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -326,8 +331,9 @@ protected synchronized void pollHeartbeat(long now) {
     }
 
     protected synchronized long timeToNextHeartbeat(long now) {
-        // if we have not joined the group, we don't need to send heartbeats
-        if (state == MemberState.UNJOINED)
+        // if we have not joined the group or we are preparing rebalance,

Review comment:
       This is the major fix 2) in description.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -917,17 +938,14 @@ private synchronized void resetGeneration() {
     synchronized void resetGenerationOnResponseError(ApiKeys api, Errors 
error) {
         log.debug("Resetting generation after encountering {} from {} response 
and requesting re-join", error, api);
 
-        // only reset the state to un-joined when it is not already in 
rebalancing

Review comment:
       We do not need this check any more since when we are only resetting 
generation if we see illegal generation or unknown member id, and in either 
case we should no longer heartbeats

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -497,40 +501,18 @@ private synchronized void resetStateAndRejoin() {
             joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
                 @Override
                 public void onSuccess(ByteBuffer value) {
-                    // handle join completion in the callback so that the 
callback will be invoked

Review comment:
       Moved this into sync-group handler for readability.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
##########
@@ -1098,44 +1136,6 @@ public void 
testWakeupAfterJoinGroupReceivedExternalCompletion() throws Exceptio
         awaitFirstHeartbeat(heartbeatReceived);
     }
 
-    @Test
-    public void testWakeupAfterSyncGroupSent() throws Exception {

Review comment:
       This is now a redundant test.

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -639,7 +641,11 @@ class GroupCoordinator(val brokerId: Int,
               responseCallback(Errors.UNKNOWN_MEMBER_ID)
 
             case CompletingRebalance =>
-                responseCallback(Errors.REBALANCE_IN_PROGRESS)
+              // consumers may start sending heartbeat after join-group 
response, in which case
+              // we should treat them as normal hb request and reset the timer
+              val member = group.get(memberId)

Review comment:
       This is the only logical change as 3) in the description. All others are 
logging changes.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -1311,9 +1324,10 @@ public void run() {
                             continue;
                         }
 
-                        if (state != MemberState.STABLE) {
-                            // the group is not stable (perhaps because we 
left the group or because the coordinator
-                            // kicked us out), so disable heartbeats and wait 
for the main thread to rejoin.
+                        // we do not need to heartbeat we are not part of a 
group yet;
+                        // also if we already have fatal error, the client 
will be
+                        // crashed soon, hence we do not need to continue 
heartbeating either
+                        if (state.hasNotJoinedGroup() || hasFailed()) {

Review comment:
       This is the major fix 1).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to