[GitHub] [kafka] guozhangwang commented on a change in pull request #8834: KAFKA-10134: Enable heartbeat during PrepareRebalance and Depend On State For Poll Timeout

2020-09-08 Thread GitBox


guozhangwang commented on a change in pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#discussion_r485326873



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -652,10 +644,10 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture fut
 } else if (error == Errors.MEMBER_ID_REQUIRED) {
 // Broker requires a concrete member id to be allowed to join the group. Update member id
 // and send another join group request in next cycle.
+String memberId = joinResponse.data().memberId();
+log.debug("Attempt to join group returned {} error. Will set the member id as {} and then rejoin", error, memberId);
 synchronized (AbstractCoordinator.this) {
-AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID,
-joinResponse.data().memberId(), null);
-AbstractCoordinator.this.resetStateAndRejoin();

Review comment:
   Yes, this is redundant since we are raising this error and 
`resetStateAndRejoin()` would still be executed by the handler anyway.
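To illustrate why the explicit call is redundant, here is a minimal Java sketch under simplifying assumptions: the class `JoinFlowSketch` and its methods `handleJoinResponse` and `completeJoin` are hypothetical stand-ins, not Kafka's actual API. The handler only records the broker-assigned member id and reports the error; a single shared failure path performs the reset.

```java
// Hypothetical, simplified model of the MEMBER_ID_REQUIRED path; the names here
// (JoinFlowSketch, handleJoinResponse, completeJoin) are illustrative, not Kafka's API.
class JoinFlowSketch {
    enum Errors { NONE, MEMBER_ID_REQUIRED }

    String memberId = "";
    int rejoinRequests = 0; // counts calls to resetStateAndRejoin()

    synchronized void resetStateAndRejoin() {
        rejoinRequests++;
    }

    // Response handler: records the broker-assigned member id and reports the
    // error, but does not reset state itself.
    Errors handleJoinResponse(Errors error, String assignedMemberId) {
        if (error == Errors.MEMBER_ID_REQUIRED) {
            synchronized (this) {
                memberId = assignedMemberId;
            }
        }
        return error;
    }

    // Shared failure path: any raised error leads to exactly one reset here,
    // which is why an extra call inside the handler would be redundant.
    void completeJoin(Errors error, String assignedMemberId) {
        Errors result = handleJoinResponse(error, assignedMemberId);
        if (result != Errors.NONE) {
            resetStateAndRejoin();
        }
    }
}
```

If the handler also called `resetStateAndRejoin()`, `rejoinRequests` would be incremented twice for one error.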





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8834: KAFKA-10134: Enable heartbeat during PrepareRebalance and Depend On State For Poll Timeout

2020-09-08 Thread GitBox


guozhangwang commented on a change in pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#discussion_r485324085



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -446,14 +453,15 @@ boolean joinGroupIfNeeded(final Timer timer) {
 resetJoinGroupFuture();
 needsJoinPrepare = true;
 } else {
-log.info("Generation data was cleared by heartbeat thread. Initiating rejoin.");
+log.info("Generation data was cleared by heartbeat thread to {} and state is now {} before " +
+ "the rebalance callback is triggered, marking this rebalance as failed and retry",
+ generation, state);
 resetStateAndRejoin();

Review comment:
   That's a good question. I just thought about this, and I think I can 
change the caller of `resetGeneration` (which is the only place where the heartbeat thread 
can reset the generation) and move `state = MemberState.UNJOINED;` into the 
callee to make sure that the two are always changed together.
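A minimal Java sketch of the proposed refactoring, assuming a simplified coordinator: `ResetSketch`, its `Generation` fields and the starting values are hypothetical. The point is that the generation and the member state are reset inside one synchronized method, so another thread can never observe one reset without the other.

```java
// Hypothetical sketch: keep generation and member state in one synchronized
// method so the heartbeat thread never observes one reset without the other.
class ResetSketch {
    enum MemberState { UNJOINED, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE }

    static final class Generation {
        // Illustrative "no generation" sentinel (id -1, empty member id).
        static final Generation NO_GENERATION = new Generation(-1, "", null);

        final int generationId;
        final String memberId;
        final String protocolName;

        Generation(int generationId, String memberId, String protocolName) {
            this.generationId = generationId;
            this.memberId = memberId;
            this.protocolName = protocolName;
        }
    }

    private Generation generation = new Generation(5, "member-1", "range");
    private MemberState state = MemberState.STABLE;

    // The callee now owns both updates; previously the caller set the state.
    synchronized void resetGeneration() {
        generation = Generation.NO_GENERATION;
        state = MemberState.UNJOINED;
    }

    synchronized Generation generation() { return generation; }

    synchronized MemberState state() { return state; }
}
```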

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -433,7 +440,7 @@ boolean joinGroupIfNeeded(final Timer timer) {
 generationSnapshot = this.generation;
 }
 
-if (generationSnapshot != Generation.NO_GENERATION) {
+if (generationSnapshot != Generation.NO_GENERATION && state == MemberState.STABLE) {

Review comment:
   Ack, will do.








[GitHub] [kafka] guozhangwang commented on a change in pull request #8834: KAFKA-10134: Enable heartbeat during PrepareRebalance and Depend On State For Poll Timeout

2020-08-31 Thread GitBox


guozhangwang commented on a change in pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#discussion_r480429496



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -528,7 +528,6 @@ public void onFailure(RuntimeException e) {
 }
 
 private void recordRebalanceFailure() {
-state = MemberState.UNJOINED;

Review comment:
   Previously this function had two lines: update the state and record 
sensors. Now that the first is done in the caller, this function becomes a 
one-liner and is no longer worth keeping, so I inlined it.








[GitHub] [kafka] guozhangwang commented on a change in pull request #8834: KAFKA-10134: Enable heartbeat during PrepareRebalance and Depend On State For Poll Timeout

2020-08-31 Thread GitBox


guozhangwang commented on a change in pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#discussion_r480425820



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -497,40 +501,18 @@ private synchronized void resetStateAndRejoin() {
 joinFuture.addListener(new RequestFutureListener() {
 @Override
 public void onSuccess(ByteBuffer value) {
-// handle join completion in the callback so that the callback will be invoked

Review comment:
   Well, I should say that part of it (enabling the heartbeat thread) is 
in the JoinGroup response handler, while the rest (updating metrics, etc.) is in 
the SyncGroup response handler.
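The split described above can be sketched as a small state machine. This is a hypothetical simplification: `RebalanceHandlersSketch`, `onJoinGroupSuccess`, `onSyncGroupSuccess` and the `successfulRebalances` counter (standing in for the rebalance metrics) are illustrative names, not Kafka's code.

```java
// Hypothetical sketch of how rebalance-completion work is split between the
// JoinGroup and SyncGroup response handlers. Names are illustrative only.
class RebalanceHandlersSketch {
    enum MemberState { UNJOINED, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE }

    MemberState state = MemberState.PREPARING_REBALANCE;
    boolean heartbeatEnabled = false;
    int successfulRebalances = 0; // stands in for the rebalance metrics

    // JoinGroup response: we are a member now, so heartbeats can resume
    // while we wait for the assignment.
    synchronized void onJoinGroupSuccess() {
        if (state == MemberState.PREPARING_REBALANCE) {
            state = MemberState.COMPLETING_REBALANCE;
            heartbeatEnabled = true;
        }
    }

    // SyncGroup response: the assignment has arrived, so the rebalance is
    // complete and the success metrics are recorded here.
    synchronized void onSyncGroupSuccess() {
        if (state == MemberState.COMPLETING_REBALANCE) {
            state = MemberState.STABLE;
            successfulRebalances++;
        }
    }
}
```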

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -639,7 +641,11 @@ class GroupCoordinator(val brokerId: Int,
   responseCallback(Errors.UNKNOWN_MEMBER_ID)
 
 case CompletingRebalance =>
-responseCallback(Errors.REBALANCE_IN_PROGRESS)
+  // consumers may start sending heartbeat after join-group response, in which case
+  // we should treat them as normal hb request and reset the timer
+  val member = group.get(memberId)

Review comment:
   It returned the error code before because the coordinator did not 
expect clients to send heartbeats before sending sync-group requests. That is 
no longer the case.
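A heavily simplified Java model of the broker-side change, for illustration only: the real `GroupCoordinator` is Scala and checks more states and errors. `HeartbeatHandlingSketch`, `addMember`, `handleHeartbeat` and the `lastHeartbeatMs` map (standing in for the session timer) are hypothetical. In `COMPLETING_REBALANCE` the heartbeat is now treated as a normal one.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, heavily simplified Java model of the broker-side heartbeat check.
class HeartbeatHandlingSketch {
    enum GroupState { PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD }
    enum Errors { NONE, UNKNOWN_MEMBER_ID, ILLEGAL_GENERATION, REBALANCE_IN_PROGRESS }

    GroupState state = GroupState.COMPLETING_REBALANCE;
    int generationId = 3;
    // member id -> time of the last accepted heartbeat (stands in for the session timer)
    final Map<String, Long> lastHeartbeatMs = new HashMap<>();

    void addMember(String memberId, long nowMs) {
        lastHeartbeatMs.put(memberId, nowMs);
    }

    Errors handleHeartbeat(String memberId, int memberGenerationId, long nowMs) {
        if (state == GroupState.DEAD || !lastHeartbeatMs.containsKey(memberId))
            return Errors.UNKNOWN_MEMBER_ID;
        if (memberGenerationId != generationId)
            return Errors.ILLEGAL_GENERATION;

        switch (state) {
            case PREPARING_REBALANCE:
                // Keep the member alive, but tell it that it must rejoin.
                lastHeartbeatMs.put(memberId, nowMs);
                return Errors.REBALANCE_IN_PROGRESS;
            case COMPLETING_REBALANCE:
                // Previously this case only answered REBALANCE_IN_PROGRESS. Members may
                // now heartbeat after the JoinGroup response, so reset their timer.
                lastHeartbeatMs.put(memberId, nowMs);
                return Errors.NONE;
            case STABLE:
                lastHeartbeatMs.put(memberId, nowMs);
                return Errors.NONE;
            default:
                return Errors.UNKNOWN_MEMBER_ID;
        }
    }
}
```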

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -639,7 +641,11 @@ class GroupCoordinator(val brokerId: Int,
   responseCallback(Errors.UNKNOWN_MEMBER_ID)
 
 case CompletingRebalance =>
-responseCallback(Errors.REBALANCE_IN_PROGRESS)

Review comment:
   I had a discussion with @hachikuji about this. I think logically it 
should not return `REBALANCE_IN_PROGRESS`, and clients should eventually 
update their handling logic too, maybe after some releases, once we can break 
client-broker compatibility.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -446,14 +453,15 @@ boolean joinGroupIfNeeded(final Timer timer) {
 resetJoinGroupFuture();
 needsJoinPrepare = true;
 } else {
-log.info("Generation data was cleared by heartbeat thread. Initiating rejoin.");
+log.info("Generation data was cleared by heartbeat thread to {} and state is now {} before " +
+ "the rebalance callback is triggered, marking this rebalance as failed and retry",
+ generation, state);
 resetStateAndRejoin();
 resetJoinGroupFuture();
-return false;
 }
 } else {
 final RuntimeException exception = future.exception();
-log.info("Join group failed with {}", exception.toString());
+log.info("Rebalance failed with {}", exception.toString());

Review comment:
   The reason I changed it is exactly that it may not always be due to 
join-group :) If sync-group fails, this could also be triggered.








[GitHub] [kafka] guozhangwang commented on a change in pull request #8834: KAFKA-10134: Enable heartbeat during PrepareRebalance and Depend On State For Poll Timeout

2020-08-31 Thread GitBox


guozhangwang commented on a change in pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#discussion_r476954744



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -917,17 +938,14 @@ private synchronized void resetGeneration() {
 synchronized void resetGenerationOnResponseError(ApiKeys api, Errors error) {
 log.debug("Resetting generation after encountering {} from {} response and requesting re-join", error, api);
 
-// only reset the state to un-joined when it is not already in rebalancing

Review comment:
   We do not need this check any more since we only reset the 
generation if we see an illegal generation or unknown member id error, and in either 
case we should no longer heartbeat.
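A minimal Java sketch of this reasoning, assuming a simplified member: `ResponseErrorResetSketch`, `requiresGenerationReset` and `onResponseError` are hypothetical names. Because only the two membership-invalidating errors trigger a reset, the reset can move the member to `UNJOINED` unconditionally, with no "already rebalancing" guard.

```java
// Hypothetical sketch: only errors that invalidate membership trigger a
// generation reset, so the reset can set UNJOINED unconditionally.
class ResponseErrorResetSketch {
    enum MemberState { UNJOINED, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE }
    enum Errors { NONE, ILLEGAL_GENERATION, UNKNOWN_MEMBER_ID, REBALANCE_IN_PROGRESS }

    MemberState state = MemberState.STABLE;
    int generationId = 7;
    boolean rejoinNeeded = false;

    static boolean requiresGenerationReset(Errors error) {
        return error == Errors.ILLEGAL_GENERATION || error == Errors.UNKNOWN_MEMBER_ID;
    }

    synchronized void onResponseError(Errors error) {
        if (!requiresGenerationReset(error))
            return;
        // No "only if not already rebalancing" check: with either error our
        // membership is invalid, so we should stop heartbeating regardless.
        generationId = -1;
        state = MemberState.UNJOINED;
        rejoinNeeded = true;
    }
}
```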








[GitHub] [kafka] guozhangwang commented on a change in pull request #8834: KAFKA-10134: Enable heartbeat during PrepareRebalance and Depend On State For Poll Timeout

2020-08-25 Thread GitBox


guozhangwang commented on a change in pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#discussion_r476950994



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -483,12 +492,7 @@ private synchronized void resetStateAndRejoin() {
 // rebalance in the call to poll below. This ensures that we do not mistakenly attempt
 // to rejoin before the pending rebalance has completed.
 if (joinFuture == null) {
-// fence off the heartbeat thread explicitly so that it cannot interfere with the join group.
-// Note that this must come after the call to onJoinPrepare since we must be able to continue
-// sending heartbeats if that callback takes some time.
-disableHeartbeatThread();

Review comment:
   We do not need to explicitly disable the heartbeat thread, since once the 
state transitions to PREPARING_REBALANCE, the thread will disable itself in 
the next iteration.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -326,8 +331,9 @@ protected synchronized void pollHeartbeat(long now) {
 }
 
 protected synchronized long timeToNextHeartbeat(long now) {
-// if we have not joined the group, we don't need to send heartbeats
-if (state == MemberState.UNJOINED)
+// if we have not joined the group or we are preparing rebalance,

Review comment:
   This is major fix 2) in the description.
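A minimal Java sketch of fix 2), assuming that `hasNotJoinedGroup()` covers `UNJOINED` and `PREPARING_REBALANCE` and that the next heartbeat is due one interval after the last send; `HeartbeatTimingSketch` and its fields are hypothetical, not Kafka's `Heartbeat` class.

```java
// Hypothetical sketch of fix 2): no heartbeat is scheduled while the member has
// not joined the group or is still waiting for its JoinGroup response.
class HeartbeatTimingSketch {
    enum MemberState {
        UNJOINED, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE;

        boolean hasNotJoinedGroup() {
            return this == UNJOINED || this == PREPARING_REBALANCE;
        }
    }

    private final long heartbeatIntervalMs;
    private final long lastHeartbeatSendMs;
    MemberState state = MemberState.UNJOINED;

    HeartbeatTimingSketch(long heartbeatIntervalMs, long lastHeartbeatSendMs) {
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.lastHeartbeatSendMs = lastHeartbeatSendMs;
    }

    synchronized long timeToNextHeartbeat(long now) {
        // No heartbeat is due, so this value should not shorten the poll timeout.
        if (state.hasNotJoinedGroup())
            return Long.MAX_VALUE;
        // Otherwise the next heartbeat is due one interval after the last send.
        return Math.max(0L, lastHeartbeatSendMs + heartbeatIntervalMs - now);
    }
}
```

A caller could then bound its poll timeout with something like `Math.min(userTimeoutMs, timeToNextHeartbeat(now))`, which is one way a poll timeout can depend on the member state.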

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -917,17 +938,14 @@ private synchronized void resetGeneration() {
 synchronized void resetGenerationOnResponseError(ApiKeys api, Errors error) {
 log.debug("Resetting generation after encountering {} from {} response and requesting re-join", error, api);
 
-// only reset the state to un-joined when it is not already in rebalancing

Review comment:
   We do not need this check any more since we only reset the 
generation if we see an illegal generation or unknown member id error, and in either 
case we should no longer heartbeat.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -497,40 +501,18 @@ private synchronized void resetStateAndRejoin() {
 joinFuture.addListener(new RequestFutureListener() {
 @Override
 public void onSuccess(ByteBuffer value) {
-// handle join completion in the callback so that the callback will be invoked

Review comment:
   Moved this into sync-group handler for readability.

##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
##
@@ -1098,44 +1136,6 @@ public void testWakeupAfterJoinGroupReceivedExternalCompletion() throws Exceptio
 awaitFirstHeartbeat(heartbeatReceived);
 }
 
-@Test
-public void testWakeupAfterSyncGroupSent() throws Exception {

Review comment:
   This is now a redundant test.

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -639,7 +641,11 @@ class GroupCoordinator(val brokerId: Int,
   responseCallback(Errors.UNKNOWN_MEMBER_ID)
 
 case CompletingRebalance =>
-responseCallback(Errors.REBALANCE_IN_PROGRESS)
+  // consumers may start sending heartbeat after join-group response, in which case
+  // we should treat them as normal hb request and reset the timer
+  val member = group.get(memberId)

Review comment:
   This is the only logical change, i.e. 3) in the description. All the others are 
logging changes.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -1311,9 +1324,10 @@ public void run() {
 continue;
 }
 
-if (state != MemberState.STABLE) {
-// the group is not stable (perhaps because we left the group or because the coordinator
-// kicked us out), so disable heartbeats and wait for the main thread to rejoin.
+// we do not need to heartbeat we are not part of a group yet;
+// also if we already have fatal error, the client will be
+// crashed soon, hence we do not need to continue heartbeating either
+if (state.hasNotJoinedGroup() || hasFailed()) {

Review comment:
   This is the major fix 1).
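A minimal Java sketch of fix 1), assuming a simplified heartbeat loop: `HeartbeatThreadSketch`, `runOnce` and `fatalError` are hypothetical, and the thread is assumed to be re-enabled by the main thread once it has rejoined. Unlike the old `state != MemberState.STABLE` check, the new condition keeps heartbeating in `COMPLETING_REBALANCE`.

```java
// Hypothetical sketch of fix 1): one iteration of a heartbeat loop decides,
// from the member state and any fatal failure, whether to keep heartbeating.
class HeartbeatThreadSketch {
    enum MemberState {
        UNJOINED, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE;

        boolean hasNotJoinedGroup() {
            return this == UNJOINED || this == PREPARING_REBALANCE;
        }
    }

    boolean enabled = true;
    MemberState state = MemberState.STABLE;
    RuntimeException fatalError = null;

    synchronized boolean hasFailed() {
        return fatalError != null;
    }

    // Returns true if this iteration may send a heartbeat.
    synchronized boolean runOnce() {
        if (!enabled)
            return false;
        // The old check (state != STABLE) also stopped heartbeats in
        // COMPLETING_REBALANCE while waiting for the SyncGroup response.
        if (state.hasNotJoinedGroup() || hasFailed()) {
            enabled = false; // assumed to be re-enabled by the main thread after rejoining
            return false;
        }
        return true;
    }
}
```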






[GitHub] [kafka] guozhangwang commented on a change in pull request #8834: KAFKA-10134: Enable heartbeat during PrepareRebalance and Depend On State For Poll Timeout

2020-08-25 Thread GitBox


guozhangwang commented on a change in pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#discussion_r476065182



##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -287,7 +287,7 @@ class GroupCoordinator(val brokerId: Int,
 
   group.currentState match {
 case PreparingRebalance =>
-  updateMemberAndRebalance(group, member, protocols, responseCallback)
+  updateMemberAndRebalance(group, member, protocols, s"Member ${member.memberId} joining group during ${group.currentState}", responseCallback)

Review comment:
   Yes, it only contains logging changes. But I will make some non-logging 
changes later and will mark them explicitly.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -528,7 +528,6 @@ public void onFailure(RuntimeException e) {
 }
 
 private void recordRebalanceFailure() {
-state = MemberState.UNJOINED;

Review comment:
   Yes, I agree. I think we should just let the heartbeat thread access the 
state itself and decide based on it whether or not to send heartbeats. I 
will update this logic.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -1069,6 +1069,13 @@ private HeartbeatResponseHandler(final Generation generation) {
 public void handle(HeartbeatResponse heartbeatResponse, RequestFuture future) {
 sensors.heartbeatSensor.record(response.requestLatencyMs());
 Errors error = heartbeatResponse.error();
+
+if (state != MemberState.STABLE) {

Review comment:
   My thinking was that when we are rebalancing, the purpose of the 
heartbeat is only to keep the consumer alive on the broker side, not to receive 
any instructions. But I think it should be handled case by case; I will try to 
refactor this piece a bit as well.
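One possible case-by-case policy, sketched in Java under stated assumptions: `HeartbeatResponseSketch`, the `Action` enum and `onHeartbeatResponse` are hypothetical. While already rebalancing, `REBALANCE_IN_PROGRESS` carries no new instruction and is ignored; errors that invalidate membership are still acted on in every state.

```java
// Hypothetical sketch of a case-by-case heartbeat response policy; the
// names and the chosen actions are illustrative, not Kafka's implementation.
class HeartbeatResponseSketch {
    enum MemberState { UNJOINED, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE }
    enum Errors { NONE, REBALANCE_IN_PROGRESS, ILLEGAL_GENERATION, UNKNOWN_MEMBER_ID }
    enum Action { NONE, REQUEST_REJOIN, RESET_GENERATION_AND_REJOIN }

    static Action onHeartbeatResponse(MemberState state, Errors error) {
        switch (error) {
            case NONE:
                return Action.NONE;
            case REBALANCE_IN_PROGRESS:
                // Only a STABLE member learns something new here; a member that is
                // already rebalancing is just being kept alive.
                return state == MemberState.STABLE ? Action.REQUEST_REJOIN : Action.NONE;
            case ILLEGAL_GENERATION:
            case UNKNOWN_MEMBER_ID:
                // Membership is no longer valid in any state.
                return Action.RESET_GENERATION_AND_REJOIN;
            default:
                return Action.NONE;
        }
    }
}
```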

##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
##
@@ -604,6 +605,25 @@ public void testSyncGroupIllegalGenerationResponseWithOldGeneration() throws Int
 assertEquals(newGen, coordinator.generation());
 }
 
+@Test
+public void testHeartbeatSentWhenRebalancing() throws Exception {
+setupCoordinator();
+joinGroup();
+
+final AbstractCoordinator.Generation currGen = coordinator.generation();
+
+coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);
+
+// the heartbeat thread should be sent out during a rebalance
+mockTime.sleep(HEARTBEAT_INTERVAL_MS);
+TestUtils.waitForCondition(() -> !mockClient.requests().isEmpty(), 2000,
+"The heartbeat request was not sent");
+assertTrue(coordinator.heartbeat().hasInflight());
+
+mockClient.respond(heartbeatResponse(Errors.REBALANCE_IN_PROGRESS));

Review comment:
   Actually we do not need to :)

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -528,7 +528,6 @@ public void onFailure(RuntimeException e) {
 }
 
 private void recordRebalanceFailure() {

Review comment:
   Yup, I will just inline this then.




