Hi again, Tomasz,

Your issue is really bugging me, since I'm pretty sure it shouldn't be 
happening.

I went ahead and added an integration test with your exact scenario, as I 
understand it: https://github.com/apache/kafka/pull/12706

The test passes for me.

Do you think you can check it out and try adjusting the test setup until you're 
able to reproduce the behavior you're seeing? If you can do that, I think we 
will get to the bottom of it.
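
If you want a head start on what to vary: the interleaving decision is made 
over whatever records are already buffered, so the consumer fetch settings 
are worth adjusting alongside the produce pattern. Something along these 
lines (the values are illustrative, not the actual code from the PR):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// Illustrative knobs to vary in the test setup -- not the PR's actual code.
final Properties props = new Properties();
// Fewer records per poll makes batching effects easier to observe.
props.put(StreamsConfig.consumerPrefix("max.poll.records"), 100);
// Smaller fetches keep any one partition from dominating a single poll.
props.put(StreamsConfig.consumerPrefix("fetch.max.bytes"), 1 << 20);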

Thanks,
-John

On Fri, Sep 30, 2022, at 09:51, John Roesler wrote:
> Hi Tomasz,
>
> Thanks for trying that out. It’s not the way I’d expect it to work. I 
> don’t remember if there were any follow-up bugs that have been solved 
> in subsequent releases. Just as a long shot, perhaps you can try it on 
> the latest release (3.3.0)?
>
> Otherwise, I think the best path forward would be to file a bug report 
> on the Apache Kafka Jira with enough information to reproduce the issue 
> (or if you’re able to provide a repro, that would be awesome).
>
> Thanks, and sorry for the trouble. 
> -John
>
> On Tue, Sep 27, 2022, at 03:15, Tomasz Gac wrote:
>> I upgraded to Kafka Streams 3.0.0 with max.task.idle.ms set to a positive
>> value, and it did not help.
>> When the lag is large, the application still consumes batches of data without
>> interleaving them.
>>
>>
>>
>> On Tue, Sep 27, 2022 at 05:51, John Roesler <vvcep...@apache.org> wrote:
>>
>>> Hi Tomasz,
>>>
>>> Thanks for asking. This sounds like the situation that we fixed in Apache
>>> Kafka 3.0, with KIP-695 (
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-695%3A+Further+Improve+Kafka+Streams+Timestamp+Synchronization
>>> ).
>>>
>>> Can you try upgrading and let us know if this fixes the problem?
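>>>
>>> If you do upgrade, the setting that controls this is max.task.idle.ms. A
>>> minimal sketch (the value here is just an example):
>>>
>>> import java.util.Properties;
>>> import org.apache.kafka.streams.StreamsConfig;
>>>
>>> final Properties props = new Properties();
>>> // Wait up to 1s for data from lagging partitions so that records from
>>> // all merged topics can be picked in timestamp order (KIP-695).
>>> props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 1000L);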
>>>
>>> Thanks,
>>> -John
>>>
>>> On Mon, Sep 26, 2022, at 01:35, Tomasz Gac wrote:
>>> > Hi group,
>>> >
>>> > I wrote a simple Kafka Streams application with a topology like the one below:
>>> >
>>> > builder.addStateStore(
>>> >     Stores.keyValueStoreBuilder(
>>> >             Stores.persistentKeyValueStore("STORE"),
>>> >             Serdes.String(), Serdes.String())
>>> >         .withLoggingEnabled(storeConfig));
>>> >
>>> >
>>> >
>>> > builder.stream("TOPIC_1", Consumed.with(...))
>>> >>     .merge(builder.stream("TOPIC_2", Consumed.with(...))
>>> >>     .merge(builder.stream("TOPIC_3", Consumed.with(...))
>>> >>     .map(...) // stateless
>>> >>     .transform(..., "STORE")  // stateful
>>> >
>>> >     .to("TOPIC_4");
>>> >
>>> >
>>> > All input topics have 6 partitions, and for the purpose of testing, we are
>>> > producing data to partition number 5.
>>> > We are using Kafka Streams version 2.8.1 and broker version 2.12-2.1.1.
>>> >
>>> > The application works as expected when it has caught up to the lag, e.g.
>>> > when the reset tool is used with the --to-latest parameter.
>>> > However, when the application is processing the messages starting from the
>>> > earliest offset, the inputs arrive in batches such as:
>>> >
>>> >    - ~1000 messages from TOPIC_1
>>> >    - ~1000 messages from TOPIC_2
>>> >    - ~1000 messages from TOPIC_3
>>> >
>>> > All of the messages have timestamps provided in headers, so I would expect
>>> > the application to interleave the messages from these three topics so that
>>> > their timestamps are in ascending order.
>>> > However, this is not what I am observing. The messages are
>>> > processed in batches.
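>>> >
>>> > For reference, we read the timestamp out of the record headers with a
>>> > custom TimestampExtractor along these lines (simplified; the header name
>>> > here is illustrative), configured via default.timestamp.extractor:
>>> >
>>> > import java.nio.ByteBuffer;
>>> > import org.apache.kafka.clients.consumer.ConsumerRecord;
>>> > import org.apache.kafka.common.header.Header;
>>> > import org.apache.kafka.streams.processor.TimestampExtractor;
>>> >
>>> > public class HeaderTimestampExtractor implements TimestampExtractor {
>>> >     @Override
>>> >     public long extract(final ConsumerRecord<Object, Object> record,
>>> >                         final long partitionTime) {
>>> >         final Header header = record.headers().lastHeader("event-time");
>>> >         if (header == null) {
>>> >             return record.timestamp(); // fall back to the record timestamp
>>> >         }
>>> >         return ByteBuffer.wrap(header.value()).getLong();
>>> >     }
>>> > }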
>>> >
>>> > How do I configure my application so that it processes messages in order
>>> > when it is catching up to the lag?
>>>
