Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]
lianetm commented on PR #18702: URL: https://github.com/apache/kafka/pull/18702#issuecomment-2636978057 Merged to trunk and cherry picked to 4.0 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]
lianetm merged PR #18702: URL: https://github.com/apache/kafka/pull/18702 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]
frankvicky commented on PR #18702: URL: https://github.com/apache/kafka/pull/18702#issuecomment-2635593790 Sure! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]
lianetm commented on PR #18702: URL: https://github.com/apache/kafka/pull/18702#issuecomment-2634993016 @frankvicky could you please merge trunk latest changes? The failing transactions test has been fixed with https://github.com/apache/kafka/pull/18793 Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]
frankvicky commented on code in PR #18702: URL: https://github.com/apache/kafka/pull/18702#discussion_r1941409678 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1368,6 +1373,10 @@ private void close(Duration timeout, boolean swallowException) { } } +private Timer createTimer(Duration timeout) { Review Comment: Yes, it is nice to follow the style of classic consumers. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]
lianetm commented on code in PR #18702: URL: https://github.com/apache/kafka/pull/18702#discussion_r1941349655 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1368,6 +1373,10 @@ private void close(Duration timeout, boolean swallowException) { } } +private Timer createTimer(Duration timeout) { Review Comment: nit: should we show this is for the close op? `createTimerForCloseRequests` or similar maybe? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1368,6 +1373,10 @@ private void close(Duration timeout, boolean swallowException) { } } +private Timer createTimer(Duration timeout) { +return time.timer(Duration.ofMillis(Math.min(timeout.toMillis(), requestTimeoutMs))); Review Comment: ```suggestion return time.timer(Math.min(timeout.toMillis(), requestTimeoutMs)); ``` ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1368,6 +1373,10 @@ private void close(Duration timeout, boolean swallowException) { } } +private Timer createTimer(Duration timeout) { +return time.timer(Duration.ofMillis(Math.min(timeout.toMillis(), requestTimeoutMs))); Review Comment: Should we add a null check to the time obj here? Similar to what the classic does, since close can be called from the constructor at any point (finally) if something fails. ## clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java: ## @@ -1782,6 +1782,13 @@ public void close() { * timeout. If the consumer is unable to complete offset commits and gracefully leave the group * before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be * used to interrupt close. + * + * The actual maximum wait time is bounded by the {@link ConsumerConfig#REQUEST_TIMEOUT_MS_CONFIG} setting, which + * only applies to operations performed with the coordinator (coordinator-related requests and Review Comment: Should we say this instead? (could be to the coordinator or the leader) ```suggestion * only applies to operations performed with the broker (coordinator-related requests and ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]
frankvicky commented on code in PR #18702: URL: https://github.com/apache/kafka/pull/18702#discussion_r1940744792 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1326,7 +1331,7 @@ private void close(Duration timeout, boolean swallowException) { // We are already closing with a timeout, don't allow wake-ups from here on. wakeupTrigger.disableWakeups(); -final Timer closeTimer = time.timer(timeout); +final Timer closeTimer = createTimer(timeout); Review Comment: Yes, just walk through the corresponding logic of the classic consumer. The callback on close doesn't consume the time for the timer, to align the behavior, I think it's ok to remove the timer from `runRebalanceCallbacksOnClose` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]
chia7712 commented on code in PR #18702: URL: https://github.com/apache/kafka/pull/18702#discussion_r1940738655 ## clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java: ## @@ -153,7 +153,8 @@ public class CommonClientConfigs { public static final String REQUEST_TIMEOUT_MS_DOC = "The configuration controls the maximum amount of time the client will wait " + "for the response of a request. If the response is not received before the timeout " + "elapses the client will resend the request if necessary or fail the request if " - + "retries are exhausted."; + + "retries are exhausted. This timeout also applies to the consumer close operation - " + + "even if a larger timeout is specified for close, it will be limited by this value."; Review Comment: It's crucial to highlight that neither the `OffsetCommitCallback` nor the `ConsumerRebalanceListener` callbacks consume time from the close timeout. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]
chia7712 commented on code in PR #18702: URL: https://github.com/apache/kafka/pull/18702#discussion_r1940736112 ## clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java: ## @@ -153,7 +153,8 @@ public class CommonClientConfigs { public static final String REQUEST_TIMEOUT_MS_DOC = "The configuration controls the maximum amount of time the client will wait " + "for the response of a request. If the response is not received before the timeout " + "elapses the client will resend the request if necessary or fail the request if " - + "retries are exhausted."; + + "retries are exhausted. This timeout also applies to the consumer close operation - " + + "even if a larger timeout is specified for close, it will be limited by this value."; Review Comment: > isn't it enough to explain the behaviour on the close API java doc? +1 to add docs to close API -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]
lianetm commented on code in PR #18702: URL: https://github.com/apache/kafka/pull/18702#discussion_r1939721414 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1326,7 +1331,7 @@ private void close(Duration timeout, boolean swallowException) { // We are already closing with a timeout, don't allow wake-ups from here on. wakeupTrigger.disableWakeups(); -final Timer closeTimer = time.timer(timeout); +final Timer closeTimer = createTimer(timeout); Review Comment: not introduced here but affected with this change. I notice that `runRebalanceCallbacksOnClose` consumes time from the close timeout, right? (receives the timer just to update it). But that behaviour is not the same in the Classic Consumer. In the classic, the close timeout only applies to requests really. The callbacks run when closing the Abstract coordinator, without time boundaries, and most importantly, without consuming time from the close timeout. We `runRebalanceCallbacksOnClose` without time boundaries too, but it does consume the time from the timeout param, right? Wouldn't that potentially leave less time for the following requests? I'm concerned about existing apps, running callbacks, calling close with a timeout that used to be "enough", but now it may not be. Should we simply remove the timer from the runRebalanceCallbacksOnClose? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]
lianetm commented on code in PR #18702: URL: https://github.com/apache/kafka/pull/18702#discussion_r1939721414 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1326,7 +1331,7 @@ private void close(Duration timeout, boolean swallowException) { // We are already closing with a timeout, don't allow wake-ups from here on. wakeupTrigger.disableWakeups(); -final Timer closeTimer = time.timer(timeout); +final Timer closeTimer = createTimer(timeout); Review Comment: not introduced here but affected with this change. I notice that `runRebalanceCallbacksOnClose` consumes time from the close timeout (receives the timer just to update it). But that behaviour is not the same in the Classic Consumer. In the classic, the close timeout only applies to requests really. The callbacks run when closing the Abstract coordinator, without time boundaries, and most importantly, without consuming time from the close timeout. We `runRebalanceCallbacksOnClose` without time boundaries too, but it does consume the time from the timeout param, right? Wouldn't that potentially leave less time for the following requests? I'm concerned about existing apps, running callbacks, calling close with a timeout that used to be "enough", but now it may not be. Should we simply remove the timer from the runRebalanceCallbacksOnClose? ## clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java: ## @@ -153,7 +153,8 @@ public class CommonClientConfigs { public static final String REQUEST_TIMEOUT_MS_DOC = "The configuration controls the maximum amount of time the client will wait " + "for the response of a request. If the response is not received before the timeout " + "elapses the client will resend the request if necessary or fail the request if " - + "retries are exhausted."; + + "retries are exhausted. This timeout also applies to the consumer close operation - " + + "even if a larger timeout is specified for close, it will be limited by this value."; Review Comment: well just for our understanding and related to my other comment: the classic behaviour we're trying to keep is that the request timeout applies to operations performed with the coordinator and the leader (coordinator-related requests and fetch sessions), not to other close operations that do not perform any request. I'm not suggesting to add any of that here, would polute this config doc imo. Actually, since it's really specific to consumer.close, isn't it enough to explain the behaviour on the close API java doc? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]
kirktrue commented on PR #18702: URL: https://github.com/apache/kafka/pull/18702#issuecomment-2625755604 > Hi @kirktrue > > > I'd like to see a sanity check unit test added to somewhere like KafkaConsumerTest that ensures that the value of request.timeout.ms is used over the timeout passed in. If that's the intended and documented behavior, we should validate it to be so. > > I think it's not easy to check it. Could we rely on `ConsumeBounceTest#testClose` to validate this behavior ? If it's a lot of work, I guess we can skip it and assume `testClose()` validates the desired behavior. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]
frankvicky commented on PR #18702: URL: https://github.com/apache/kafka/pull/18702#issuecomment-2625037027 Hi @kirktrue > I'd like to see a sanity check unit test added to somewhere like KafkaConsumerTest that ensures that the value of request.timeout.ms is used over the timeout passed in. If that's the intended and documented behavior, we should validate it to be so. I think it's not easy to check it. Could we rely on `ConsumeBounceTest#testClose` to validate this behavior ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]
kirktrue commented on PR #18702: URL: https://github.com/apache/kafka/pull/18702#issuecomment-2623352548 `request.timeout.ms` is designed to apply to _each_ discrete network request whereas the close timeout applies to the _entire_ `KafkaConsumer.close()` call. If the user provides a timeout of 60 seconds to `close()`, why should it give up after only 30 seconds? As a parallel, consider the relationship between `request.timeout.ms` and [`delivery.timeout.ms`](https://kafka.apache.org/documentation/#producerconfigs_delivery.timeout.ms) in the `KafkaProducer.send()` call. In that case, it's `delivery.timeout.ms`, not `request.timeout.ms`, that serves as the deadline by which all of the network calls in `send()` must complete. That said, let's fix this gap with the solution as proposed. No one has complained about the behavior of the existing consumer timeout. I have no intention of dying on this hill; I just personally find it confusing 😄 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]
frankvicky commented on PR #18702: URL: https://github.com/apache/kafka/pull/18702#issuecomment-2619592934 Yes, thanks for the further explanation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]
lianetm commented on PR #18702: URL: https://github.com/apache/kafka/pull/18702#issuecomment-2619566956 > classic consumer only used the timer for the coordinator and fetcher True, but let's keep in mind that translating that to the new consumer means that we should apply the min to the steps to commit, leave, fetcher close, and network client close I expect (because this last step will wait for any of the requests generated on the previous steps). Makes sense? (Agree that we shouldn't apply it blindly) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]
frankvicky commented on PR #18702: URL: https://github.com/apache/kafka/pull/18702#issuecomment-2619541564 > The fetch improvements of [KAFKA-7109](https://issues.apache.org/jira/browse/KAFKA-7109) seems to just apply to the fetch requests on close the same timeout principle that was being applied to the coordinator close. According to the commit you mentioned, I think you are right. This behavior has existed for a long time and was recently applied to fetch requests on close at 3.5.0. > The change to the timeout behavior was introduced relatively recently in 3.5 via [KAFKA-7109](https://issues.apache.org/jira/browse/KAFKA-7109). Looking at https://github.com/apache/kafka/pull/12590, I'm not sure the change to ignore the user's timeout was necessarily intentional. > When closing, individual network requests should adhere to request.timeout.ms, but the overall timeout for closing should adhere to the user-provided timeout. This reminds me that the patch applied this close timeout principle to all closing behavior and I'm not sure if it is what we want. IIRC, classic consumer only used the timer for the coordinator and fetcher. 🤔 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]
lianetm commented on PR #18702: URL: https://github.com/apache/kafka/pull/18702#issuecomment-2619153324 My understanding is that this behaviour on close regarding the timeout has been in place for a long time (introduced here https://github.com/apache/kafka/commit/f72203ee9223d3b724ee67bdad9912612dd72f63). The fetch improvements of KAFKA-7109 seems to just apply to the fetch requests on close the same timeout principle that was being applied to the coordinator close. Makes sense? I'll be taking a full pass today. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]
kirktrue commented on PR #18702: URL: https://github.com/apache/kafka/pull/18702#issuecomment-2616799271 So given this configuration: ``` request.timeout.ms=3 group.protocol=classic ``` When the user calls `Consumer.close(Duration.ofSeconds(60))`, it will complete in 30 seconds, not 60 seconds? If so, it means the user's timeout parameter is ignored, right? The change to the timeout behavior was introduced relatively recently in 3.5 via KAFKA-7109. Looking at #12590, I'm not sure the change to ignore the user's timeout was necessarily intentional. When closing, individual network requests should adhere to `request.timeout.ms`, but the overall timeout for closing should adhere to the user-provided timeout. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]
frankvicky commented on PR #18702: URL: https://github.com/apache/kafka/pull/18702#issuecomment-2613966221 doc preview:  -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18645: New consumer should align close timeout handling with classic consumer [kafka]
frankvicky commented on PR #18702: URL: https://github.com/apache/kafka/pull/18702#issuecomment-2613937192 I have looped the test case with this patch:  -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org