Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]

2025-02-05 Thread via GitHub


lianetm commented on PR #18702:
URL: https://github.com/apache/kafka/pull/18702#issuecomment-2636978057

   Merged to trunk and cherry picked to 4.0


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]

2025-02-05 Thread via GitHub


lianetm merged PR #18702:
URL: https://github.com/apache/kafka/pull/18702





Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]

2025-02-04 Thread via GitHub


frankvicky commented on PR #18702:
URL: https://github.com/apache/kafka/pull/18702#issuecomment-2635593790

   Sure!





Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]

2025-02-04 Thread via GitHub


lianetm commented on PR #18702:
URL: https://github.com/apache/kafka/pull/18702#issuecomment-2634993016

   @frankvicky could you please merge the latest trunk changes? The failing 
transactions test has been fixed by 
https://github.com/apache/kafka/pull/18793. Thanks!





Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]

2025-02-04 Thread via GitHub


frankvicky commented on code in PR #18702:
URL: https://github.com/apache/kafka/pull/18702#discussion_r1941409678


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1368,6 +1373,10 @@ private void close(Duration timeout, boolean swallowException) {
 }
 }
 
+private Timer createTimer(Duration timeout) {

Review Comment:
   Yes, it is nice to follow the style of classic consumers.






Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]

2025-02-04 Thread via GitHub


lianetm commented on code in PR #18702:
URL: https://github.com/apache/kafka/pull/18702#discussion_r1941349655


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1368,6 +1373,10 @@ private void close(Duration timeout, boolean swallowException) {
 }
 }
 
+private Timer createTimer(Duration timeout) {

Review Comment:
   nit: should we show this is for the close op? `createTimerForCloseRequests` 
or similar maybe?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1368,6 +1373,10 @@ private void close(Duration timeout, boolean swallowException) {
 }
 }
 
+private Timer createTimer(Duration timeout) {
+return time.timer(Duration.ofMillis(Math.min(timeout.toMillis(), requestTimeoutMs)));

Review Comment:
   ```suggestion
   return time.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
   ```
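
The bounding rule in the suggestion above can be sketched in isolation. This is a minimal sketch, not the code in the PR: `CloseTimeoutSketch` and `boundedCloseTimeoutMs` are hypothetical names, and plain `long` milliseconds stand in for Kafka's `Time`/`Timer` so that the example runs without the client library.

```java
import java.time.Duration;

// Hypothetical stand-alone sketch; not part of AsyncKafkaConsumer.
final class CloseTimeoutSketch {

    // Returns the time budget for requests sent during close: the smaller of
    // the caller-supplied timeout and the request timeout (request.timeout.ms).
    static long boundedCloseTimeoutMs(Duration timeout, long requestTimeoutMs) {
        if (timeout == null || timeout.isNegative()) {
            throw new IllegalArgumentException("timeout must be non-null and non-negative");
        }
        return Math.min(timeout.toMillis(), requestTimeoutMs);
    }

    public static void main(String[] args) {
        // A 60 s close timeout with a 30 s request timeout is capped by the request timeout.
        System.out.println(boundedCloseTimeoutMs(Duration.ofSeconds(60), 30_000L));
        // A 5 s close timeout is below the cap, so the caller's value is kept.
        System.out.println(boundedCloseTimeoutMs(Duration.ofSeconds(5), 30_000L));
    }
}
```

The cap only ever shortens the caller's timeout; a close timeout smaller than the request timeout passes through unchanged.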



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1368,6 +1373,10 @@ private void close(Duration timeout, boolean swallowException) {
 }
 }
 
+private Timer createTimer(Duration timeout) {
+return time.timer(Duration.ofMillis(Math.min(timeout.toMillis(), requestTimeoutMs)));

Review Comment:
   Should we add a null check on the `time` object here, similar to what the 
classic consumer does? `close` can be called from the constructor (in the 
`finally` path) at any point if something fails.



##
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##
@@ -1782,6 +1782,13 @@ public void close() {
  * timeout. If the consumer is unable to complete offset commits and gracefully leave the group
  * before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be
  * used to interrupt close.
+ * 
+ * The actual maximum wait time is bounded by the {@link ConsumerConfig#REQUEST_TIMEOUT_MS_CONFIG} setting, which
+ * only applies to operations performed with the coordinator (coordinator-related requests and

Review Comment:
   Should we say this instead? (could be to the coordinator or the leader)
   ```suggestion
* only applies to operations performed with the broker (coordinator-related requests and
   ```






Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]

2025-02-04 Thread via GitHub


frankvicky commented on code in PR #18702:
URL: https://github.com/apache/kafka/pull/18702#discussion_r1940744792


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1326,7 +1331,7 @@ private void close(Duration timeout, boolean swallowException) {
 // We are already closing with a timeout, don't allow wake-ups from here on.
 wakeupTrigger.disableWakeups();
 
-final Timer closeTimer = time.timer(timeout);
+final Timer closeTimer = createTimer(timeout);

Review Comment:
   Yes, I just walked through the corresponding logic of the classic consumer.
   The callbacks on close don't consume time from the timer there, so to align 
the behavior I think it's OK to remove the timer from 
`runRebalanceCallbacksOnClose`.






Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]

2025-02-04 Thread via GitHub


chia7712 commented on code in PR #18702:
URL: https://github.com/apache/kafka/pull/18702#discussion_r1940738655


##
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java:
##
@@ -153,7 +153,8 @@ public class CommonClientConfigs {
 public static final String REQUEST_TIMEOUT_MS_DOC = "The configuration 
controls the maximum amount of time the client will wait "
  + "for the response 
of a request. If the response is not received before the timeout "
  + "elapses the client 
will resend the request if necessary or fail the request if "
- + "retries are 
exhausted.";
+ + "retries are 
exhausted. This timeout also applies to the consumer close operation - "
+ + "even if a larger 
timeout is specified for close, it will be limited by this value.";

Review Comment:
   It's crucial to highlight that neither the `OffsetCommitCallback` nor the 
`ConsumerRebalanceListener` callbacks consume time from the close timeout.






Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]

2025-02-04 Thread via GitHub


chia7712 commented on code in PR #18702:
URL: https://github.com/apache/kafka/pull/18702#discussion_r1940736112


##
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java:
##
@@ -153,7 +153,8 @@ public class CommonClientConfigs {
 public static final String REQUEST_TIMEOUT_MS_DOC = "The configuration controls the maximum amount of time the client will wait "
  + "for the response of a request. If the response is not received before the timeout "
  + "elapses the client will resend the request if necessary or fail the request if "
- + "retries are exhausted.";
+ + "retries are exhausted. This timeout also applies to the consumer close operation - "
+ + "even if a larger timeout is specified for close, it will be limited by this value.";

Review Comment:
   > isn't it enough to explain the behaviour on the close API java doc?
   
   +1 to add docs to close API






Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]

2025-02-03 Thread via GitHub


lianetm commented on code in PR #18702:
URL: https://github.com/apache/kafka/pull/18702#discussion_r1939721414


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1326,7 +1331,7 @@ private void close(Duration timeout, boolean swallowException) {
 // We are already closing with a timeout, don't allow wake-ups from here on.
 wakeupTrigger.disableWakeups();
 
-final Timer closeTimer = time.timer(timeout);
+final Timer closeTimer = createTimer(timeout);

Review Comment:
   Not introduced here, but affected by this change: I notice that 
`runRebalanceCallbacksOnClose` consumes time from the close timeout (it receives 
the timer just to update it). That behaviour differs from the Classic 
Consumer. 
   
   In the classic consumer, the close timeout really only applies to requests. 
The callbacks run when closing the `AbstractCoordinator`, without time 
boundaries and, most importantly, without consuming time from the close 
timeout. We run `runRebalanceCallbacksOnClose` without time boundaries too, but 
it does consume time from the timeout param, right? Wouldn't that potentially 
leave less time for the requests that follow? I'm concerned about existing apps 
that run callbacks and call close with a timeout that used to be "enough" but 
now may not be. Should we simply remove the timer from 
`runRebalanceCallbacksOnClose`? 
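
The concern above comes down to simple arithmetic on the close budget. The following is a minimal sketch under stated assumptions: `CallbackBudgetSketch` and `remainingForRequestsMs` are hypothetical names, the callback duration is passed in as a number rather than measured, and nothing here uses Kafka's `Timer`.

```java
// Hypothetical sketch; models only the budget arithmetic, not Kafka's Timer class.
final class CallbackBudgetSketch {

    // Time left for close requests after a callback that took callbackMs has run.
    // chargeCallbacks = true:  callback time is deducted from the close budget.
    // chargeCallbacks = false: callbacks run outside the budget, so the full
    //                          close timeout is still available for requests.
    static long remainingForRequestsMs(long closeTimeoutMs, long callbackMs, boolean chargeCallbacks) {
        if (!chargeCallbacks) {
            return closeTimeoutMs;
        }
        return Math.max(0L, closeTimeoutMs - callbackMs);
    }

    public static void main(String[] args) {
        long closeTimeoutMs = 10_000L;
        long callbackMs = 8_000L;
        // Callbacks charged against the timeout: little time is left for requests.
        System.out.println(remainingForRequestsMs(closeTimeoutMs, callbackMs, true));
        // Callbacks not charged: requests keep the whole budget.
        System.out.println(remainingForRequestsMs(closeTimeoutMs, callbackMs, false));
    }
}
```

With a slow callback, charging its duration against the timeout can leave the commit and leave-group requests with almost no time, which is the regression risk raised for existing applications.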



##
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java:
##
@@ -153,7 +153,8 @@ public class CommonClientConfigs {
 public static final String REQUEST_TIMEOUT_MS_DOC = "The configuration controls the maximum amount of time the client will wait "
  + "for the response of a request. If the response is not received before the timeout "
  + "elapses the client will resend the request if necessary or fail the request if "
- + "retries are exhausted.";
+ + "retries are exhausted. This timeout also applies to the consumer close operation - "
+ + "even if a larger timeout is specified for close, it will be limited by this value.";

Review Comment:
   Just for our understanding, and related to my other comment: the classic 
behaviour we're trying to keep is that the request timeout applies to 
operations performed with the coordinator and the leader (coordinator-related 
requests and fetch sessions), not to other close operations that do not perform 
any request. I'm not suggesting we add any of that here; it would pollute this 
config doc, imo. Actually, since it's really specific to consumer.close, isn't 
it enough to explain the behaviour on the close API java doc? 






Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]

2025-01-30 Thread via GitHub


kirktrue commented on PR #18702:
URL: https://github.com/apache/kafka/pull/18702#issuecomment-2625755604

   > Hi @kirktrue
   > 
   > > I'd like to see a sanity check unit test added to somewhere like 
KafkaConsumerTest that ensures that the value of request.timeout.ms is used 
over the timeout passed in. If that's the intended and documented behavior, we 
should validate it to be so.
   > 
   > I think it's not easy to check it. Could we rely on 
`ConsumerBounceTest#testClose` to validate this behavior?
   
   If it's a lot of work, I guess we can skip it and assume `testClose()` 
validates the desired behavior.





Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]

2025-01-30 Thread via GitHub


frankvicky commented on PR #18702:
URL: https://github.com/apache/kafka/pull/18702#issuecomment-2625037027

   Hi @kirktrue 
   > I'd like to see a sanity check unit test added to somewhere like 
KafkaConsumerTest that ensures that the value of request.timeout.ms is used 
over the timeout passed in. If that's the intended and documented behavior, we 
should validate it to be so.
   
   I think it's not easy to check it. Could we rely on 
`ConsumerBounceTest#testClose` to validate this behavior?





Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]

2025-01-29 Thread via GitHub


kirktrue commented on PR #18702:
URL: https://github.com/apache/kafka/pull/18702#issuecomment-2623352548

   `request.timeout.ms` is designed to apply to _each_ discrete network request 
whereas the close timeout applies to the _entire_ `KafkaConsumer.close()` call. 
If the user provides a timeout of 60 seconds to `close()`, why should it give 
up after only 30 seconds? As a parallel, consider the relationship between 
`request.timeout.ms` and 
[`delivery.timeout.ms`](https://kafka.apache.org/documentation/#producerconfigs_delivery.timeout.ms)
 in the `KafkaProducer.send()` call. In that case, it's `delivery.timeout.ms`, 
not `request.timeout.ms`, that serves as the deadline by which all of the 
network calls in `send()` must complete.
   
   That said, let's fix this gap with the solution as proposed. No one has 
complained about the behavior of the existing consumer timeout. I have no 
intention of dying on this hill; I just personally find it confusing 😄
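
The distinction drawn above, a timeout per network request versus a deadline for the whole call, can be sketched as follows. This illustrates the model described in the comment, not what the PR implements; `DeadlineSketch` and `perRequestWaitsMs` are hypothetical names, and each request is assumed to use its entire allowance (the worst case, such as an unreachable broker).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; names are illustrative and not Kafka APIs.
final class DeadlineSketch {

    // For each request, the allowed wait is min(requestTimeoutMs, time left
    // until the overall deadline). Requests stop once the deadline is used up.
    static List<Long> perRequestWaitsMs(long overallTimeoutMs, long requestTimeoutMs, int requests) {
        List<Long> waits = new ArrayList<>();
        long remaining = overallTimeoutMs;
        for (int i = 0; i < requests && remaining > 0; i++) {
            long wait = Math.min(requestTimeoutMs, remaining);
            waits.add(wait);
            remaining -= wait;
        }
        return waits;
    }

    public static void main(String[] args) {
        // Overall deadline of 60 s, request timeout of 30 s, up to three requests.
        System.out.println(perRequestWaitsMs(60_000L, 30_000L, 3));
    }
}
```

Under this model the caller's 60 seconds bounds the whole operation while each individual request is still limited to 30 seconds, which mirrors the `delivery.timeout.ms` analogy in the comment.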





Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]

2025-01-28 Thread via GitHub


frankvicky commented on PR #18702:
URL: https://github.com/apache/kafka/pull/18702#issuecomment-2619592934

   Yes, thanks for the further explanation. 





Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]

2025-01-28 Thread via GitHub


lianetm commented on PR #18702:
URL: https://github.com/apache/kafka/pull/18702#issuecomment-2619566956

   > classic consumer only used the timer for the coordinator and fetcher
   
   True, but let's keep in mind that translating that to the new consumer means 
we should apply the min to the commit, leave, fetcher close, and network client 
close steps, I expect (the last step waits for any requests generated by the 
previous ones). Makes sense? (Agreed that we shouldn't apply it blindly.)





Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]

2025-01-28 Thread via GitHub


frankvicky commented on PR #18702:
URL: https://github.com/apache/kafka/pull/18702#issuecomment-2619541564

   > The fetch improvements of 
[KAFKA-7109](https://issues.apache.org/jira/browse/KAFKA-7109) seems to just 
apply to the fetch requests on close the same timeout principle that was being 
applied to the coordinator close.
   
   According to the commit you mentioned, I think you are right.
   This behavior has existed for a long time and was more recently extended to 
fetch requests on close in 3.5.0.
   
   > The change to the timeout behavior was introduced relatively recently in 
3.5 via [KAFKA-7109](https://issues.apache.org/jira/browse/KAFKA-7109). Looking 
at https://github.com/apache/kafka/pull/12590, I'm not sure the change to 
ignore the user's timeout was necessarily intentional.
   > When closing, individual network requests should adhere to 
request.timeout.ms, but the overall timeout for closing should adhere to the 
user-provided timeout.
   
   This reminds me that the patch applied this close timeout principle to all 
closing behavior and I'm not sure if it is what we want. IIRC, classic consumer 
only used the timer for the coordinator and fetcher. 🤔 
   
   
   





Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]

2025-01-28 Thread via GitHub


lianetm commented on PR #18702:
URL: https://github.com/apache/kafka/pull/18702#issuecomment-2619153324

   My understanding is that this close timeout behaviour has been in place for 
a long time (introduced in 
https://github.com/apache/kafka/commit/f72203ee9223d3b724ee67bdad9912612dd72f63).
 The fetch improvements of KAFKA-7109 seem to just apply to the fetch requests 
on close the same timeout principle that was already being applied to the 
coordinator close. Makes sense? 
   
   I'll be taking a full pass today. Thanks!





Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]

2025-01-27 Thread via GitHub


kirktrue commented on PR #18702:
URL: https://github.com/apache/kafka/pull/18702#issuecomment-2616799271

   So given this configuration:
   
   ```
   request.timeout.ms=30000
   group.protocol=classic
   ```
   
   When the user calls `Consumer.close(Duration.ofSeconds(60))`, it will 
complete in 30 seconds, not 60 seconds? If so, it means the user's timeout 
parameter is ignored, right?
   
   The change to the timeout behavior was introduced relatively recently in 3.5 
via KAFKA-7109. Looking at #12590, I'm not sure the change to ignore the user's 
timeout was necessarily intentional.
   
   When closing, individual network requests should adhere to 
`request.timeout.ms`, but the overall timeout for closing should adhere to the 
user-provided timeout.





Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]

2025-01-25 Thread via GitHub


frankvicky commented on PR #18702:
URL: https://github.com/apache/kafka/pull/18702#issuecomment-2613966221

   doc preview:
   ![Screenshot from 2025-01-25 
21-26-14](https://github.com/user-attachments/assets/689731a6-6b7d-47e8-b94f-fd99f88e1135)
   





Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]

2025-01-25 Thread via GitHub


frankvicky commented on PR #18702:
URL: https://github.com/apache/kafka/pull/18702#issuecomment-2613937192

   I have looped the test case with this patch:
   ![Screenshot from 2025-01-25 
19-37-58](https://github.com/user-attachments/assets/b65642c0-029e-4c61-b480-3794f132042d)
   

