I had heard when doing a join, the timestamp of the generated
message is taken from the message triggering the join or the biggest
timestamp of the two.

In older versions it was the timestamp of the record that triggered the join. Since 2.3, it is the maximum of the two input records' timestamps (cf. https://issues.apache.org/jira/browse/KAFKA-6455).
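
To make the two rules concrete, here is a minimal plain-Java sketch. It does not call Kafka Streams at all; the class and method names are made up for illustration, and it only restates the rule described above as arithmetic on two timestamps.

```java
// Illustration only: this is not Kafka Streams code, just the rule as arithmetic.
class JoinTimestampSketch {

    // Behaviour described for versions before 2.3: the joined record
    // inherits the timestamp of the record that triggered the join.
    static long beforeKafka23(long triggeringTs, long otherTs) {
        return triggeringTs;
    }

    // Behaviour described for 2.3 and later: the joined record gets the
    // larger of the two input timestamps, regardless of which side triggered.
    static long sinceKafka23(long triggeringTs, long otherTs) {
        return Math.max(triggeringTs, otherTs);
    }

    public static void main(String[] args) {
        long triggeringTs = 100L; // hypothetical timestamp of the late-arriving trigger
        long otherTs = 250L;      // hypothetical timestamp of the buffered record
        System.out.println("before 2.3: " + beforeKafka23(triggeringTs, otherTs));
        System.out.println("since 2.3:  " + sinceKafka23(triggeringTs, otherTs));
    }
}
```

With the max rule, a join result can never carry a timestamp older than either of its inputs, even when the triggering record arrived out of order.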

You don't need to do anything for this. It's hard-coded. Of course, if you want you _could_ manually change the timestamp as pointed out by Luke.

For proper timestamp ordering, you should also upgrade to 3.0 to get the latest changes (i.e., the KIP improvements mentioned by Luke).

-Matthias

On 12/5/21 12:14 AM, Luke Chen wrote:
Hi Miguel,
Of course you can use the Processor API to achieve what you want, but it
needs more coding.
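
As a rough idea of what that extra coding involves, the core of a re-ordering step is usually a small buffer sorted by timestamp. The sketch below is plain Java and deliberately does not use the Processor API; `ReorderBufferSketch`, `Event`, and the `graceMs` parameter are hypothetical names chosen for this example. It assumes records arrive at most `graceMs` late; anything later than that would still be emitted out of order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical helper: holds records back for a grace period and releases
// them in timestamp order. Inside a real processor, add() would be driven by
// incoming records and flush() by shutdown or a punctuation.
class ReorderBufferSketch {

    static final class Event {
        final String value;
        final long timestamp;

        Event(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final long graceMs;
    private long maxSeenTs = Long.MIN_VALUE;
    // Min-heap on timestamp: the oldest buffered record is always at the head.
    private final PriorityQueue<Event> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.timestamp));

    ReorderBufferSketch(long graceMs) {
        this.graceMs = graceMs;
    }

    // Buffers the record, then releases every record that is at least
    // graceMs older than the newest timestamp seen so far.
    List<Event> add(Event e) {
        buffer.add(e);
        maxSeenTs = Math.max(maxSeenTs, e.timestamp);
        List<Event> ready = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().timestamp <= maxSeenTs - graceMs) {
            ready.add(buffer.poll());
        }
        return ready;
    }

    // Releases everything that is still buffered, in timestamp order.
    List<Event> flush() {
        List<Event> rest = new ArrayList<>();
        while (!buffer.isEmpty()) {
            rest.add(buffer.poll());
        }
        return rest;
    }

    public static void main(String[] args) {
        ReorderBufferSketch buf = new ReorderBufferSketch(10);
        List<Event> out = new ArrayList<>();
        // Arrival order BB, A, AA, CC with made-up timestamps.
        out.addAll(buf.add(new Event("BB", 20)));
        out.addAll(buf.add(new Event("A", 10)));
        out.addAll(buf.add(new Event("AA", 15)));
        out.addAll(buf.add(new Event("CC", 30)));
        out.addAll(buf.flush());
        for (Event e : out) {
            System.out.println(e.value + " @ " + e.timestamp);
        }
    }
}
```

The trade-off is latency: every record is delayed by up to the grace period, and a larger grace period tolerates more disorder at the cost of more buffering.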

Alternatively, I think you can set a better value for the "max.task.idle.ms"
configuration. The default value is 0, which means it basically doesn't wait
for more data in empty partitions. You can check the doc here
<https://kafka.apache.org/30/documentation/streams/developer-guide/config-streams#max-task-idle-ms>,
and also KIP-695
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-695%3A+Further+Improve+Kafka+Streams+Timestamp+Synchronization>
and KIP-353
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization>
for more information.
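
For reference, a minimal sketch of setting this property when building the Streams configuration. It uses only `java.util.Properties` with plain string keys so that it compiles without the Kafka jars. The application id, broker address, and the 500 ms value are placeholders picked for illustration, not recommendations.

```java
import java.util.Properties;

// Sketch: builds the Properties object that would be passed to KafkaStreams.
class IdleConfigSketch {

    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "ordered-join-app");   // placeholder name
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
        // Let a task idle for up to 500 ms when one input partition has no
        // buffered data, instead of processing the other side right away.
        // 500 is an arbitrary example value.
        props.put("max.task.idle.ms", "500");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps().getProperty("max.task.idle.ms"));
    }
}
```

A larger value gives slower partitions more time to catch up, at the cost of added processing latency whenever one side is empty.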

Thank you.
Luke

On Sat, Dec 4, 2021 at 2:10 AM Miguel González <miguel.gonza...@klar.mx>
wrote:

Hello

So I've been using a Streams app to join two input topics. The messages
have a certain order, but I have seen the messages on the output topic
arriving in a different order. Even before that, when doing a
map/flatmap operation, they are processed in a different order.

Example:

Stream 1: A---B---C---D
Stream 2: A--B--C--D

Output topic: BB---A---AA---CC---DD

I need it to be A--AA--BB---CC---DD

Is there a way that Kafka Streams guarantees the order of messages in the
output topic? I had heard when doing a join, the timestamp of the generated
message is taken from the message triggering the join or the biggest
timestamp of the two. I don't know if this is the case. If it is, is
that an automatic process, or do I need to set the timestamp somewhere?

Another option I have read about is doing something like this:
https://dzone.com/articles/how-to-order-streamed-dataframes where the
author does some kind of sorting using an AbstractProcessor. It seems
like a pretty old article, but I guess I could use a Transformer or a
Processor.


Any guidance is really appreciated!

many thanks
- Miguel

