Hi Vasily,

Probably in this case, with the constraints you’re providing, the first branch
would output first, but I wouldn’t depend on it. Any small change in your
program could break this, and any change in Streams itself could also alter
the exact execution order.

The right way to think about these programs is as “data flows”. You’re taking a
stream of data, splitting it into two separate, smaller branch streams, and
then later merging those back into one stream. In general, there is no
defined ordering, just as there wouldn’t be if you did the same thing with
literal streams of water.

If you want a guarantee about the relative ordering, you’d have to use a
specific operator that does what you want. If nothing else comes to mind, a
custom transformer or processor would do the trick: it receives records from
both branches and buffers records from the second branch, so that it can
always emit the record from the first branch first.
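A rough sketch of that buffering logic follows, written in plain Scala so it runs without a Kafka cluster. It is not Kafka Streams API: `Branch`, `Tagged`, `PairingMerger`, `MergeCheck` and the `inputOffset` field are hypothetical names, and it assumes (as described in the question below) that every input produces either one record on each branch or none, and that each branch delivers its own records in input order. It slightly generalizes the idea by buffering whichever branch's record arrives first, then emitting the first branch's record before the second's.

```scala
import scala.collection.mutable

// Which branch produced a record. Hypothetical tag: the real topology would
// have to attach it itself, e.g. in each branch's mapValues.
sealed trait Branch
case object First extends Branch
case object Second extends Branch

// A branch output tagged with the offset of the input record that caused it.
// `inputOffset` is also an assumption and would have to be carried explicitly.
final case class Tagged(inputOffset: Long, branch: Branch, value: String)

// Models the buffering step only. Assumes every input yields one record on
// each branch or none, and each branch emits its own records in input order.
final class PairingMerger {
  // Records still waiting for their counterpart from the other branch.
  // These live in memory and would grow without bound if the assumption
  // above were violated.
  private val pendingFirst = mutable.Map.empty[Long, Tagged]
  private val pendingSecond = mutable.Map.empty[Long, Tagged]

  // Accepts one record and returns what can now be emitted: for each input,
  // the first branch's record followed by the second branch's record.
  def accept(r: Tagged): Seq[Tagged] = r.branch match {
    case First =>
      pendingSecond.remove(r.inputOffset) match {
        case Some(second) => Seq(r, second)
        case None =>
          pendingFirst.put(r.inputOffset, r)
          Seq.empty
      }
    case Second =>
      pendingFirst.remove(r.inputOffset) match {
        case Some(first) => Seq(first, r)
        case None =>
          pendingSecond.put(r.inputOffset, r)
          Seq.empty
      }
  }
}

object MergeCheck {
  // The ordering requirement from the question: nothing produced for a later
  // input may precede something produced for an earlier input.
  def inInputOrder(out: Seq[Tagged]): Boolean =
    out.zip(out.drop(1)).forall { case (a, b) => a.inputOffset <= b.inputOffset }

  def main(args: Array[String]): Unit = {
    // An interleaving in which the first branch runs ahead of the second.
    val arrivals = Seq(
      Tagged(0L, First, "a0"), Tagged(1L, First, "a1"),
      Tagged(0L, Second, "b0"), Tagged(1L, Second, "b1"))
    val merger = new PairingMerger
    val out = arrivals.flatMap(merger.accept)
    println(out.map(_.value).mkString(", ")) // prints "a0, b0, a1, b1"
    println(inInputOrder(arrivals))          // prints "false"
    println(inInputOrder(out))               // prints "true"
  }
}
```

In an actual application this logic would presumably live in a custom transformer or processor fed by both branches (the Processor API lets a processor node have several parent nodes), with the pending records kept in a state store rather than in memory so they survive a restart.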

Thanks,
John

On Tue, Dec 3, 2019, at 06:32, Vasily Sulatskov wrote:
> Hello,
> 
> I wonder if the ordering of messages is preserved by Kafka Streams when
> the messages are processed by the same sub-topology without
> redistribution and, in the end, there are multiple sinks for the same
> topic.
> 
> I couldn't find the answer to this question in the docs/mailing 
> list/stack overflow.
> 
> You can arrive at this situation with code like this:
>
> val source = builder.stream[Key, Value]("input")
> source
>   .filter(...)
>   .mapValues(...)
>   .transform(...)
>   .to("output")
> 
> source
>   .filter(...)
>   .mapValues(...)
>   .transform(...)
>   .to("output")
> 
> Basically, these are two different processing branches that process each
> input value slightly differently. Whenever one branch produces a message
> in response to an input message, the other branch produces one as well.
> So keeping the ordering in this case means that all messages produced for
> earlier source messages on one branch should precede messages produced by
> the other branch for later source messages.
> 
> Here's my topology:
> 
>   Sub-topology: 2
>     Source: KSTREAM-SOURCE-0000000019 (topics: [input])
>       --> KSTREAM-MAPVALUES-0000000020
>     Processor: KSTREAM-MAPVALUES-0000000020 (stores: [])
>       --> KSTREAM-MAPVALUES-0000000022, KSTREAM-TRANSFORM-0000000021
>       <-- KSTREAM-SOURCE-0000000019
>     Processor: KSTREAM-MAPVALUES-0000000022 (stores: [])
>       --> KSTREAM-TRANSFORM-0000000023
>       <-- KSTREAM-MAPVALUES-0000000020
>     Processor: KSTREAM-TRANSFORM-0000000021 (stores: [store1])
>       --> KSTREAM-MAP-0000000027
>       <-- KSTREAM-MAPVALUES-0000000020
>     Processor: KSTREAM-TRANSFORM-0000000023 (stores: [store2])
>       --> KSTREAM-MAP-0000000024
>       <-- KSTREAM-MAPVALUES-0000000022
>     Processor: KSTREAM-MAP-0000000024 (stores: [])
>       --> KSTREAM-FILTER-0000000025
>       <-- KSTREAM-TRANSFORM-0000000023
>     Processor: KSTREAM-MAP-0000000027 (stores: [])
>       --> KSTREAM-FILTER-0000000028
>       <-- KSTREAM-TRANSFORM-0000000021
>     Processor: KSTREAM-FILTER-0000000025 (stores: [])
>       --> KSTREAM-SINK-0000000026
>       <-- KSTREAM-MAP-0000000024
>     Processor: KSTREAM-FILTER-0000000028 (stores: [])
>       --> KSTREAM-SINK-0000000029
>       <-- KSTREAM-MAP-0000000027
>     Sink: KSTREAM-SINK-0000000026 (topic: output)
>       <-- KSTREAM-FILTER-0000000025
>     Sink: KSTREAM-SINK-0000000029 (topic: output)
>       <-- KSTREAM-FILTER-0000000028
> 
> On one hand, I guess that all records coming from one partition will be
> processed by one thread, so it can keep the messages in order; on the
> other hand, I see two independent sinks in the topology, presumably with
> independent buffers etc. So in the end I am not sure what's going to
> happen.
> 
> I would guess that it can work because the sinks probably have the same
> buffer size, but that's not guaranteed. I can imagine the following
> failure scenario: a write by one sink succeeds while the write by the
> other sink fails, so a batch of messages gets delivered to the output
> partition out of order.
> 
> Can someone please clarify what happens in this case? Is there an 
> ordering guarantee? Can these streams be merged while preserving
> ordering?
> 
> I know that regular Source.merge() doesn't preserve ordering, but in 
> this case I know that there's no repartitioning etc., and messages
> basically appear on the same "tick", so it feels like there should be a 
> way to do this. Can I keep ordering if I replace my transformers with 
> processors and manually connect them to the same sink?
> 
> 
> --
> Best regards,
> Vasily Sulatskov
> 
>
