This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 16ec179  KAFKA-10134: Enable heartbeat during PrepareRebalance and 
Depend On State For Poll Timeout (#8834)
16ec179 is described below

commit 16ec1793d53700623c9cb43e711f585aafd44dd4
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Thu Sep 10 14:34:38 2020 -0700

    KAFKA-10134: Enable heartbeat during PrepareRebalance and Depend On State 
For Poll Timeout (#8834)
    
    1. Split the consumer coordinator's REBALANCING state into 
PREPARING_REBALANCE and COMPLETING_REBALANCE. The first is when the join group 
request is sent, and the second is after the join group response is received. 
During the first state we should still not send hb since it shares the same 
socket with the join group request and the group coordinator has disabled 
timeout, however when we transit to the second state we should start sending hb 
in case leader's assign takes long time. Th [...]
    
    2. When deciding coordinator#timeToNextPoll, do not count in 
timeToNextHeartbeat if the state is in UNJOINED or PREPARING_REBALANCE since we 
would disable hb and hence its timer would not be updated.
    
    3. On the broker side, allow hb received during PREPARING_REBALANCE, return 
NONE error code instead of REBALANCE_IN_PROGRESS. However on client side, we 
still need to ignore REBALANCE_IN_PROGRESS if state is COMPLETING_REBALANCE in 
case it is talking to an old versioned broker.
    
    4. Piggy-backing a log4j improvement on the broker coordinator for 
triggering rebalance reason, as I found it a bit blurred during the 
investigation. Also subsumed #9038 with log4j improvements.
    
    The tricky part for allowing hb during COMPLETING_REBALANCE is in two 
parts: 1) before the sync-group response is received, a hb response may have 
reset the generation; also after the sync-group response but before the 
callback is triggered, a hb response can still reset the generation, we need to 
handle both cases by checking the generation / state. 2) with the hb thread 
enabled, the sync-group request may be sent by the hb thread even if the caller 
thread did not call poll yet.
    
    Reviewers: A. Sophie Blee-Goldman <sop...@confluent.io>, Boyang Chen 
<boy...@confluent.io>, John Roesler <j...@confluent.io>
---
 .../kafka/clients/consumer/KafkaConsumer.java      |  15 +-
 .../consumer/internals/AbstractCoordinator.java    | 205 +++++++++++----------
 .../consumer/internals/ConsumerCoordinator.java    |  19 +-
 .../clients/consumer/internals/Heartbeat.java      |  13 ++
 .../java/org/apache/kafka/clients/MockClient.java  |   8 +
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  13 +-
 .../internals/AbstractCoordinatorTest.java         |  94 +++++-----
 .../internals/ConsumerCoordinatorTest.java         |  16 +-
 .../kafka/coordinator/group/GroupCoordinator.scala |  23 ++-
 .../coordinator/group/GroupCoordinatorTest.scala   |   2 +-
 10 files changed, 228 insertions(+), 180 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 33a2fbb..3a7339d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1257,13 +1257,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
         }
     }
 
-    /**
-     * Visible for testing
-     */
-    boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
-        return updateAssignmentMetadataIfNeeded(timer, true);
-    }
-
     boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean 
