[jira] [Comment Edited] (KAFKA-14713) Kafka Streams global table startup takes too long

2023-02-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690055#comment-17690055
 ] 

Matthias J. Sax edited comment on KAFKA-14713 at 2/16/23 11:52 PM:
---

Thanks for getting back. Glad it's resolved. I am not sure why though. 
Comparing 3.4 and 3.0 code, it seems they do the same thing.

In the end, if you have a valid checkpoint on restart, you should not even hit 
`poll(pollMsPlusRequestTimeout)` during restore, because it should hold that 
`offset == highWatermark` and we should not enter the while-loop...

For KAFKA-14442, we know that `offset == highWatermark - 1` (because we write the 
"incorrect" watermark into the checkpoint file), and thus `poll()` is executed 
and hangs because there is no data – the last "record" is just a commit marker.
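The two cases above can be illustrated with a small model of the loop condition. This is only a sketch, not the actual GlobalStateManagerImpl code: the class and method names are hypothetical, and it assumes that a poll which finds only a commit marker returns no records, blocks for the full timeout, and then leaves the consumer position at the high watermark.

```java
// Simplified model of the restore loop discussed above.
// NOT the real GlobalStateManagerImpl; all names here are hypothetical.
class RestoreLoopSketch {

    /** Returns the simulated time (in ms) spent in polls that returned no records. */
    static long simulatedWaitMs(long checkpointedOffset, long highWatermark, long pollTimeoutMs) {
        long offset = checkpointedOffset;
        long waitedMs = 0;
        while (offset < highWatermark) {
            // Assumption: only a commit marker is left to read, so the poll
            // returns no records and blocks for the full timeout.
            waitedMs += pollTimeoutMs;
            // Assumption: afterwards the consumer position is past the marker.
            offset = highWatermark;
        }
        return waitedMs;
    }

    public static void main(String[] args) {
        long pollMsPlusRequestTimeout = 100 + 30_000; // values quoted in this thread
        // Valid checkpoint: offset == highWatermark, the loop is never entered.
        System.out.println(simulatedWaitMs(42, 42, pollMsPlusRequestTimeout));
        // Off-by-one checkpoint: offset == highWatermark - 1, one poll blocks.
        System.out.println(simulatedWaitMs(41, 42, pollMsPlusRequestTimeout));
    }
}
```

Under these assumptions the first call waits 0 ms and the second waits the full 30100 ms for that partition.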


was (Author: mjsax):
Thanks for getting back. Glad it's resolved. I am not sure why though. 
Comparing 3.4 and 3.0 code, it seems they do the same thing.

In the end, if you have valid checkpoint on restart, you should not even hit 
`poll(pollMsPlusRequestTimeout)` during restore, because it should hold that 
`offset == highWatermark` and we should not enter the while-loop...

For K14442, we know that `offset == highWatermark - 1` (because we write the 
"incorrect" watermark into the checkpoint file), and thus you thus `poll()` is 
executed and hang because there is no data – the last "record" is just a commit 
marker.

> Kafka Streams global table startup takes too long
> -
>
> Key: KAFKA-14713
> URL: https://issues.apache.org/jira/browse/KAFKA-14713
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.0.2
> Reporter: Tamas
> Priority: Critical
> Fix For: 3.4.0
>
>
> *Some context first*
> We have a Spring-based Kafka Streams application. This application is 
> listening to two topics. Let's call them apartment and visitor. The 
> apartments are stored in a global table, while the visitors are in the stream 
> we are processing, and at one point we are joining the visitor stream 
> together with the apartment table. In our test environment, both topics 
> contain 10 partitions.
> *Issue*
> At first deployment, everything goes fine, the global table is built and all 
> entries in the stream are processed.
> After everything is finished, we shut down the application, restart it and 
> send out a new set of visitors. The application seemingly does not respond.
> After some more debugging it turned out that it simply takes 5 minutes to 
> start up, because the global table takes 30 seconds (default value for the 
> global request timeout) to accept that there are no messages in the apartment 
> topics, for each and every partition. If we send out the list of apartments 
> as new messages, the application starts up immediately.
> To make matters worse, we have clients with 96 partitions, where the startup 
> time would be 48 minutes. Not having messages in the topics between 
> application shutdown and restart is a valid use case, so this is quite a big 
> problem.
> *Possible workarounds*
> We could reduce the request timeout, but since this value is not specific for 
> the global table initialization, but a global request timeout for a lot of 
> things, we do not know what else it will affect, so we are not very keen on 
> doing that. Even then, it would mean a 1.5 minute delay for this particular 
> client (more if we will have other use cases in the future where we will need 
> to use more global tables), which is far too much, considering that the 
> application would be able to otherwise start in about 20 seconds.
> *Potential solutions we see*
>  # Introduce a specific global table initialization timeout in 
> GlobalStateManagerImpl. Then we would be able to safely modify that value 
> without fear of making some other part of Kafka unstable.
>  # Parallelize the initialization of the global table partitions in 
> GlobalStateManagerImpl: knowing that the delay at startup is constant instead 
> of linear with the number of partitions would be a huge help.
>  # As long as we receive a response, accept the empty map in the 
> KafkaConsumer, and continue instead of going into a busy-waiting loop.
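The startup times quoted in the description follow from one 30 second wait per partition, handled one after another. The sketch below reproduces that arithmetic and contrasts it with the parallel initialization proposed in the second potential solution. The class and method names are hypothetical, and the parallel figure assumes all partitions wait at the same time with no other overhead.

```java
import java.time.Duration;

// Back-of-the-envelope model of the startup delay described in the issue.
// All names are hypothetical; this is not Kafka code.
class StartupDelaySketch {

    /** One wait per partition, one partition after another (delay grows linearly). */
    static Duration sequentialDelay(int partitions, Duration waitPerPartition) {
        return waitPerPartition.multipliedBy(partitions);
    }

    /** Assumption: all partitions wait concurrently, so the delay is a single wait. */
    static Duration parallelDelay(int partitions, Duration waitPerPartition) {
        return partitions == 0 ? Duration.ZERO : waitPerPartition;
    }

    public static void main(String[] args) {
        Duration wait = Duration.ofSeconds(30); // wait per partition from the description
        System.out.println(sequentialDelay(10, wait).toMinutes() + " min for 10 partitions");
        System.out.println(sequentialDelay(96, wait).toMinutes() + " min for 96 partitions");
        System.out.println(parallelDelay(96, wait).getSeconds() + " s if initialized in parallel");
    }
}
```

With a 30 second wait this gives 5 minutes for 10 partitions and 48 minutes for 96 partitions, matching the numbers in the report.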



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14713) Kafka Streams global table startup takes too long

2023-02-15 Thread Tamas (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17689533#comment-17689533
 ] 

Tamas edited comment on KAFKA-14713 at 2/16/23 5:51 AM:


Entry point would be the 
[GlobalStateManagerImpl|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java],
 where the 
[pollMsPlusRequestTimeout|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L115]
 is defined as POLL_MS_CONFIG (0.1 sec, which is fine), plus 
[REQUEST_TIMEOUT_MS_CONFIG|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L113]
 (30 sec, which is the part causing the problem). Then in 
[restoreState|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L240]
 we start [polling the 
globalConsumer|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L274]
 with this inflated poll timeout.
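How the two settings add up can be sketched in plain Java. This is an illustration only and does not use the Kafka classes: the class name is hypothetical, the keys `poll.ms` and `request.timeout.ms` are the string names of the two configs mentioned above, and the fallback values are the 0.1 sec and 30 sec figures from this comment.

```java
import java.time.Duration;
import java.util.Properties;

// Illustration of how the inflated poll timeout is composed.
// Hypothetical helper, not part of Kafka.
class PollTimeoutSketch {

    static Duration pollMsPlusRequestTimeout(Properties props) {
        // Fallback values are the figures quoted in the comment above.
        long pollMs = Long.parseLong(props.getProperty("poll.ms", "100"));
        long requestTimeoutMs = Long.parseLong(props.getProperty("request.timeout.ms", "30000"));
        return Duration.ofMillis(pollMs + requestTimeoutMs);
    }

    public static void main(String[] args) {
        // No overrides: 100 ms + 30000 ms.
        System.out.println(pollMsPlusRequestTimeout(new Properties()).toMillis() + " ms");

        // Lowering the request timeout shrinks the combined value accordingly.
        Properties reduced = new Properties();
        reduced.setProperty("request.timeout.ms", "1000");
        System.out.println(pollMsPlusRequestTimeout(reduced).toMillis() + " ms");
    }
}
```

The sum is dominated by the request timeout, which is why the poll interval itself is not the problem here.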

After that we jump into the 
[KafkaConsumer|https://github.com/apache/kafka/blob/3.0.2/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java],
 and start the 
[polling|https://github.com/apache/kafka/blob/3.0.2/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1238]
 for new records, which will give us back an empty map, since there are no new 
entries coming in, and this loop will get stuck for 30 seconds.

Note that the first set of links points to trunk, while the second set points 
to the 3.0.2 tag, because trunk has changed significantly compared to the 
version I am running, and the changes in 3.4.0 might actually solve my problem. 
Before continuing, let me spend some time trying to verify this (I have to 
admit that previously I only checked the latest version of 
GlobalStateManagerImpl, and there were no significant changes there yet) :)

Edit: okay, 3.4.0 does not have this issue anymore, which means that 
KAFKA-14442 might also be resolved with it. Anyway I'm closing this ticket. 
Thanks for the help :)




[jira] [Comment Edited] (KAFKA-14713) Kafka Streams global table startup takes too long

2023-02-15 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17689356#comment-17689356
 ] 

Matthias J. Sax edited comment on KAFKA-14713 at 2/15/23 9:44 PM:
--

What version are you using? – Also, can you point me to the code where it 
actually waits/hangs (as you already looked into it, it would be quicker 
this way). – I am not sure yet whether both issues are actually the same or 
not. (Maybe the "eos" config on the other ticket is a red herring.) But I guess 
we can dig into it a little bit.


was (Author: mjsax):
What version are you using? – Also, can you point me to the code where is 
actually waits/hangs (as you did already looked into it, it would be quicker 
this way). – I am not sure yet, if the issue is still the same though or not. 
But I guess we can dig into it a little bit.



[jira] [Comment Edited] (KAFKA-14713) Kafka Streams global table startup takes too long

2023-02-15 Thread Tamas (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17688945#comment-17688945
 ] 

Tamas edited comment on KAFKA-14713 at 2/15/23 8:52 AM:


Hi [~mjsax] looks similar, but not exactly. They see this issue with 
exactly_once_beta processing guarantee, while we have it with the default 
at_least_once. For us the important part is that this issue is fixed (or at least 
a safe workaround is provided) as soon as possible, because right now I am 
between a rock and a hard place because of it.


was (Author: JIRAUSER298942):
Hi [~mjsax] looks similar, but not exactly. They see this issue with 
exactly_once_beta processing guarantee, while we have it with the default 
at_least_once. For us the important part is that is issue is fixed as soon as 
possible, because right now I am between a rock and a hard place because of it.
