ableegoldman commented on a change in pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#discussion_r477663328



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -528,7 +528,6 @@ public void onFailure(RuntimeException e) {
                 }
 
                 private void recordRebalanceFailure() {
-                    state = MemberState.UNJOINED;

Review comment:
       Did you mean to say `recordRebalanceFailure` or is this comment just out 
of date after the latest changes?

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -639,7 +641,11 @@ class GroupCoordinator(val brokerId: Int,
               responseCallback(Errors.UNKNOWN_MEMBER_ID)
 
             case CompletingRebalance =>
-                responseCallback(Errors.REBALANCE_IN_PROGRESS)
+              // consumers may start sending heartbeat after join-group 
response, in which case
+              // we should treat them as normal hb request and reset the timer
+              val member = group.get(memberId)

Review comment:
       Wait, so before this the coordinator wouldn't complete the current 
heartbeat? Doesn't that mean that heartbeating is pointless until the rebalance 
completes? Obviously that doesn't line up with my observations since members 
were clearly getting kicked from the group before the rebalance had completed, 
so I must be missing something here

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -497,40 +501,18 @@ private synchronized void resetStateAndRejoin() {
             joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
                 @Override
                 public void onSuccess(ByteBuffer value) {
-                    // handle join completion in the callback so that the 
callback will be invoked

Review comment:
       I assume you mean the JoinGroup response handler 🙂 

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -639,7 +641,11 @@ class GroupCoordinator(val brokerId: Int,
               responseCallback(Errors.UNKNOWN_MEMBER_ID)
 
             case CompletingRebalance =>
-                responseCallback(Errors.REBALANCE_IN_PROGRESS)

Review comment:
       Honestly, if we have to keep handling the `REBALANCE_IN_PROGRESS` error 
on the client side anyway, then maybe it's best to keep things simple and just 
continue to send this in the response. Otherwise it just seems like asking for 
trouble if we have to consider different possible responses depending on which 
version of the broker the client is talking to.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -446,14 +453,15 @@ boolean joinGroupIfNeeded(final Timer timer) {
                     resetJoinGroupFuture();
                     needsJoinPrepare = true;
                 } else {
-                    log.info("Generation data was cleared by heartbeat thread. 
Initiating rejoin.");
+                    log.info("Generation data was cleared by heartbeat thread 
to {} and state is now {} before " +
+                         "the rebalance callback is triggered, marking this 
rebalance as failed and retry",
+                         generation, state);
                     resetStateAndRejoin();
                     resetJoinGroupFuture();
-                    return false;
                 }
             } else {
                 final RuntimeException exception = future.exception();
-                log.info("Join group failed with {}", exception.toString());
+                log.info("Rebalance failed with {}", exception.toString());

Review comment:
       Can we still specify that we failed during the JoinGroup? eg `Rebalance 
failed on JoinGroup with {}` or something




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to