ableegoldman commented on a change in pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#discussion_r477663328

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -528,7 +528,6 @@ public void onFailure(RuntimeException e) {
     }
     private void recordRebalanceFailure() {
-        state = MemberState.UNJOINED;

Review comment:
       Did you mean to say `recordRebalanceFailure` or is this comment just out of date after the latest changes?

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -639,7 +641,11 @@ class GroupCoordinator(val brokerId: Int,
             responseCallback(Errors.UNKNOWN_MEMBER_ID)
           case CompletingRebalance =>
-            responseCallback(Errors.REBALANCE_IN_PROGRESS)
+            // consumers may start sending heartbeat after join-group response, in which case
+            // we should treat them as normal hb request and reset the timer
+            val member = group.get(memberId)

Review comment:
       Wait, so before this the coordinator wouldn't complete the current heartbeat? Doesn't that mean that heartbeating is pointless until the rebalance completes?

       Obviously that doesn't line up with my observations since members were clearly getting kicked from the group before the rebalance had completed, so I must be missing something here

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -497,40 +501,18 @@ private synchronized void resetStateAndRejoin() {
             joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
                 @Override
                 public void onSuccess(ByteBuffer value) {
-                    // handle join completion in the callback so that the callback will be invoked

Review comment:
       I assume you mean the JoinGroup response handler 🙂

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -639,7 +641,11 @@ class GroupCoordinator(val brokerId: Int,
             responseCallback(Errors.UNKNOWN_MEMBER_ID)
           case CompletingRebalance =>
-            responseCallback(Errors.REBALANCE_IN_PROGRESS)

Review comment:
       Honestly, if we have to keep handling the `REBALANCE_IN_PROGRESS` error on the client side anyway, then maybe it's best to keep things simple and just continue to send this in the response. Otherwise it just seems like asking for trouble if we have to consider different possible responses depending on which version of the broker the client is talking to.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -446,14 +453,15 @@ boolean joinGroupIfNeeded(final Timer timer) {
                     resetJoinGroupFuture();
                     needsJoinPrepare = true;
                 } else {
-                    log.info("Generation data was cleared by heartbeat thread. Initiating rejoin.");
+                    log.info("Generation data was cleared by heartbeat thread to {} and state is now {} before " +
+                        "the rebalance callback is triggered, marking this rebalance as failed and retry",
+                        generation, state);
                     resetStateAndRejoin();
                     resetJoinGroupFuture();
-                    return false;
                 }
             } else {
                 final RuntimeException exception = future.exception();
-                log.info("Join group failed with {}", exception.toString());
+                log.info("Rebalance failed with {}", exception.toString());

Review comment:
       Can we still specify that we failed during the JoinGroup? eg `Rebalance failed on JoinGroup with {}` or something

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org