Hi Tomasz,

Thanks for asking. This sounds like the situation that we fixed in Apache Kafka 
3.0, with KIP-695 

Can you try upgrading and let us know if this fixes the problem?


On Mon, Sep 26, 2022, at 01:35, Tomasz Gac wrote:
> Hi group,
> I wrote a simple kafka streams application with topology such as below:
> builder.addStateStore(
>>     Stores.keyValueStoreBuilder(
>>     Stores.persistentKeyValueStore("STORE"),
>>     Serdes.String(), Serdes.String())
>>         .withLoggingEnabled(storeConfig))|
> builder.stream("TOPIC_1", Consumed.with(...))
>>     .merge(builder.stream("TOPIC_2", Consumed.with(...))
>>     .merge(builder.stream("TOPIC_3", Consumed.with(...))
>>     .map(...) // stateless
>>     .transform(..., "STORE")  // stateful
>     .to("TOPIC_4");
> All input topics have 6 partitions, and for the purpose of testing, we are
> producing data to partition number 5.
> We are using kafka streams version 2.8.1, broker version 2.12-2.1.1
> The application works as expected when it has caught up to the lag, eg.
> when reset tool is used with --to-latest parameter.
> However, when the application is processing the messages starting from the
> earliest offset, the inputs are provided in batches such as:
>    - ~1000 messages from TOPIC_1
>    - ~1000 messages from TOPIC_2
>    - ~1000 messages from TOPIC_3
> All of the messages have timestamps provided in headers, so I would expect
> the application to interleave the messages from these three topics so that
> their timestamps are in the ascending order.
> However, this is not the case that I am observing. The messages are
> processed in batches.
> How do I configure my application so that it processes messages in order
> when it is catching up to the lag?

Reply via email to