waitForJoinGroup) {
         if (coordinator != null && !coordinator.poll(timer, waitForJoinGroup)) 
{
             return false;
@@ -1297,6 +1290,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
             pollTimeout = retryBackoffMs;
         }
 
+        log.trace("Polling for fetches with timeout {}", pollTimeout);
+
         Timer pollTimer = time.timer(pollTimeout);
         client.poll(pollTimer, () -> {
             // since a fetch might be completed by the background thread, we 
need this poll condition
@@ -2478,8 +2473,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
             offsetAndMetadata.leaderEpoch().ifPresent(epoch -> 
metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
     }
 
-    // Visible for testing
+    // Functions below are for testing only
     String getClientId() {
         return clientId;
     }
+
+    boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
+        return updateAssignmentMetadataIfNeeded(timer, true);
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 619146e..a565c21 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -113,18 +113,23 @@ public abstract class AbstractCoordinator implements 
Closeable {
     public static final int JOIN_GROUP_TIMEOUT_LAPSE = 5000;
 
     protected enum MemberState {
-        UNJOINED,    // the client is not part of a group
-        REBALANCING, // the client has begun rebalancing
-        STABLE,      // the client has joined and is sending heartbeats
+        UNJOINED,             // the client is not part of a group
+        PREPARING_REBALANCE,  // the client has sent the join group request, 
but have not received response
+        COMPLETING_REBALANCE, // the client has received join group response, 
but have not received assignment
+        STABLE;               // the client has joined and is sending 
heartbeats
+
+        public boolean hasNotJoinedGroup() {
+            return equals(UNJOINED) || equals(PREPARING_REBALANCE);
+        }
     }
 
     private final Logger log;
-    private final GroupCoordinatorMetrics sensors;
     private final Heartbeat heartbeat;
+    private final GroupCoordinatorMetrics sensors;
     private final GroupRebalanceConfig rebalanceConfig;
-    protected final ConsumerNetworkClient client;
+
     protected final Time time;
-    protected MemberState state = MemberState.UNJOINED;
+    protected final ConsumerNetworkClient client;
 
     private Node coordinator = null;
     private boolean rejoinNeeded = true;
@@ -137,6 +142,8 @@ public abstract class AbstractCoordinator implements 
Closeable {
     private long lastRebalanceStartMs = -1L;
     private long lastRebalanceEndMs = -1L;
 
+    protected MemberState state = MemberState.UNJOINED;
+
 
     /**
      * Initialize the coordination manager.
@@ -326,8 +333,9 @@ public abstract class AbstractCoordinator implements 
Closeable {
     }
 
     protected synchronized long timeToNextHeartbeat(long now) {
-        // if we have not joined the group, we don't need to send heartbeats
-        if (state == MemberState.UNJOINED)
+        // if we have not joined the group or we are preparing rebalance,
+        // we don't need to send heartbeats
+        if (state.hasNotJoinedGroup())
             return Long.MAX_VALUE;
         return heartbeat.timeToNextHeartbeat(now);
     }
@@ -366,13 +374,8 @@ public abstract class AbstractCoordinator implements 
Closeable {
         }
     }
 
-    private synchronized void disableHeartbeatThread() {
-        if (heartbeatThread != null)
-            heartbeatThread.disable();
-    }
-
     private void closeHeartbeatThread() {
-        HeartbeatThread thread = null;
+        HeartbeatThread thread;
         synchronized (this) {
             if (heartbeatThread == null)
                 return;
@@ -391,6 +394,12 @@ public abstract class AbstractCoordinator implements 
Closeable {
     /**
      * Joins the group without starting the heartbeat thread.
      *
+     * If this function returns true, the state must always be in STABLE and 
heartbeat enabled.
+     * If this function returns false, the state can be in one of the 
following:
+     *  * UNJOINED: got error response but times out before being able to 
re-join, heartbeat disabled
+     *  * PREPARING_REBALANCE: not yet received join-group response before 
timeout, heartbeat disabled
+     *  * COMPLETING_REBALANCE: not yet received sync-group response before 
timeout, hearbeat enabled
+     *
      * Visible for testing.
      *
      * @param timer Timer bounding how long this method can block
@@ -424,16 +433,18 @@ public abstract class AbstractCoordinator implements 
Closeable {
 
             if (future.succeeded()) {
                 Generation generationSnapshot;
+                MemberState stateSnapshot;
 
                 // Generation data maybe concurrently cleared by Heartbeat 
thread.
                 // Can't use synchronized for {@code onJoinComplete}, because 
it can be long enough
-                // and  shouldn't block heartbeat thread.
+                // and shouldn't block heartbeat thread.
                 // See {@link 
PlaintextConsumerTest#testMaxPollIntervalMsDelayInAssignment}
                 synchronized (AbstractCoordinator.this) {
                     generationSnapshot = this.generation;
+                    stateSnapshot = this.state;
                 }
 
-                if (generationSnapshot != Generation.NO_GENERATION) {
+                if (generationSnapshot != Generation.NO_GENERATION && 
stateSnapshot == MemberState.STABLE) {
                     // Duplicate the buffer in case `onJoinComplete` does not 
complete and needs to be retried.
                     ByteBuffer memberAssignment = future.value().duplicate();
 
@@ -446,14 +457,15 @@ public abstract class AbstractCoordinator implements 
Closeable {
                     resetJoinGroupFuture();
                     needsJoinPrepare = true;
                 } else {
-                    log.info("Generation data was cleared by heartbeat thread. 
Initiating rejoin.");
+                    log.info("Generation data was cleared by heartbeat thread 
to {} and state is now {} before " +
+                         "the rebalance callback is triggered, marking this 
rebalance as failed and retry",
+                         generationSnapshot, stateSnapshot);
                     resetStateAndRejoin();
                     resetJoinGroupFuture();
-                    return false;
                 }
             } else {
                 final RuntimeException exception = future.exception();
-                log.info("Join group failed with {}", exception.toString());
+                log.info("Rebalance failed.", exception);
                 resetJoinGroupFuture();
                 if (exception instanceof UnknownMemberIdException ||
                     exception instanceof RebalanceInProgressException ||
@@ -463,6 +475,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
                 else if (!future.isRetriable())
                     throw exception;
 
+                resetStateAndRejoin();
                 timer.sleep(rebalanceConfig.retryBackoffMs);
             }
         }
@@ -473,22 +486,12 @@ public abstract class AbstractCoordinator implements 
Closeable {
         this.joinFuture = null;
     }
 
-    private synchronized void resetStateAndRejoin() {
-        rejoinNeeded = true;
-        state = MemberState.UNJOINED;
-    }
-
     private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
         // we store the join future in case we are woken up by the user after 
beginning the
         // rebalance in the call to poll below. This ensures that we do not 
mistakenly attempt
         // to rejoin before the pending rebalance has completed.
         if (joinFuture == null) {
-            // fence off the heartbeat thread explicitly so that it cannot 
interfere with the join group.
-            // Note that this must come after the call to onJoinPrepare since 
we must be able to continue
-            // sending heartbeats if that callback takes some time.
-            disableHeartbeatThread();
-
-            state = MemberState.REBALANCING;
+            state = MemberState.PREPARING_REBALANCE;
             // a rebalance can be triggered consecutively if the previous one 
failed,
             // in this case we would not update the start time.
             if (lastRebalanceStartMs == -1L)
@@ -497,40 +500,18 @@ public abstract class AbstractCoordinator implements 
Closeable {
             joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
                 @Override
                 public void onSuccess(ByteBuffer value) {
-                    // handle join completion in the callback so that the 
callback will be invoked
-                    // even if the consumer is woken up before finishing the 
rebalance
-                    synchronized (AbstractCoordinator.this) {
-                        if (generation != Generation.NO_GENERATION) {
-                            log.info("Successfully joined group with 
generation {}", generation.generationId);
-                            state = MemberState.STABLE;
-                            rejoinNeeded = false;
-                            // record rebalance latency
-                            lastRebalanceEndMs = time.milliseconds();
-                            
sensors.successfulRebalanceSensor.record(lastRebalanceEndMs - 
lastRebalanceStartMs);
-                            lastRebalanceStartMs = -1L;
-
-                            if (heartbeatThread != null)
-                                heartbeatThread.enable();
-                        } else {
-                            log.info("Generation data was cleared by heartbeat 
thread. Rejoin failed.");
-                            recordRebalanceFailure();
-                        }
-                    }
+                    // do nothing since all the handler logic are in 
SyncGroupResponseHandler already
                 }
 
                 @Override
                 public void onFailure(RuntimeException e) {
                     // we handle failures below after the request finishes. if 
the join completes
-                    // after having been woken up, the exception is ignored 
and we will rejoin
+                    // after having been woken up, the exception is ignored 
and we will rejoin;
+                    // this can be triggered when either join or sync request 
failed
                     synchronized (AbstractCoordinator.this) {
-                        recordRebalanceFailure();
+                        sensors.failedRebalanceSensor.record();
                     }
                 }
-
-                private void recordRebalanceFailure() {
-                    state = MemberState.UNJOINED;
-                    sensors.failedRebalanceSensor.record();
-                }
             });
         }
         return joinFuture;
@@ -590,14 +571,24 @@ public abstract class AbstractCoordinator implements 
Closeable {
                     sensors.joinSensor.record(response.requestLatencyMs());
 
                     synchronized (AbstractCoordinator.this) {
-                        if (state != MemberState.REBALANCING) {
+                        if (state != MemberState.PREPARING_REBALANCE) {
                             // if the consumer was woken up before a rebalance 
completes, we may have already left
                             // the group. In this case, we do not want to 
continue with the sync group.
                             future.raise(new UnjoinedGroupException());
                         } else {
+                            state = MemberState.COMPLETING_REBALANCE;
+
+                            // we only need to enable heartbeat thread 
whenever we transit to
+                            // COMPLETING_REBALANCE state since we always 
transit from this state to STABLE
+                            if (heartbeatThread != null)
+                                heartbeatThread.enable();
+
                             AbstractCoordinator.this.generation = new 
Generation(
                                 joinResponse.data().generationId(),
                                 joinResponse.data().memberId(), 
joinResponse.data().protocolName());
+
+                            log.info("Successfully joined group with 
generation {}", AbstractCoordinator.this.generation);
+
                             if (joinResponse.isLeader()) {
                                 onJoinLeader(joinResponse).chain(future);
                             } else {
@@ -652,10 +643,10 @@ public abstract class AbstractCoordinator implements 
Closeable {
             } else if (error == Errors.MEMBER_ID_REQUIRED) {
                 // Broker requires a concrete member id to be allowed to join 
the group. Update member id
                 // and send another join group request in next cycle.
+                String memberId = joinResponse.data().memberId();
+                log.debug("Attempt to join group returned {} error. Will set 
the member id as {} and then rejoin", error, memberId);
                 synchronized (AbstractCoordinator.this) {
-                    AbstractCoordinator.this.generation = new 
Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID,
-                            joinResponse.data().memberId(), null);
-                    AbstractCoordinator.this.resetStateAndRejoin();
+                    AbstractCoordinator.this.generation = new 
Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID, memberId, null);
                 }
                 future.raise(error);
             } else {
@@ -736,12 +727,41 @@ public abstract class AbstractCoordinator implements 
Closeable {
                     log.error("SyncGroup failed due to inconsistent Protocol 
Type, received {} but expected {}",
                         syncResponse.data.protocolType(), protocolType());
                     future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
-                } else if (isProtocolNameInconsistent(ApiKeys.SYNC_GROUP, 
syncResponse.data.protocolName())) {
-                    future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
                 } else {
                     log.debug("Received successful SyncGroup response: {}", 
syncResponse);
                     sensors.syncSensor.record(response.requestLatencyMs());
-                    
future.complete(ByteBuffer.wrap(syncResponse.data.assignment()));
+
+                    synchronized (AbstractCoordinator.this) {
+                        if (generation != Generation.NO_GENERATION && state == 
MemberState.COMPLETING_REBALANCE) {
+                            // check protocol name only if the generation is 
not reset
+                            final String protocolName = 
syncResponse.data.protocolName();
+                            final boolean protocolNameInconsistent = 
protocolName != null &&
+                                !protocolName.equals(generation.protocolName);
+
+                            if (protocolNameInconsistent) {
+                                log.error("SyncGroup failed due to 
inconsistent Protocol Name, received {} but expected {}",
+                                    protocolName, generation.protocolName);
+
+                                
future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
+                            } else {
+                                log.info("Successfully synced group in 
generation {}", generation);
+                                state = MemberState.STABLE;
+                                rejoinNeeded = false;
+                                // record rebalance latency
+                                lastRebalanceEndMs = time.milliseconds();
+                                
sensors.successfulRebalanceSensor.record(lastRebalanceEndMs - 
lastRebalanceStartMs);
+                                lastRebalanceStartMs = -1L;
+
+                                
future.complete(ByteBuffer.wrap(syncResponse.data.assignment()));
+                            }
+                        } else {
+                            log.info("Generation data was cleared by heartbeat 
thread as {} and state is now {} before " +
+                                "received SyncGroup response, marking this 
rebalance as failed and retry",
+                                generation, state);
+                            // use ILLEGAL_GENERATION error code to let it 
retry immediately
+                            future.raise(Errors.ILLEGAL_GENERATION);
+                        }
+                    }
                 }
             } else {
                 requestRejoin();
@@ -902,35 +922,33 @@ public abstract class AbstractCoordinator implements 
Closeable {
     }
 
     protected synchronized boolean rebalanceInProgress() {
-        return this.state == MemberState.REBALANCING;
+        return this.state == MemberState.PREPARING_REBALANCE || this.state == 
MemberState.COMPLETING_REBALANCE;
     }
 
     protected synchronized String memberId() {
         return generation.memberId;
     }
 
-    private synchronized void resetGeneration() {
-        rejoinNeeded = true;
+    private synchronized void resetState() {
+        state = MemberState.UNJOINED;
         generation = Generation.NO_GENERATION;
     }
 
+    private synchronized void resetStateAndRejoin() {
+        resetState();
+        rejoinNeeded = true;
+    }
+
     synchronized void resetGenerationOnResponseError(ApiKeys api, Errors 
error) {
         log.debug("Resetting generation after encountering {} from {} response 
and requesting re-join", error, api);
 
-        // only reset the state to un-joined when it is not already in 
rebalancing
-        if (state != MemberState.REBALANCING)
-            state = MemberState.UNJOINED;
-
-        resetGeneration();
+        resetState();
     }
 
     synchronized void resetGenerationOnLeaveGroup() {
         log.debug("Resetting generation due to consumer pro-actively leaving 
the group");
 
-        // always set the state to un-joined
-        state = MemberState.UNJOINED;
-
-        resetGeneration();
+        resetStateAndRejoin();
     }
 
     public synchronized void requestRejoin() {
@@ -941,19 +959,6 @@ public abstract class AbstractCoordinator implements 
Closeable {
         return protocolType != null && !protocolType.equals(protocolType());
     }
 
-    private boolean isProtocolNameInconsistent(ApiKeys key, String 
protocolName) {
-        final Generation currentGeneration = generation();
-        final boolean protocolNameInconsistent = protocolName != null &&
-            currentGeneration != Generation.NO_GENERATION &&
-            !protocolName.equals(currentGeneration.protocolName);
-
-        if (protocolNameInconsistent) {
-            log.error("{} failed due to inconsistent Protocol Name, received 
{} but expected {}",
-                key, protocolName, currentGeneration.protocolName);
-        }
-        return protocolNameInconsistent;
-    }
-
     /**
      * Close the coordinator, waiting if needed to send LeaveGroup.
      */
@@ -1069,6 +1074,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
         public void handle(HeartbeatResponse heartbeatResponse, 
RequestFuture<Void> future) {
             sensors.heartbeatSensor.record(response.requestLatencyMs());
             Errors error = heartbeatResponse.error();
+
             if (error == Errors.NONE) {
                 log.debug("Received successful Heartbeat response");
                 future.complete(null);
@@ -1079,9 +1085,16 @@ public abstract class AbstractCoordinator implements 
Closeable {
                 markCoordinatorUnknown();
                 future.raise(error);
             } else if (error == Errors.REBALANCE_IN_PROGRESS) {
-                log.info("Attempt to heartbeat failed since group is 
rebalancing");
-                requestRejoin();
-                future.raise(error);
+                // since we may be sending the request during rebalance, we 
should check
+                // this case and ignore the REBALANCE_IN_PROGRESS error
+                if (state == MemberState.STABLE) {
+                    log.info("Attempt to heartbeat failed since group is 
rebalancing");
+                    requestRejoin();
+                    future.raise(error);
+                } else {
+                    log.debug("Ignoring heartbeat response with error {} 
during {} state", error, state);
+                    future.complete(null);
+                }
             } else if (error == Errors.ILLEGAL_GENERATION ||
                        error == Errors.UNKNOWN_MEMBER_ID ||
                        error == Errors.FENCED_INSTANCE_ID) {
@@ -1260,7 +1273,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
     private class HeartbeatThread extends KafkaThread implements AutoCloseable 
{
         private boolean enabled = false;
         private boolean closed = false;
-        private AtomicReference<RuntimeException> failed = new 
AtomicReference<>(null);
+        private final AtomicReference<RuntimeException> failed = new 
AtomicReference<>(null);
 
         private HeartbeatThread() {
             super(HEARTBEAT_THREAD_PREFIX + (rebalanceConfig.groupId.isEmpty() 
? "" : " | " + rebalanceConfig.groupId), true);
@@ -1311,9 +1324,10 @@ public abstract class AbstractCoordinator implements 
Closeable {
                             continue;
                         }
 
-                        if (state != MemberState.STABLE) {
-                            // the group is not stable (perhaps because we 
left the group or because the coordinator
-                            // kicked us out), so disable heartbeats and wait 
for the main thread to rejoin.
+                        // we do not need to heartbeat we are not part of a 
group yet;
+                        // also if we already have fatal error, the client 
will be
+                        // crashed soon, hence we do not need to continue 
heartbeating either
+                        if (state.hasNotJoinedGroup() || hasFailed()) {
                             disable();
                             continue;
                         }
@@ -1366,7 +1380,6 @@ public abstract class AbstractCoordinator implements 
Closeable {
                                         } else if (e instanceof 
FencedInstanceIdException) {
                                             log.error("Caught fenced 
group.instance.id {} error in heartbeat thread", 
rebalanceConfig.groupInstanceId);
                                             heartbeatThread.failed.set(e);
-                                            heartbeatThread.disable();
                                         } else {
                                             heartbeat.failHeartbeat();
                                             // wake up the thread if it's 
sleeping to reschedule the heartbeat
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 9a932f9..89c9d5e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -452,11 +452,6 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
         }
     }
 
-    // for testing
-    boolean poll(Timer timer) {
-        return poll(timer, true);
-    }
-
     /**
      * Poll for coordinator events. This ensures that the coordinator is known 
and that the consumer
      * has joined the group (if it is using group management). This also 
handles periodic offset commits
@@ -511,6 +506,10 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
 
                 // if not wait for join group, we would just use a timer of 0
                 if (!ensureActiveGroup(waitForJoinGroup ? timer : 
time.timer(0L))) {
+                    // since we may use a different timer in the callee, we'd 
still need
+                    // to update the original timer's current time after the 
call
+                    timer.update(time.milliseconds());
+
                     return false;
                 }
             }
@@ -532,7 +531,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
     }
 
     /**
-     * Return the time to the next needed invocation of {@link #poll(Timer)}.
+     * Return the time to the next needed invocation of {@link 
ConsumerNetworkClient#poll(Timer)}.
      * @param now current time in milliseconds
      * @return the maximum time in milliseconds the caller should wait before 
the next invocation of poll()
      */
@@ -1213,7 +1212,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
                             if (generationUnchanged()) {
                                 future.raise(error);
                             } else {
-                                if (ConsumerCoordinator.this.state == 
MemberState.REBALANCING) {
+                                if (ConsumerCoordinator.this.state == 
MemberState.PREPARING_REBALANCE) {
                                     future.raise(new 
RebalanceInProgressException("Offset commit cannot be completed since the " +
                                         "consumer member's old generation is 
fenced by its group instance id, it is possible that " +
                                         "this consumer has already 
participated another rebalance and got a new generation"));
@@ -1242,7 +1241,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
 
                             // only need to reset generation and re-join group 
if generation has not changed or we are not in rebalancing;
                             // otherwise only raise rebalance-in-progress error
-                            if (!generationUnchanged() && 
ConsumerCoordinator.this.state == MemberState.REBALANCING) {
+                            if (!generationUnchanged() && 
ConsumerCoordinator.this.state == MemberState.PREPARING_REBALANCE) {
                                 future.raise(new 
RebalanceInProgressException("Offset commit cannot be completed since the " +
                                     "consumer member's generation is already 
stale, meaning it has already participated another rebalance and " +
                                     "got a new generation. You can try 
completing the rebalance by calling poll() and then retry commit again"));
@@ -1459,4 +1458,8 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
     RebalanceProtocol getProtocol() {
         return protocol;
     }
+
+    boolean poll(Timer timer) {
+        return poll(timer, true);
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
index 2e9a5ad..dfb9f85 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
@@ -17,9 +17,12 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
 
+import org.slf4j.Logger;
+
 /**
  * A helper class for managing the heartbeat to the coordinator
  */
@@ -30,6 +33,7 @@ public final class Heartbeat {
     private final Timer heartbeatTimer;
     private final Timer sessionTimer;
     private final Timer pollTimer;
+    private final Logger log;
 
     private volatile long lastHeartbeatSend = 0L;
     private volatile boolean heartbeatInFlight = false;
@@ -44,6 +48,9 @@ public final class Heartbeat {
         this.sessionTimer = time.timer(config.sessionTimeoutMs);
         this.maxPollIntervalMs = config.rebalanceTimeoutMs;
         this.pollTimer = time.timer(maxPollIntervalMs);
+
+        final LogContext logContext = new LogContext("[Heartbeat groupID=" + 
config.groupId + "] ");
+        this.log = logContext.logger(getClass());
     }
 
     private void update(long now) {
@@ -66,12 +73,18 @@ public final class Heartbeat {
         heartbeatInFlight = true;
         update(now);
         heartbeatTimer.reset(rebalanceConfig.heartbeatIntervalMs);
+
+        if (log.isTraceEnabled()) {
+            log.trace("Sending heartbeat request with {}ms remaining on 
timer", heartbeatTimer.remainingMs());
+        }
     }
 
     void failHeartbeat() {
         update(time.milliseconds());
         heartbeatInFlight = false;
         heartbeatTimer.reset(rebalanceConfig.retryBackoffMs);
+
+        log.trace("Heartbeat failed, reset the timer to {}ms remaining", 
heartbeatTimer.remainingMs());
     }
 
     void receiveHeartbeat() {
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java 
b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 6cfc4fd..38c0d84 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -312,6 +312,14 @@ public class MockClient implements KafkaClient {
         return this.requests;
     }
 
+    public Queue<ClientResponse> responses() {
+        return this.responses;
+    }
+
+    public Queue<FutureResponse> futureResponses() {
+        return this.futureResponses;
+    }
+
     public void respond(AbstractResponse response) {
         respond(response, false);
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 9fc54bf..8cfbfd6 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -508,13 +508,17 @@ public class KafkaConsumerTest {
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, 
client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singleton(topic), 
getConsumerRebalanceListener(consumer));
-        prepareRebalance(client, node, assignor, singletonList(tp0), null);
+        // Since we would enable the heartbeat thread after received 
join-response which could
+        // send the sync-group on behalf of the consumer if it is enqueued, we 
may still complete
+        // the rebalance and send out the fetch; in order to avoid it we do 
not prepare sync response here.
+        
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
node), node);
+        Node coordinator = new Node(Integer.MAX_VALUE - node.id(), 
node.host(), node.port());
+        client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, 
memberId, leaderId, Errors.NONE), coordinator);
 
         consumer.poll(Duration.ZERO);
 
-        // The underlying client should NOT get a fetch request
         final Queue<ClientRequest> requests = client.requests();
-        Assert.assertEquals(0, requests.size());
+        Assert.assertEquals(0, requests.stream().filter(request -> 
request.apiKey().equals(ApiKeys.FETCH)).count());
     }
 
     @SuppressWarnings("deprecation")
@@ -1253,9 +1257,6 @@ public class KafkaConsumerTest {
         time.sleep(heartbeatIntervalMs);
         TestUtils.waitForCondition(heartbeatReceived::get, "Heartbeat response 
did not occur within timeout.");
 
-        consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
-        assertTrue(heartbeatReceived.get());
-
         RuntimeException unsubscribeException = 
assertThrows(RuntimeException.class, consumer::unsubscribe);
         assertEquals(partitionLost + singleTopicPartition, 
unsubscribeException.getCause().getMessage());
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 331a6f3..d0a6575 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -450,6 +450,26 @@ public class AbstractCoordinatorTest {
                        && 
syncGroupRequest.data.protocolName().equals(PROTOCOL_NAME);
         }, syncGroupResponse(Errors.NONE, PROTOCOL_TYPE, wrongProtocolName));
 
+        // let the retry to complete successfully to break out of the while 
loop
+        mockClient.prepareResponse(body -> {
+            if (!(body instanceof JoinGroupRequest)) {
+                return false;
+            }
+            JoinGroupRequest joinGroupRequest = (JoinGroupRequest) body;
+            return 
joinGroupRequest.data().protocolType().equals(PROTOCOL_TYPE);
+        }, joinGroupFollowerResponse(1, memberId,
+                "memberid", Errors.NONE, PROTOCOL_TYPE));
+
+        mockClient.prepareResponse(body -> {
+            if (!(body instanceof SyncGroupRequest)) {
+                return false;
+            }
+
+            SyncGroupRequest syncGroupRequest = (SyncGroupRequest) body;
+            return syncGroupRequest.data.protocolType().equals(PROTOCOL_TYPE)
+                    && 
syncGroupRequest.data.protocolName().equals(PROTOCOL_NAME);
+        }, syncGroupResponse(Errors.NONE, PROTOCOL_TYPE, PROTOCOL_NAME));
+
         // No exception shall be thrown as the generation is reset.
         coordinator.joinGroupIfNeeded(mockTime.timer(100L));
     }
@@ -531,7 +551,7 @@ public class AbstractCoordinatorTest {
 
         final AbstractCoordinator.Generation currGen = 
coordinator.generation();
 
-        coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);
+        
coordinator.setNewState(AbstractCoordinator.MemberState.PREPARING_REBALANCE);
         RequestFuture<ByteBuffer> future = coordinator.sendJoinGroupRequest();
 
         TestUtils.waitForCondition(() -> {
@@ -571,7 +591,7 @@ public class AbstractCoordinatorTest {
 
         final AbstractCoordinator.Generation currGen = 
coordinator.generation();
 
-        coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);
+        
coordinator.setNewState(AbstractCoordinator.MemberState.PREPARING_REBALANCE);
         RequestFuture<ByteBuffer> future = coordinator.sendJoinGroupRequest();
 
         TestUtils.waitForCondition(() -> {
@@ -605,6 +625,25 @@ public class AbstractCoordinatorTest {
     }
 
     @Test
+    public void testHeartbeatSentWhenCompletingRebalance() throws Exception {
+        setupCoordinator();
+        joinGroup();
+
+        final AbstractCoordinator.Generation currGen = 
coordinator.generation();
+
+        
coordinator.setNewState(AbstractCoordinator.MemberState.COMPLETING_REBALANCE);
+
+        // the heartbeat should be sent out during a rebalance
+        mockTime.sleep(HEARTBEAT_INTERVAL_MS);
+        TestUtils.waitForCondition(() -> !mockClient.requests().isEmpty(), 
2000,
+                "The heartbeat request was not sent");
+        assertTrue(coordinator.heartbeat().hasInflight());
+
+        mockClient.respond(heartbeatResponse(Errors.REBALANCE_IN_PROGRESS));
+        assertEquals(currGen, coordinator.generation());
+    }
+
+    @Test
     public void testHeartbeatIllegalGenerationResponseWithOldGeneration() 
throws InterruptedException {
         setupCoordinator();
         joinGroup();
@@ -673,7 +712,7 @@ public class AbstractCoordinatorTest {
     }
 
     @Test
-    public void testHeartbeatUnknownMemberResponseDuringRebalancing() throws 
InterruptedException {
+    public void testHeartbeatRebalanceInProgressResponseDuringRebalancing() 
throws InterruptedException {
         setupCoordinator();
         joinGroup();
 
@@ -687,8 +726,7 @@ public class AbstractCoordinatorTest {
 
         assertTrue(coordinator.heartbeat().hasInflight());
 
-        // set the client to re-join group
-        mockClient.respond(heartbeatResponse(Errors.UNKNOWN_MEMBER_ID));
+        mockClient.respond(heartbeatResponse(Errors.REBALANCE_IN_PROGRESS));
 
         coordinator.requestRejoin();
 
@@ -699,8 +737,8 @@ public class AbstractCoordinatorTest {
             2000,
             "The heartbeat response was not received");
 
-        // the generation should be reset but the rebalance should still 
proceed
-        assertEquals(AbstractCoordinator.Generation.NO_GENERATION, 
coordinator.generation());
+        // the generation would not be reset while the rebalance is in progress
+        assertEquals(currGen, coordinator.generation());
 
         mockClient.respond(joinGroupFollowerResponse(currGen.generationId, 
memberId, JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.NONE));
         mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
@@ -1099,44 +1137,6 @@ public class AbstractCoordinatorTest {
     }
 
     @Test
-    public void testWakeupAfterSyncGroupSent() throws Exception {
-        setupCoordinator();
-
-        mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
-        mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId, 
leaderId, Errors.NONE));
-        mockClient.prepareResponse(new MockClient.RequestMatcher() {
-            private int invocations = 0;
-            @Override
-            public boolean matches(AbstractRequest body) {
-                invocations++;
-                boolean isSyncGroupRequest = body instanceof SyncGroupRequest;
-                if (isSyncGroupRequest && invocations == 1)
-                    // simulate wakeup after the request sent
-                    throw new WakeupException();
-                return isSyncGroupRequest;
-            }
-        }, syncGroupResponse(Errors.NONE));
-        AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
-
-        try {
-            coordinator.ensureActiveGroup();
-            fail("Should have woken up from ensureActiveGroup()");
-        } catch (WakeupException ignored) {
-        }
-
-        assertEquals(1, coordinator.onJoinPrepareInvokes);
-        assertEquals(0, coordinator.onJoinCompleteInvokes);
-        assertFalse(heartbeatReceived.get());
-
-        coordinator.ensureActiveGroup();
-
-        assertEquals(1, coordinator.onJoinPrepareInvokes);
-        assertEquals(1, coordinator.onJoinCompleteInvokes);
-
-        awaitFirstHeartbeat(heartbeatReceived);
-    }
-
-    @Test
     public void testWakeupAfterSyncGroupSentExternalCompletion() throws 
Exception {
         setupCoordinator();
 
@@ -1149,8 +1149,8 @@ public class AbstractCoordinatorTest {
                 invocations++;
                 boolean isSyncGroupRequest = body instanceof SyncGroupRequest;
                 if (isSyncGroupRequest && invocations == 1)
-                    // simulate wakeup after the request sent
-                    throw new WakeupException();
+                    // wakeup after the request returns
+                    consumerClient.wakeup();
                 return isSyncGroupRequest;
             }
         }, syncGroupResponse(Errors.NONE));
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 7b2495c..ae2ab8a 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -2119,7 +2119,7 @@ public class ConsumerCoordinatorTest {
             "memberId-new",
             null);
         coordinator.setNewGeneration(newGen);
-        coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);
+        
coordinator.setNewState(AbstractCoordinator.MemberState.PREPARING_REBALANCE);
 
         assertTrue(consumerClient.poll(future, time.timer(30000)));
         
assertTrue(future.exception().getClass().isInstance(Errors.REBALANCE_IN_PROGRESS.exception()));
@@ -2174,7 +2174,7 @@ public class ConsumerCoordinatorTest {
             "memberId-new",
             null);
         coordinator.setNewGeneration(newGen);
-        coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);
+        
coordinator.setNewState(AbstractCoordinator.MemberState.PREPARING_REBALANCE);
 
         assertTrue(consumerClient.poll(future, time.timer(30000)));
         
assertTrue(future.exception().getClass().isInstance(Errors.REBALANCE_IN_PROGRESS.exception()));
@@ -2218,7 +2218,7 @@ public class ConsumerCoordinatorTest {
             "memberId",
             null);
         coordinator.setNewGeneration(currGen);
-        coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);
+        
coordinator.setNewState(AbstractCoordinator.MemberState.PREPARING_REBALANCE);
 
         prepareOffsetCommitRequest(singletonMap(t1p, 100L), 
Errors.FENCED_INSTANCE_ID);
         RequestFuture<Void> future = 
coordinator.sendOffsetCommitRequest(singletonMap(t1p,
@@ -2818,14 +2818,18 @@ public class ConsumerCoordinatorTest {
             res = coordinator.joinGroupIfNeeded(time.timer(1));
 
             assertFalse(res);
+
+            // should have retried sending a join group request already
             assertFalse(client.hasPendingResponses());
-            assertFalse(client.hasInFlightRequests());
+            assertEquals(1, client.inFlightRequestCount());
+
+            System.out.println(client.requests());
 
             // Retry join should then succeed
-            client.prepareResponse(joinGroupFollowerResponse(generationId, 
memberId, "leader", Errors.NONE));
+            client.respond(joinGroupFollowerResponse(generationId, memberId, 
"leader", Errors.NONE));
             client.prepareResponse(syncGroupResponse(singletonList(t1p), 
Errors.NONE));
 
-            res = coordinator.joinGroupIfNeeded(time.timer(2));
+            res = coordinator.joinGroupIfNeeded(time.timer(3000));
 
             assertTrue(res);
             assertFalse(client.hasPendingResponses());
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 00dd09b..aa957cd 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -287,7 +287,7 @@ class GroupCoordinator(val brokerId: Int,
 
           group.currentState match {
             case PreparingRebalance =>
-              updateMemberAndRebalance(group, member, protocols, 
responseCallback)
+              updateMemberAndRebalance(group, member, protocols, s"Member 
${member.memberId} joining group during ${group.currentState}", 
responseCallback)
 
             case CompletingRebalance =>
               if (member.matches(protocols)) {
@@ -308,16 +308,18 @@ class GroupCoordinator(val brokerId: Int,
                   error = Errors.NONE))
               } else {
                 // member has changed metadata, so force a rebalance
-                updateMemberAndRebalance(group, member, protocols, 
responseCallback)
+                updateMemberAndRebalance(group, member, protocols, s"Updating 
metadata for member ${member.memberId} during ${group.currentState}", 
responseCallback)
               }
 
             case Stable =>
               val member = group.get(memberId)
-              if (group.isLeader(memberId) || !member.matches(protocols)) {
-                // force a rebalance if a member has changed metadata or if 
the leader sends JoinGroup.
-                // The latter allows the leader to trigger rebalances for 
changes affecting assignment
+              if (group.isLeader(memberId)) {
+                // force a rebalance if the leader sends JoinGroup;
+                // This allows the leader to trigger rebalances for changes 
affecting assignment
                 // which do not affect the member metadata (such as topic 
metadata changes for the consumer)
-                updateMemberAndRebalance(group, member, protocols, 
responseCallback)
+                updateMemberAndRebalance(group, member, protocols, s"leader 
${member.memberId} re-joining group during ${group.currentState}", 
responseCallback)
+              } else if (!member.matches(protocols)) {
+                updateMemberAndRebalance(group, member, protocols, s"Updating 
metadata for member ${member.memberId} during ${group.currentState}", 
responseCallback)
               } else {
                 // for followers with no actual change to their metadata, just 
return group information
                 // for the current generation which will allow them to issue 
SyncGroup
@@ -639,7 +641,11 @@ class GroupCoordinator(val brokerId: Int,
               responseCallback(Errors.UNKNOWN_MEMBER_ID)
 
             case CompletingRebalance =>
-                responseCallback(Errors.REBALANCE_IN_PROGRESS)
+              // consumers may start sending heartbeat after join-group 
response, in which case
+              // we should treat them as normal hb request and reset the timer
+              val member = group.get(memberId)
+              completeAndScheduleNextHeartbeatExpiration(group, member)
+              responseCallback(Errors.NONE)
 
             case PreparingRebalance =>
                 val member = group.get(memberId)
@@ -1071,9 +1077,10 @@ class GroupCoordinator(val brokerId: Int,
   private def updateMemberAndRebalance(group: GroupMetadata,
                                        member: MemberMetadata,
                                        protocols: List[(String, Array[Byte])],
+                                       reason: String,
                                        callback: JoinCallback): Unit = {
     group.updateMember(member, protocols, callback)
-    maybePrepareRebalance(group, s"Updating metadata for member 
${member.memberId}")
+    maybePrepareRebalance(group, reason)
   }
 
   private def maybePrepareRebalance(group: GroupMetadata, reason: String): 
Unit = {
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 9791cd6..875a9a1 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -1719,7 +1719,7 @@ class GroupCoordinatorTest {
 
     EasyMock.reset(replicaManager)
     val heartbeatResult = heartbeat(groupId, assignedMemberId, 1)
-    assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult)
+    assertEquals(Errors.NONE, heartbeatResult)
   }
 
   @Test

Reply via email to