Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-26 Thread via GitHub


cadonna merged PR #15698:
URL: https://github.com/apache/kafka/pull/15698


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-25 Thread via GitHub


lianetm commented on PR #15698:
URL: https://github.com/apache/kafka/pull/15698#issuecomment-2077313831

   Thanks for the helpful comments @cadonna, all addressed. 





Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-25 Thread via GitHub


lianetm commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1579555839


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -469,19 +469,33 @@ public void resetTimer() {
 this.heartbeatTimer.reset(heartbeatIntervalMs);
 }
 
+/**
+ * Check if a heartbeat request should be sent on the current time. A heartbeat should be
+ * sent if the heartbeat timer has expired, backoff has expired, and there is no request
+ * in-flight.
+ */
 @Override
 public boolean canSendRequest(final long currentTimeMs) {
 update(currentTimeMs);
 return heartbeatTimer.isExpired() && super.canSendRequest(currentTimeMs);
 }
 
-public long nextHeartbeatMs(final long currentTimeMs) {
+public long timeToNextHeartbeatMs(final long currentTimeMs) {
 if (heartbeatTimer.remainingMs() == 0) {

Review Comment:
   They achieve the same thing here, and I totally agree that `isExpired` is more
readable; fixed. (The "since we're here..." change seems sensible to me too, btw.)
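The javadoc in this hunk names three conditions for sending a heartbeat. As a rough, self-contained sketch (not the Kafka implementation), they combine as a plain conjunction. The class `HeartbeatSendGate` and its deadline parameters are hypothetical stand-ins for the heartbeat timer, the backoff tracked by the superclass, and the in-flight request tracking.

```java
/** Hypothetical sketch of the three send conditions described in the javadoc above. */
final class HeartbeatSendGate {
    private HeartbeatSendGate() {}

    /**
     * @param nowMs             current time
     * @param timerDeadlineMs   when the heartbeat interval elapses (assumed stand-in for the heartbeat timer)
     * @param backoffDeadlineMs when retry backoff elapses (assumed stand-in for the superclass backoff)
     * @param requestInFlight   whether a heartbeat is still awaiting its response
     */
    static boolean canSendRequest(long nowMs, long timerDeadlineMs, long backoffDeadlineMs,
                                  boolean requestInFlight) {
        boolean timerExpired = nowMs >= timerDeadlineMs;
        boolean backoffExpired = nowMs >= backoffDeadlineMs;
        // All three must hold: interval elapsed, backoff elapsed, nothing in flight.
        return timerExpired && backoffExpired && !requestInFlight;
    }
}
```

Any single failing condition is enough to hold the heartbeat back, which is why the timer check can be expressed with `isExpired()` alone and the other two delegated to the superclass.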






Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-25 Thread via GitHub


lianetm commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1579555219


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -164,6 +165,23 @@ public void testHeartbeatOnStartup() {
 assertEquals(0, result2.unsentRequests.size());
 }
 
+@Test
+public void testSuccessfulHeartbeatTiming() {
+mockStableMember();
+NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(0, result.unsentRequests.size(),
+"No heartbeat should be sent while interval has not expired");
+long currentTimeMs = time.milliseconds();
+assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs), result.timeUntilNextPollMs);
+assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(1, result.unsentRequests.size(), "A heartbeat should be sent when interval expires");
+NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0);
+inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE));
+assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);

Review Comment:
   This relates to your comment above. You're right that we did not have a
check to ensure the reset happened on the send and not on the response, so I
added the check on `timeToNextHeartbeatMs` above, which fails (with a specific
message) if the timer is not reset on the send. That check covers it, but I
also added the steps to advance the timer just a bit, check that no HB is sent,
and check that the time is updated with the difference, as you suggested. All done. 



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -164,6 +165,23 @@ public void testHeartbeatOnStartup() {
 assertEquals(0, result2.unsentRequests.size());
 }
 
+@Test
+public void testSuccessfulHeartbeatTiming() {
+mockStableMember();
+NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(0, result.unsentRequests.size(),
+"No heartbeat should be sent while interval has not expired");
+long currentTimeMs = time.milliseconds();
+assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs), result.timeUntilNextPollMs);
+assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(1, result.unsentRequests.size(), "A heartbeat should be sent when interval expires");

Review Comment:
   Sure! I missed it. Added. 






Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-25 Thread via GitHub


cadonna commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1579191646


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -164,6 +165,23 @@ public void testHeartbeatOnStartup() {
 assertEquals(0, result2.unsentRequests.size());
 }
 
+@Test
+public void testSuccessfulHeartbeatTiming() {
+mockStableMember();
+NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(0, result.unsentRequests.size(),
+"No heartbeat should be sent while interval has not expired");
+long currentTimeMs = time.milliseconds();
+assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs), result.timeUntilNextPollMs);
+assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(1, result.unsentRequests.size(), "A heartbeat should be sent when interval expires");
+NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0);
+inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE));
+assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);

Review Comment:
   How do you know whether the heartbeat timer was reset in
`makeHeartbeatRequest()` or in `onResponse()` if you verify the reset after you
complete the future of the request?
   I would verify the timer reset after the poll, advance the time a bit (less
than the heartbeat interval), and then verify here that the time to the next
heartbeat is the heartbeat interval minus the amount by which I advanced the
time after the poll.
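The ordering problem can be shown with a toy model. The sketch below assumes a hypothetical `FakeHeartbeatManager` that resets its deadline either on send (the intended behavior) or on response (the behavior the test should catch), and a hypothetical `resetHappensOnSend` check that follows the suggested steps: send, advance by less than the interval, then compare the time to the next heartbeat with the interval minus the elapsed time, both before and after the response.

```java
/** Hypothetical model: the heartbeat deadline is reset either on send or on response. */
final class FakeHeartbeatManager {
    private final long intervalMs;
    private final boolean resetOnResponse; // true models the behavior the test should catch
    private long deadlineMs;
    private boolean inFlight;

    FakeHeartbeatManager(long startMs, long intervalMs, boolean resetOnResponse) {
        this.intervalMs = intervalMs;
        this.resetOnResponse = resetOnResponse;
        this.deadlineMs = startMs + intervalMs;
    }

    /** Returns true if a heartbeat is sent on this poll. */
    boolean poll(long nowMs) {
        if (inFlight || nowMs < deadlineMs) {
            return false;
        }
        inFlight = true;
        if (!resetOnResponse) {
            deadlineMs = nowMs + intervalMs; // interval measured from the send
        }
        return true;
    }

    void onResponse(long nowMs) {
        inFlight = false;
        if (resetOnResponse) {
            deadlineMs = nowMs + intervalMs; // interval measured from the response
        }
    }

    long timeToNextHeartbeatMs(long nowMs) {
        return Math.max(0, deadlineMs - nowMs);
    }
}

final class HeartbeatTimingCheck {
    private HeartbeatTimingCheck() {}

    /** Follows the suggested steps; returns true only if the timer was reset on the send. */
    static boolean resetHappensOnSend(FakeHeartbeatManager manager, long startMs, long intervalMs, long deltaMs) {
        long nowMs = startMs + intervalMs;      // interval has elapsed
        if (!manager.poll(nowMs)) {
            return false;                       // a heartbeat must be sent
        }
        nowMs += deltaMs;                       // advance by less than the interval
        if (manager.poll(nowMs)) {
            return false;                       // no second heartbeat while one is in flight
        }
        if (manager.timeToNextHeartbeatMs(nowMs) != intervalMs - deltaMs) {
            return false;                       // checked before the response arrives
        }
        manager.onResponse(nowMs);
        // The response must not move the deadline.
        return manager.timeToNextHeartbeatMs(nowMs) == intervalMs - deltaMs;
    }
}
```

With a 1000 ms interval and a 200 ms advance, the reset-on-send model reports 800 ms before and after the response, while the reset-on-response model reports 0 ms before the response and fails. If the response were completed at the same instant as the send and checked only afterwards, both models would report a full interval, which is the ambiguity raised above.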






Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-25 Thread via GitHub


cadonna commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1579191646


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -164,6 +165,23 @@ public void testHeartbeatOnStartup() {
 assertEquals(0, result2.unsentRequests.size());
 }
 
+@Test
+public void testSuccessfulHeartbeatTiming() {
+mockStableMember();
+NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(0, result.unsentRequests.size(),
+"No heartbeat should be sent while interval has not expired");
+long currentTimeMs = time.milliseconds();
+assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs), result.timeUntilNextPollMs);
+assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(1, result.unsentRequests.size(), "A heartbeat should be sent when interval expires");
+NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0);
+inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE));
+assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);

Review Comment:
   How do you know whether the heartbeat timer was reset in 
`makeHeartbeatRequest()` or in `onResponse()` if you verify the reset after you 
complete the future of the request?
   I would verify the reset after the poll, advance the time a bit (less than
the heartbeat interval), and then verify here that the time did not change.






Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-25 Thread via GitHub


cadonna commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1579191646


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -164,6 +165,23 @@ public void testHeartbeatOnStartup() {
 assertEquals(0, result2.unsentRequests.size());
 }
 
+@Test
+public void testSuccessfulHeartbeatTiming() {
+mockStableMember();
+NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(0, result.unsentRequests.size(),
+"No heartbeat should be sent while interval has not expired");
+long currentTimeMs = time.milliseconds();
+assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs), result.timeUntilNextPollMs);
+assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(1, result.unsentRequests.size(), "A heartbeat should be sent when interval expires");
+NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0);
+inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE));
+assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);

Review Comment:
   How do you know whether the heartbeat timer was reset in 
`makeHeartbeatRequest()` or in `onResponse()` if you verify the reset after you 
complete the future of the request?
   I would verify the reset after the poll, advance the time a bit (less than
the heartbeat interval), and then verify here that the time to the next
heartbeat did not change.






Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-25 Thread via GitHub


cadonna commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1579187392


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -164,6 +165,23 @@ public void testHeartbeatOnStartup() {
 assertEquals(0, result2.unsentRequests.size());
 }
 
+@Test
+public void testSuccessfulHeartbeatTiming() {
+mockStableMember();
+NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(0, result.unsentRequests.size(),
+"No heartbeat should be sent while interval has not expired");
+long currentTimeMs = time.milliseconds();
+assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs), result.timeUntilNextPollMs);
+assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(1, result.unsentRequests.size(), "A heartbeat should be sent when interval expires");

Review Comment:
   Don't you need a verification here that ensures that the heartbeat timer was 
reset after the poll? 



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -164,6 +165,23 @@ public void testHeartbeatOnStartup() {
 assertEquals(0, result2.unsentRequests.size());
 }
 
+@Test
+public void testSuccessfulHeartbeatTiming() {
+mockStableMember();
+NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(0, result.unsentRequests.size(),
+"No heartbeat should be sent while interval has not expired");
+long currentTimeMs = time.milliseconds();
+assertEquals(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs), result.timeUntilNextPollMs);
+assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(1, result.unsentRequests.size(), "A heartbeat should be sent when interval expires");
+NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0);
+inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE));
+assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);

Review Comment:
   How do you know whether the heartbeat timer was reset in 
`makeHeartbeatRequest()` or in `onResponse()` if you verify the reset after you 
complete the future of the request?
   I would verify the reset after the poll, advance the time a bit (less than
the heartbeat interval), and then verify here that the time did not change
after the advance.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -469,19 +469,33 @@ public void resetTimer() {
 this.heartbeatTimer.reset(heartbeatIntervalMs);
 }
 
+/**
+ * Check if a heartbeat request should be sent on the current time. A heartbeat should be
+ * sent if the heartbeat timer has expired, backoff has expired, and there is no request
+ * in-flight.
+ */
 @Override
 public boolean canSendRequest(final long currentTimeMs) {
 update(currentTimeMs);
 return heartbeatTimer.isExpired() && super.canSendRequest(currentTimeMs);
 }
 
-public long nextHeartbeatMs(final long currentTimeMs) {
+public long timeToNextHeartbeatMs(final long currentTimeMs) {
 if (heartbeatTimer.remainingMs() == 0) {

Review Comment:
   Sorry for commenting on code outside the PR. Isn't this the same as
`heartbeatTimer.isExpired()`? If yes, could we please change this to make the
code more readable?  
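Whether `remainingMs() == 0` and `isExpired()` agree depends on how the timer defines them. Assuming a deadline timer where `isExpired()` is `currentTimeMs >= deadlineMs` and `remainingMs()` is `max(0, deadlineMs - currentTimeMs)` (an assumption for this sketch), both reduce to the same comparison. `DeadlineTimer` is a hypothetical stand-in, and `agreeForAll` simply checks the claim over a range of times.

```java
/** Hypothetical deadline timer with an explicitly updated, cached clock. */
final class DeadlineTimer {
    private long currentTimeMs;
    private final long deadlineMs;

    DeadlineTimer(long startMs, long timeoutMs) {
        this.currentTimeMs = startMs;
        this.deadlineMs = startMs + timeoutMs;
    }

    void update(long nowMs) {
        currentTimeMs = Math.max(currentTimeMs, nowMs); // cached time never moves backwards
    }

    boolean isExpired() {
        return currentTimeMs >= deadlineMs;
    }

    long remainingMs() {
        return Math.max(0, deadlineMs - currentTimeMs);
    }

    /** Checks that remainingMs() == 0 and isExpired() agree at every millisecond from 0 to endMs. */
    static boolean agreeForAll(long timeoutMs, long endMs) {
        DeadlineTimer timer = new DeadlineTimer(0, timeoutMs);
        for (long t = 0; t <= endMs; t++) {
            timer.update(t);
            if ((timer.remainingMs() == 0) != timer.isExpired()) {
                return false;
            }
        }
        return true;
    }
}
```

Since `max(0, d - c) == 0` holds exactly when `c >= d`, the two checks are interchangeable under these definitions, and `isExpired()` states the intent directly.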






Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-22 Thread via GitHub


lianetm commented on PR #15698:
URL: https://github.com/apache/kafka/pull/15698#issuecomment-2069725394

   Hi @cadonna, thanks for the comments!
   - the unit test I added initially fails on 
[this](https://github.com/apache/kafka/blob/d242c444562fe4be41a2bc53d47ab2a6f523b955/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java#L276)
 assertion without the changes in this PR. I guess that's what you were looking
for with your first comment? 
   -  agreed, we did not have test coverage to make sure that the 
`canSendRequest` and `nextHeartbeat` move along consistently. I added a func to 
validate them together, included it in the existing test that covers responses 
with errors, and added a new test to include it in the successful path too.
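A helper that checks that `canSendRequest` and the next-heartbeat time move together could look like the sketch below. `HeartbeatTiming`, `DeadlineHeartbeatTiming`, and `assertConsistent` are hypothetical names, not the helper added in the PR; the idea is that a positive wait must block sending until exactly that wait has elapsed.

```java
/** Hypothetical view of the two methods whose results must stay consistent. */
interface HeartbeatTiming {
    boolean canSendRequest(long nowMs);

    long timeToNextHeartbeatMs(long nowMs);
}

/** Minimal implementation: sending is allowed once a single deadline is reached. */
final class DeadlineHeartbeatTiming implements HeartbeatTiming {
    private final long deadlineMs;

    DeadlineHeartbeatTiming(long deadlineMs) {
        this.deadlineMs = deadlineMs;
    }

    @Override
    public boolean canSendRequest(long nowMs) {
        return nowMs >= deadlineMs;
    }

    @Override
    public long timeToNextHeartbeatMs(long nowMs) {
        return Math.max(0, deadlineMs - nowMs);
    }
}

final class TimingConsistency {
    private TimingConsistency() {}

    /** Throws AssertionError if the reported wait disagrees with canSendRequest; times are queried in increasing order. */
    static void assertConsistent(HeartbeatTiming timing, long nowMs) {
        long waitMs = timing.timeToNextHeartbeatMs(nowMs);
        if (waitMs > 0) {
            check(!timing.canSendRequest(nowMs), "must not send while a wait is reported");
            check(!timing.canSendRequest(nowMs + waitMs - 1), "must not send before the wait elapses");
        }
        check(timing.canSendRequest(nowMs + waitMs), "must be able to send once the wait elapses");
    }

    private static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }
}
```

Querying times in increasing order matters if the real `canSendRequest` updates a timer whose cached clock cannot move backwards.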
   
   





Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-22 Thread via GitHub


cadonna commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1574708196


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -482,6 +484,14 @@ public long nextHeartbeatMs(final long currentTimeMs) {
 return heartbeatTimer.remainingMs();
 }
 
+public void onFailedAttempt(final long currentTimeMs) {
+// Expire timer to allow sending HB after a failure. After a failure, a next HB may be
+// needed with backoff (ex. errors that lead to retries, like coordinator load error),
+// or immediately (ex. errors that lead to rejoining, like fencing errors).
+heartbeatTimer.update(heartbeatTimer.currentTimeMs() + heartbeatTimer.remainingMs());

Review Comment:
   Sorry, my previous comment is not correct. The `heartbeatTimer` is expired 
when you call `heartbeatTimer.reset(0)`.
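Under assumed timer semantics where `reset(timeoutMs)` sets the deadline relative to the cached time and `isExpired()` compares the cached time with the deadline, both options leave the timer expired immediately; what differs is the cached clock. `CachedClockTimer` and the two `expireBy...` helpers are hypothetical and only illustrate that difference.

```java
/** Hypothetical timer whose notion of "now" only changes through update(). */
final class CachedClockTimer {
    private long currentTimeMs;
    private long deadlineMs;

    CachedClockTimer(long startMs, long timeoutMs) {
        currentTimeMs = startMs;
        deadlineMs = startMs + timeoutMs;
    }

    void update(long nowMs) {
        currentTimeMs = Math.max(currentTimeMs, nowMs);
    }

    /** Restarts the countdown from the cached time; does not read a new time. */
    void reset(long timeoutMs) {
        deadlineMs = currentTimeMs + timeoutMs;
    }

    boolean isExpired() {
        return currentTimeMs >= deadlineMs;
    }

    long remainingMs() {
        return Math.max(0, deadlineMs - currentTimeMs);
    }

    long currentTimeMs() {
        return currentTimeMs;
    }

    /** Option 1: zero timeout, so the deadline equals the cached time and the timer is expired at once. */
    static CachedClockTimer expireByReset(CachedClockTimer timer) {
        timer.reset(0);
        return timer;
    }

    /** Option 2: jump the cached clock to the deadline; also expired, but the clock now runs ahead. */
    static CachedClockTimer expireByUpdate(CachedClockTimer timer) {
        timer.update(timer.currentTimeMs() + timer.remainingMs());
        return timer;
    }
}
```

For a 1000 ms timer updated to 300 ms, `reset(0)` keeps the cached time at 300 ms, while the update-based version moves it to 1000 ms, 700 ms ahead of the real clock. Under these assumptions that could skew later computations until real time catches up, which is one reason `reset(0)` expresses the intent more cleanly.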






Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-22 Thread via GitHub


cadonna commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1574660830


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -482,6 +484,14 @@ public long nextHeartbeatMs(final long currentTimeMs) {
 return heartbeatTimer.remainingMs();
 }
 
+public void onFailedAttempt(final long currentTimeMs) {
+// Expire timer to allow sending HB after a failure. After a failure, a next HB may be
+// needed with backoff (ex. errors that lead to retries, like coordinator load error),
+// or immediately (ex. errors that lead to rejoining, like fencing errors).
+heartbeatTimer.update(heartbeatTimer.currentTimeMs() + heartbeatTimer.remainingMs());

Review Comment:
   Your change to `heartbeatTimer.reset(0)` is not totally equivalent to
`heartbeatTimer.update(heartbeatTimer.currentTimeMs() +
heartbeatTimer.remainingMs());` because in the former the `heartbeatTimer` is
not expired until `heartbeatTimer.update()` is called, whereas in the latter
the timer is expired after the call, but I think in this specific case it
does not matter. Is my assumption correct?






Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-18 Thread via GitHub


lianetm commented on PR #15698:
URL: https://github.com/apache/kafka/pull/15698#issuecomment-2064258399

   Hey @cadonna, could you take a look when you have chance? Thanks!





Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-17 Thread via GitHub


lianetm commented on PR #15698:
URL: https://github.com/apache/kafka/pull/15698#issuecomment-2062253184

   Thanks for the comments @kirktrue, all addressed. 
   
   Regarding your comment about tests 
[here](https://github.com/apache/kafka/pull/15698#pullrequestreview-2004676007),
 we have that covered with the existing `testHeartbeatResponseOnErrorHandling`. 
That one is validating that we get the right `nextHeartbeatMs` time (which 
considers the timer), for each specific error type. Is that what you were 
looking for? Thanks!
   





Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-17 Thread via GitHub


lianetm commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1569511540


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -231,6 +231,35 @@ public void testTimerNotDue() {
 assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs);
 }
 
+@Test
+public void testHeartbeatNotSentIfAnotherOneInFlight() {
+mockStableMember();
+time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+// Heartbeat sent (no response received)
+NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(1, result.unsentRequests.size());
+NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0);
+
+result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent while a " +
+"previous on in-flight");

Review Comment:
   Fixed (not nit-picky at all, ugly typo, good catch!)






Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-17 Thread via GitHub


lianetm commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1569504137


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -482,6 +482,15 @@ public long nextHeartbeatMs(final long currentTimeMs) {
 return heartbeatTimer.remainingMs();
 }
 
+public void onFailedAttempt(final long currentTimeMs) {
+// Reset timer to allow sending HB after a failure without waiting for the interval.
+// After a failure, a next HB may be needed with backoff (ex. errors that lead to
+// retries, like coordinator load error), or immediately (ex. errors that lead to
+// rejoining, like fencing errors).
+heartbeatTimer.reset(0);
+super.onFailedAttempt(currentTimeMs);

Review Comment:
   No, it doesn't. The timer only indicates that an interval should be
respected. In case of failures, we don't want to follow the interval (so we
reset the timer to 0). Depending on the error, we will either:
   - send a next HB based on other conditions (ex. as soon as the coordinator
is discovered, or when releasing the assignment finishes after getting fenced)
   - not send a next HB at all (fatal errors)
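The failure path described here can be modeled as: drop the interval wait by resetting the timer to 0, and let the retry backoff (which the superclass `onFailedAttempt` is assumed to record) decide how soon a retry is possible. `FailureAwareHeartbeatState`, its backoff parameter, and the method names other than `onFailedAttempt` are hypothetical; coordinator discovery, membership state, and fatal errors that stop heartbeating are outside this sketch.

```java
/** Hypothetical heartbeat state: the interval applies after sends, backoff applies after failures. */
final class FailureAwareHeartbeatState {
    private final long intervalMs;
    private final long retryBackoffMs;
    private long intervalDeadlineMs;
    private long backoffDeadlineMs;
    private boolean inFlight;

    FailureAwareHeartbeatState(long startMs, long intervalMs, long retryBackoffMs) {
        this.intervalMs = intervalMs;
        this.retryBackoffMs = retryBackoffMs;
        this.intervalDeadlineMs = startMs + intervalMs;
        this.backoffDeadlineMs = startMs;
    }

    boolean canSendRequest(long nowMs) {
        return !inFlight && nowMs >= intervalDeadlineMs && nowMs >= backoffDeadlineMs;
    }

    /** Sending restarts the interval, measured from the send. */
    void onSendAttempt(long nowMs) {
        inFlight = true;
        intervalDeadlineMs = nowMs + intervalMs;
    }

    void onSuccessfulAttempt() {
        inFlight = false;
    }

    /** Failure: stop waiting for the interval (like reset(0)) and apply retry backoff instead. */
    void onFailedAttempt(long nowMs) {
        inFlight = false;
        intervalDeadlineMs = nowMs;
        backoffDeadlineMs = nowMs + retryBackoffMs;
    }
}
```

With a 5000 ms interval and 100 ms backoff, a heartbeat that fails at 5050 ms can be retried at 5150 ms instead of waiting until 10000 ms, while a successful one sent at 5000 ms keeps the next heartbeat at 10000 ms.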






Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-17 Thread via GitHub


lianetm commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1569498394


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -380,7 +380,7 @@ private void onErrorResponse(final ConsumerGroupHeartbeatResponse response,
 break;
 
 case UNRELEASED_INSTANCE_ID:
-logger.error("GroupHeartbeatRequest failed due to the instance id {} was not released: {}",
+logger.error("GroupHeartbeatRequest failed due to unreleased instance id {}: {}",

Review Comment:
   both. The change for the default log is needed: it was not including the 
errorMessage, and that makes it hard to know what happened when you get errors 
like INVALID_REQUEST (I personally got it and lost time investigating, so fixed 
it). The other log changes are just improvements because I was already there. 
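The point about the default branch is that the broker-provided error message should reach the log. A minimal sketch using `String.format` in place of a logging framework; the `HeartbeatErrorCode` enum, the `describeFailure` method, and the wording of the default message are hypothetical, and only the `UNRELEASED_INSTANCE_ID` wording follows the diff above (with `%s` instead of `{}` placeholders).

```java
/** Hypothetical subset of heartbeat error codes. */
enum HeartbeatErrorCode { UNRELEASED_INSTANCE_ID, INVALID_REQUEST, UNKNOWN_SERVER_ERROR }

final class HeartbeatErrorMessages {
    private HeartbeatErrorMessages() {}

    static String describeFailure(HeartbeatErrorCode error, String instanceId, String errorMessage) {
        switch (error) {
            case UNRELEASED_INSTANCE_ID:
                return String.format("GroupHeartbeatRequest failed due to unreleased instance id %s: %s",
                        instanceId, errorMessage);
            default:
                // Include both the error code and the broker's message so errors such as
                // INVALID_REQUEST can be diagnosed from the log alone.
                return String.format("GroupHeartbeatRequest failed with error %s: %s", error, errorMessage);
        }
    }
}
```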






Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-16 Thread via GitHub


kirktrue commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1567992301


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -482,6 +482,15 @@ public long nextHeartbeatMs(final long currentTimeMs) {
 return heartbeatTimer.remainingMs();
 }
 
+public void onFailedAttempt(final long currentTimeMs) {
+// Reset timer to allow sending HB after a failure without waiting for the interval.
+// After a failure, a next HB may be needed with backoff (ex. errors that lead to
+// retries, like coordinator load error), or immediately (ex. errors that lead to
+// rejoining, like fencing errors).
+heartbeatTimer.reset(0);
+super.onFailedAttempt(currentTimeMs);

Review Comment:
   Does the decision to reset the heartbeat timer depend on what _type_ of 
error is received?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -380,7 +380,7 @@ private void onErrorResponse(final ConsumerGroupHeartbeatResponse response,
 break;
 
 case UNRELEASED_INSTANCE_ID:
-logger.error("GroupHeartbeatRequest failed due to the instance id {} was not released: {}",
+logger.error("GroupHeartbeatRequest failed due to unreleased instance id {}: {}",

Review Comment:
   QQ: are these logging changes of the ‘I'll just clean this up as long as I'm
in here?’ variety, or do they have some bearing on the correctness of the logs?



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -231,6 +231,35 @@ public void testTimerNotDue() {
 assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs);
 }
 
+@Test
+public void testHeartbeatNotSentIfAnotherOneInFlight() {
+mockStableMember();
+time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+// Heartbeat sent (no response received)
+NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(1, result.unsentRequests.size());
+NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0);
+
+result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent while a " +
+"previous on in-flight");

Review Comment:
   Super nit-picky, sorry  
   
   ```suggestion
   "previous one in-flight");
   ```






Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-15 Thread via GitHub


lianetm commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1565783934


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -269,6 +269,9 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long curr
 heartbeatRequestState.onSendAttempt(currentTimeMs);
 membershipManager.onHeartbeatRequestSent();
 metricsManager.recordHeartbeatSentMs(currentTimeMs);
+// Reset timer when sending the request, to make sure that, if waiting for the interval,
+// we don't include the response time (which may introduce delay)

Review Comment:
   You're right, I don't think it adds anything that isn't already clear from the
function name and the action itself. Removed.
   This is covered in the new test I added 
[here](https://github.com/apache/kafka/blob/fe483ff816b62133291f77f29b00e3bc706b581f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java#L258).
 I just included now an assert message along the lines of this comment to make 
it clearer in the test.   
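The reason for resetting on send is easiest to see as arithmetic. Assuming each poll happens exactly when the timer expires and every response takes a fixed round trip `rttMs` (both simplifications), resetting on send yields send times at multiples of the interval, while resetting on response adds one round trip per heartbeat: send number n (counting from 0) lands at n·(interval + rtt) instead of n·interval. The `HeartbeatSchedule.sendTimes` helper is hypothetical.

```java
/** Hypothetical schedule calculator comparing where the interval is measured from. */
final class HeartbeatSchedule {
    private HeartbeatSchedule() {}

    /**
     * Send times of the first {@code count} heartbeats, starting at 0 ms.
     * resetOnSend = true measures the interval from each send; false measures it from each response.
     */
    static long[] sendTimes(int count, long intervalMs, long rttMs, boolean resetOnSend) {
        long[] times = new long[count];
        for (int i = 1; i < count; i++) {
            long delayMs = resetOnSend ? intervalMs : rttMs + intervalMs;
            times[i] = times[i - 1] + delayMs;
        }
        return times;
    }
}
```

With a 3000 ms interval and a 300 ms round trip, the third heartbeat goes out at 6000 ms when the timer is reset on send, versus 6600 ms when it is reset on response.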






Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-15 Thread via GitHub


lianetm commented on PR #15698:
URL: https://github.com/apache/kafka/pull/15698#issuecomment-2056883089

   Hey @cadonna, thanks a lot for your feedback! All comments addressed. 





Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-15 Thread via GitHub


lianetm commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1565815246


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -482,6 +484,14 @@ public long nextHeartbeatMs(final long currentTimeMs) {
 return heartbeatTimer.remainingMs();
 }
 
+public void onFailedAttempt(final long currentTimeMs) {
+// Expire timer to allow sending HB after a failure. After a failure, a next HB may be
+// needed with backoff (ex. errors that lead to retries, like coordinator load error),
+// or immediately (ex. errors that lead to rejoining, like fencing errors).
+heartbeatTimer.update(heartbeatTimer.currentTimeMs() + heartbeatTimer.remainingMs());

Review Comment:
   Done, good point. I changed it to reset to 0, that shows the intention of 
not having an interval to wait for, which is what we want on these failure 
scenarios.






Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-15 Thread via GitHub


lianetm commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1565783934


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -269,6 +269,9 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long curr
 heartbeatRequestState.onSendAttempt(currentTimeMs);
 membershipManager.onHeartbeatRequestSent();
 metricsManager.recordHeartbeatSentMs(currentTimeMs);
+// Reset timer when sending the request, to make sure that, if waiting 
for the interval,
+// we don't include the response time (which may introduce delay)

Review Comment:
   You're right, I don't think it adds anything that isn't already clear from the function name and the action itself. Removed.
   This is covered in the new test I added. I've now included an assert message along the lines of this comment to make it clearer in the test.
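   To illustrate the delay the removed comment referred to, here is a hypothetical simulation (not Kafka code) of heartbeat send times when the interval timer restarts on send versus on response. It assumes every response arrives before the next interval elapses, so no request is in-flight when the next heartbeat becomes due.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation; names and parameters are illustrative only.
public final class HeartbeatDriftSketch {

    /**
     * Returns the send times of successive heartbeats, starting at t=0.
     * If resetOnSend is true, the interval restarts when a request is sent;
     * otherwise it restarts when the response arrives, so each latency adds to the period.
     */
    static List<Long> sendTimes(long intervalMs, long[] latenciesMs, boolean resetOnSend) {
        List<Long> times = new ArrayList<>();
        long sendMs = 0;
        for (long latencyMs : latenciesMs) {
            times.add(sendMs);
            long timerStartMs = resetOnSend ? sendMs : sendMs + latencyMs;
            sendMs = timerStartMs + intervalMs;
        }
        return times;
    }

    public static void main(String[] args) {
        long[] latencies = {200, 200, 200};
        System.out.println(sendTimes(3000, latencies, true));  // [0, 3000, 6000]
        System.out.println(sendTimes(3000, latencies, false)); // [0, 3200, 6400]
    }
}
```

   Restarting on response stretches every period by the round-trip latency, so the effective heartbeat rate drifts below the configured interval.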






Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-12 Thread via GitHub


cadonna commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1562500382


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -269,6 +269,9 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long curr
 heartbeatRequestState.onSendAttempt(currentTimeMs);
 membershipManager.onHeartbeatRequestSent();
 metricsManager.recordHeartbeatSentMs(currentTimeMs);
+// Reset timer when sending the request, to make sure that, if waiting for the interval,
+// we don't include the response time (which may introduce delay)

Review Comment:
   Do we really need this comment?
   Additionally, I could not find a verification of this call in unit tests. Since you added a comment, it seems important enough to warrant a verification.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -231,6 +231,34 @@ public void testTimerNotDue() {
 assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs);
 }
 
+@Test
+public void testHeartbeatNotSentIfAnotherOnInFlight() {
+mockStableMember();
+time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
+// Heartbeat sent (no response received)
+NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(1, result.unsentRequests.size());
+NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0);
+
+result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent while a " +
+"previous on in-flight");
+
+time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent when the " +
+"interval expires if there is a previous HB request in-flight");
+
+// Receive response for the inflight. The next HB should be sent on the next poll after
+// the interval expires.
+inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE));
+time.sleep(DEFAULT_RETRY_BACKOFF_MS);
+result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(1, result.unsentRequests.size());
+

Review Comment:
   ```suggestion
   ```
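   The scenario exercised by the new test (send, poll while in-flight, interval expires while still in-flight, then the response arrives) can be modeled with a small hypothetical state machine. This is a sketch only, not Kafka code, and it omits the retry backoff that the real test sleeps through.

```java
// Hypothetical model of the "at most one heartbeat in-flight" rule; not Kafka code.
public final class InFlightSketch {
    private final long intervalMs;
    private long nextDueMs;          // when the interval timer expires
    private boolean inFlight = false;

    InFlightSketch(long startMs, long intervalMs) {
        this.intervalMs = intervalMs;
        this.nextDueMs = startMs + intervalMs;
    }

    /** Returns true if a heartbeat is sent on this poll. */
    boolean poll(long nowMs) {
        if (inFlight || nowMs < nextDueMs) {
            return false;
        }
        inFlight = true;
        nextDueMs = nowMs + intervalMs; // timer reset when the request is sent
        return true;
    }

    void onResponse() { inFlight = false; }

    public static void main(String[] args) {
        InFlightSketch hb = new InFlightSketch(0, 3000);
        System.out.println(hb.poll(3000)); // true: interval expired, nothing in-flight
        System.out.println(hb.poll(3000)); // false: previous request still in-flight
        System.out.println(hb.poll(6000)); // false: interval expired, but still in-flight
        hb.onResponse();
        System.out.println(hb.poll(6100)); // true: response received, interval already expired
    }
}
```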



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -231,6 +231,34 @@ public void testTimerNotDue() {
 assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs);
 }
 
+@Test
+public void testHeartbeatNotSentIfAnotherOnInFlight() {

Review Comment:
   typo
   ```suggestion
   public void testHeartbeatNotSentIfAnotherOneInFlight() {
   ```



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -482,6 +484,14 @@ public long nextHeartbeatMs(final long currentTimeMs) {
 return heartbeatTimer.remainingMs();
 }
 
+public void onFailedAttempt(final long currentTimeMs) {
+// Expire timer to allow sending HB after a failure. After a failure, a next HB may be
+// needed with backoff (ex. errors that lead to retries, like coordinator load error),
+// or immediately (ex. errors that lead to rejoining, like fencing errors).
+heartbeatTimer.update(heartbeatTimer.currentTimeMs() + heartbeatTimer.remainingMs());

Review Comment:
   What about adding a method to `Timer` that expires the timer without updating it with a time point in the future? Alternatively, I think you could reset the `Timer` to 0 with `heartbeatTimer.reset(0)`.
   Do we need a verification in the unit tests for this?
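   The options discussed here can be compared on a simplified timer model. `SimpleTimer` is a hypothetical stand-in whose clock only moves forward, and `expire()` is a hypothetical method corresponding to the suggestion above, not an existing `Timer` API. All three leave the timer expired, but in this model the `update(currentTimeMs() + remainingMs())` trick also moves the timer's clock ahead of real time.

```java
// Hypothetical comparison on a simplified timer; not Kafka's Timer implementation.
public final class TimerExpirySketch {

    /** Simplified interval timer whose clock only moves forward. */
    static final class SimpleTimer {
        private long currentTimeMs;
        private long deadlineMs;

        SimpleTimer(long nowMs, long timeoutMs) {
            currentTimeMs = nowMs;
            deadlineMs = nowMs + timeoutMs;
        }

        long currentTimeMs() { return currentTimeMs; }
        long remainingMs() { return Math.max(0, deadlineMs - currentTimeMs); }
        boolean isExpired() { return currentTimeMs >= deadlineMs; }
        void update(long nowMs) { currentTimeMs = Math.max(currentTimeMs, nowMs); }
        void reset(long timeoutMs) { deadlineMs = currentTimeMs + timeoutMs; }

        /** Hypothetical: expire immediately without moving the clock. */
        void expire() { deadlineMs = currentTimeMs; }
    }

    public static void main(String[] args) {
        // Each timer starts at t=0 with a 3000 ms interval; the failure is handled at t=1000.
        SimpleTimer viaUpdate = new SimpleTimer(0, 3000);
        viaUpdate.update(1000);
        viaUpdate.update(viaUpdate.currentTimeMs() + viaUpdate.remainingMs());

        SimpleTimer viaReset = new SimpleTimer(0, 3000);
        viaReset.update(1000);
        viaReset.reset(0);

        SimpleTimer viaExpire = new SimpleTimer(0, 3000);
        viaExpire.update(1000);
        viaExpire.expire();

        // true true true
        System.out.println(viaUpdate.isExpired() + " " + viaReset.isExpired() + " " + viaExpire.isExpired());
        // 3000 1000 1000: the update trick pushed the clock to the old deadline
        System.out.println(viaUpdate.currentTimeMs() + " " + viaReset.currentTimeMs() + " " + viaExpire.currentTimeMs());
    }
}
```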






Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-11 Thread via GitHub


lianetm commented on PR #15698:
URL: https://github.com/apache/kafka/pull/15698#issuecomment-2050401151

   Hey @cadonna, could you take a look at this small fix when you have a chance? Thanks!

