guozhangwang commented on a change in pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#discussion_r476065182

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -287,7 +287,7 @@ class GroupCoordinator(val brokerId: Int,
           group.currentState match {
             case PreparingRebalance =>
-              updateMemberAndRebalance(group, member, protocols, responseCallback)
+              updateMemberAndRebalance(group, member, protocols, s"Member ${member.memberId} joining group during ${group.currentState}", responseCallback)

Review comment:
       Yes, it only contains logging changes. I will make some non-logging changes later and will mark them explicitly.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -528,7 +528,6 @@ public void onFailure(RuntimeException e) {
     }

     private void recordRebalanceFailure() {
-        state = MemberState.UNJOINED;

Review comment:
       Yes, I agree. I think we should just let the heartbeat thread read the state itself and decide, based on that, whether or not to send heartbeats. I will update this logic.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -1069,6 +1069,13 @@ private HeartbeatResponseHandler(final Generation generation) {
         public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
             sensors.heartbeatSensor.record(response.requestLatencyMs());
             Errors error = heartbeatResponse.error();
+
+            if (state != MemberState.STABLE) {

Review comment:
       My thinking was that, while we are rebalancing, the purpose of the heartbeat is only to keep the consumer alive on the broker side, not to take any instructions. But I think it should be handled case by case; I will try to refactor this piece a bit as well.
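
To make the two comments above concrete, here is a minimal, self-contained sketch of the idea: the heartbeat thread consults the member state to decide whether to send, and a REBALANCE_IN_PROGRESS response is treated as keep-alive only while the member is not stable. This is an illustration under stated assumptions, not the code in this PR. The class `HeartbeatSketch`, the `Action` and `HeartbeatError` enums, and both method names are hypothetical; only the state names UNJOINED, REBALANCING and STABLE are taken from the diffs.

```java
// Hypothetical sketch; none of these types are Kafka classes.
public class HeartbeatSketch {

    // State names mirror the ones visible in the diffs; the real enum may differ.
    public enum MemberState { UNJOINED, REBALANCING, STABLE }

    // Assumed subset of heartbeat response errors, for illustration only.
    public enum HeartbeatError { NONE, REBALANCE_IN_PROGRESS }

    // Hypothetical outcomes of handling a heartbeat response.
    public enum Action { CONTINUE, IGNORE, REQUEST_REJOIN }

    // The heartbeat thread reads the state itself: it keeps sending while the
    // member is in the group or is rebalancing, and stops once it is unjoined.
    public static boolean shouldSendHeartbeat(MemberState state) {
        return state != MemberState.UNJOINED;
    }

    // While the member is not stable, the heartbeat only keeps it alive on the
    // broker side, so a REBALANCE_IN_PROGRESS response carries no new instruction.
    public static Action onHeartbeatResponse(MemberState state, HeartbeatError error) {
        if (error == HeartbeatError.NONE) {
            return Action.CONTINUE;
        }
        if (state != MemberState.STABLE) {
            return Action.IGNORE;
        }
        return Action.REQUEST_REJOIN;
    }

    public static void main(String[] args) {
        for (MemberState state : MemberState.values()) {
            System.out.println(state + ": send=" + shouldSendHeartbeat(state)
                + ", onRebalanceInProgress="
                + onHeartbeatResponse(state, HeartbeatError.REBALANCE_IN_PROGRESS));
        }
    }
}
```

Only one error code is modelled here. As noted in the comment, other errors would likely need case-by-case handling rather than a blanket check on the state.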

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
##########
@@ -604,6 +605,25 @@ public void testSyncGroupIllegalGenerationResponseWithOldGeneration() throws Int
         assertEquals(newGen, coordinator.generation());
     }

+    @Test
+    public void testHeartbeatSentWhenRebalancing() throws Exception {
+        setupCoordinator();
+        joinGroup();
+
+        final AbstractCoordinator.Generation currGen = coordinator.generation();
+
+        coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);
+
+        // the heartbeat thread should be sent out during a rebalance
+        mockTime.sleep(HEARTBEAT_INTERVAL_MS);
+        TestUtils.waitForCondition(() -> !mockClient.requests().isEmpty(), 2000,
+            "The heartbeat request was not sent");
+        assertTrue(coordinator.heartbeat().hasInflight());
+
+        mockClient.respond(heartbeatResponse(Errors.REBALANCE_IN_PROGRESS));

Review comment:
       Actually we do not need to :)

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -528,7 +528,6 @@ public void onFailure(RuntimeException e) {
     }

     private void recordRebalanceFailure() {

Review comment:
       Yup, I will just inline this then.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org