guozhangwang commented on a change in pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#discussion_r476065182



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -287,7 +287,7 @@ class GroupCoordinator(val brokerId: Int,
 
           group.currentState match {
             case PreparingRebalance =>
-              updateMemberAndRebalance(group, member, protocols, 
responseCallback)
+              updateMemberAndRebalance(group, member, protocols, s"Member 
${member.memberId} joining group during ${group.currentState}", 
responseCallback)

Review comment:
       Yes, it only contains logging changes. But I will make some non logging 
changes later and will mark it explicitly.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -528,7 +528,6 @@ public void onFailure(RuntimeException e) {
                 }
 
                 private void recordRebalanceFailure() {
-                    state = MemberState.UNJOINED;

Review comment:
       Yes I agree, I think we should just let the heartbeat thread access the 
state itself and then based on that decide whether or not to send heartbeats, I 
will update this logic.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -1069,6 +1069,13 @@ private HeartbeatResponseHandler(final Generation 
generation) {
         public void handle(HeartbeatResponse heartbeatResponse, 
RequestFuture<Void> future) {
             sensors.heartbeatSensor.record(response.requestLatencyMs());
             Errors error = heartbeatResponse.error();
+
+            if (state != MemberState.STABLE) {

Review comment:
       My thoughts were that, when we are in rebalancing then the purpose of 
heartbeat is only to keep the consumer alive at the broker side, not to take 
any instructions. But I think it should be handled case-by-case, I will try to 
refactor this piece a bit as well.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
##########
@@ -604,6 +605,25 @@ public void 
testSyncGroupIllegalGenerationResponseWithOldGeneration() throws Int
         assertEquals(newGen, coordinator.generation());
     }
 
+    @Test
+    public void testHeartbeatSentWhenRebalancing() throws Exception {
+        setupCoordinator();
+        joinGroup();
+
+        final AbstractCoordinator.Generation currGen = 
coordinator.generation();
+
+        coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);
+
+        // the heartbeat thread should be sent out during a rebalance
+        mockTime.sleep(HEARTBEAT_INTERVAL_MS);
+        TestUtils.waitForCondition(() -> !mockClient.requests().isEmpty(), 
2000,
+                "The heartbeat request was not sent");
+        assertTrue(coordinator.heartbeat().hasInflight());
+
+        mockClient.respond(heartbeatResponse(Errors.REBALANCE_IN_PROGRESS));

Review comment:
       Actually we do not need to :)

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -528,7 +528,6 @@ public void onFailure(RuntimeException e) {
                 }
 
                 private void recordRebalanceFailure() {

Review comment:
       Yup, I will just inline this then.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to