Hi,
So in my example, what I notice is that the order of the records is:
[1, (KA, AA1)] [2, (KA, AB1)] [3, (KB, B1)]
The stream is branched as follows (the isType..(..) predicates are placeholders):
input
    .branch(
        (k, v) -> isTypeAA(v),  // filter records of type AA
        (k, v) -> isTypeAB(v),  // filter records of type AB
        (k, v) -> isTypeB(v)    // filter records of type B
    );
The topology is (a fuller sketch follows below):
streamAA.join(streamAB, ...).process(/* put joined value into the state store */);
streamB.transformValues(/* fetch joined value from the state store and enrich the value */);
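For completeness, roughly what that looks like in code (just a sketch: the
String types, the joiner, the 5-minute join window, the time range, and the
enrichment logic are all placeholder assumptions; the window store named
"enrich-message-store" is from my earlier mail):

import java.time.Duration;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;

// join AA with AB, then put the joined value into the window store
KStream<String, String> joined = streamAA.join(
        streamAB,
        (aa, ab) -> aa + "|" + ab,                // placeholder ValueJoiner
        JoinWindows.of(Duration.ofMinutes(5)));   // placeholder window

joined.process(() -> new Processor<String, String>() {
    private ProcessorContext context;
    private WindowStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        this.store = (WindowStore<String, String>) context.getStateStore("enrich-message-store");
    }

    @Override
    public void process(final String key, final String value) {
        // end of this branch of the topology, so no return value is needed
        store.put(key, value, context.timestamp());
    }

    @Override
    public void close() { }
}, "enrich-message-store");

// enrich streamB's value with the joined data fetched from the store
streamB.transformValues(() -> new ValueTransformerWithKey<String, String, String>() {
    private WindowStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.store = (WindowStore<String, String>) context.getStateStore("enrich-message-store");
    }

    @Override
    public String transform(final String readOnlyKey, final String value) {
        // placeholder time range; the real (from, to) would come from the record
        try (final KeyValueIterator<Windowed<String>, String> iter = store.fetchAll(0L, Long.MAX_VALUE)) {
            return iter.hasNext() ? value + "+" + iter.next().value : value;
        }
    }

    @Override
    public void close() { }
}, "enrich-message-store");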

When I look at the logs, I see the following order:
(KA, AA1)
(KA, AB1)
fetch for joined data
(KB, B1)
join [ (KA, AA1), (KA, AB1) ]
put joined record

Clearly it is trying to fetch the data before it is made available to the
state store, and that would be because the join operation happens after the
streamB record is read.
As you mentioned, if there is repartitioning then the ordering may not hold.
I wanted to know when Kafka Streams automatically repartitions the topics.
Just for simplicity, I had omitted a lot of steps from the example: after
branching I actually re-key each stream using map, then apply a transformer,
and then re-key them again, roughly as sketched below.
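(A sketch; newKey(..), anotherKey(..), MyTransformerSupplier and "some-store"
are all placeholder names:)

streamAA
    .map((k, v) -> KeyValue.pair(newKey(k, v), v))        // re-key: marks the stream for repartitioning
    .transform(new MyTransformerSupplier(), "some-store") // stateful transformer
    .map((k, v) -> KeyValue.pair(anotherKey(k, v), v));   // re-key again

(and the same for streamAB and streamB)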

If this causes these streams to be repartitioned, so that the order may not
hold, then what would be a good way to ensure that streamB records are
enriched with the joined data of streamAA and streamAB?

Thanks
Sachin



On Sat, Feb 22, 2020 at 2:33 AM Guozhang Wang <wangg...@gmail.com> wrote:

> From the description it seems there are no repartition topics throughout your
> topology, in which case the ordering should be guaranteed. I.e., a record
> from the input topic would only be processed after all previous records
> from that same topic have been processed entirely; if there are repartition
> topics in between though, this may not hold.
>
> Also, a minor thing: if your streamAA.join(streamAB) results are only
> needed for populating the store, you can use `process(..)` instead of
> `transform(..)`, as it would not need a return value since it would be the
> end of this branch of the topology.
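> For example, roughly (just a sketch, using the thread's placeholder types
> and store name):
>
> streamAA.join(streamAB, joiner, windows)
>     .process(() -> new Processor<K, V>() {
>         private ProcessorContext context;
>         private WindowStore<K, V> store;
>
>         public void init(ProcessorContext context) {
>             this.context = context;
>             this.store = (WindowStore<K, V>) context.getStateStore("enrich-message-store");
>         }
>
>         public void process(K key, V value) {
>             store.put(key, value, context.timestamp());  // no return value needed
>         }
>
>         public void close() {}
>     }, "enrich-message-store");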
>
>
> Guozhang
>
> On Thu, Feb 20, 2020 at 7:32 PM Sachin Mittal <sjmit...@gmail.com> wrote:
>
> > Hi,
> > I wanted to understand if in this particular case my solution would work:
> > Say I have source records [timestamp, (K, V)] in the input topic in the
> > following order:
> > .. [1, (KA, AA1)] [2, (KA, AB1)] [3, (KB, B1)] ...
> >
> > I create multiple streams out of the input stream as (the isType..(..)
> > predicates are placeholders):
> > input
> >     .branch(
> >         (k, v) -> isTypeAA(v),  // filter records of type AA
> >         (k, v) -> isTypeAB(v),  // filter records of type AB
> >         (k, v) -> isTypeB(v)    // filter records of type B
> >     );
> > Then my topology is written in the following way:
> > // join stream of type AA with AB and push its value into the state store
> > streamAA.join(streamAB, ...).transform(new TransformerSupplier() {
> >     // (inside the supplied Transformer:)
> >     transform(K key, V value) {
> >         // timestamp = extract timestamp from (key, value)
> >         enrichMessageStore.put(key, value, timestamp);
> >         return new KeyValue(key, value);
> >     }
> > }, "enrich-message-store")
> > // fetch the data from that state store and enrich streamB
> > streamB.transform(new TransformerSupplier() {
> >     transform(K key, V value) {
> >         // (from, to) = extract (from, to) from (key, value)
> >         result = enrichMessageStore.fetchAll(from, to);
> >         // mutate value using value.enrich(result)
> >         return new KeyValue(key, value);
> >     }
> > }, "enrich-message-store");
> >
> > So does Kafka Streams ensure that records of streamB would be processed
> > only after records of streamAA and streamAB are joined, since they are in
> > order? Because if the operation streamAA.join(streamAB) happens after
> > streamB.transform(), then it will not work.
> > I am assuming that since the streamAA and streamAB types of records come
> > before the streamB type of record in the input topic, the join will also
> > happen before. If this assumption is not safe, then is there any other
> > way of ensuring it?
> >
> > For now let's assume there is a single partition of the input topic.
> >
> > Thanks
> > Sachin
> >
> >
> >
> >
> > On Fri, Feb 21, 2020 at 4:57 AM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hello Sachin,
> > >
> > > 1) It seems from your source code that in the stream2.transform you are
> > > generating a new value and returning a new key-value pair:
> > >
> > > mutate value = enrich(value, result)
> > > return new KeyValue(key, value);
> > >
> > > ---------------
> > >
> > > Anyway, if you do not want to generate a new value object, and just
> > > have a field function like this:
> > >
> > > value.enrich(result)
> > > return new KeyValue(key, value);
> > >
> > > ---------------
> > >
> > > That actually still works as long as your serde function recognizes the
> > > optional enriched fields and can encode/decode the value object with or
> > > without the enriched fields.
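> > > For example (just a sketch), a value class where the enriched field is
> > > optional would satisfy that, as long as the serde can handle the field
> > > being absent:
> > >
> > > class MyValue {            // placeholder class
> > >     String payload;        // always present
> > >     String enrichment;     // optional: null until enrich(..) sets it
> > >     void enrich(String e) { this.enrichment = e; }
> > > }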
> > >
> > > 2) And regarding your join scenario: if your join does not depend on
> > > any field, but only on the time range (since you used fetchAll(), which
> > > would return ALL the key-values falling into that range), I think this
> > > would be the way to do it.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Wed, Feb 19, 2020 at 10:25 PM Sachin Mittal <sjmit...@gmail.com> wrote:
> > >
> > > > Hi,
> > > > I have two streams, and I want to enrich stream2 records based off
> > > > stream1 records.
> > > > The reason I really cannot join those two streams is that there is no
> > > > common key between them.
> > > > Hence the only way I can do it is by using a timestamp field property.
> > > >
> > > > This is how I have built my pipeline.
> > > > .....
> > > > //create and add state store
> > > > final StoreBuilder<WindowStore<K, V>> enrichStoreBuilder =
> > > >     Stores.windowStoreBuilder(
> > > >         Stores.persistentWindowStore("enrich-message-store",
> > > >             retentionSize, windowSize, false), ...);
> > > > builder.addStateStore(enrichStoreBuilder);
> > > > .....
> > > > // push the data to the state store
> > > > stream1.transform(new TransformerSupplier() {
> > > >     transform(K key, V value) {
> > > >         // timestamp = extract timestamp from (key, value)
> > > >         enrichMessageStore.put(key, value, timestamp);
> > > >         return new KeyValue(key, value);
> > > >     }
> > > > }, "enrich-message-store");
> > > > .....
> > > > // fetch the data from the state store and enrich stream2
> > > > stream2.transform(new TransformerSupplier() {
> > > >     transform(K key, V value) {
> > > >         // (from, to) = extract (from, to) from (key, value)
> > > >         result = enrichMessageStore.fetchAll(from, to);
> > > >         // mutate value = enrich(value, result)
> > > >         return new KeyValue(key, value);
> > > >     }
> > > > }, "enrich-message-store");
> > > > .......
> > > >
> > > > So is this something that would work? Basically, in one stream
> > > > transformation a state store is updated.
> > > > And in the second stream transformation, values from that state store
> > > > are fetched and the stream2 value is enriched from those values.
> > > > The enriched (mutated) value is returned downstream.
> > > >
> > > > So will this work, and most importantly, can we mutate the input value
> > > > itself in the transform function and return it, or should we always
> > > > create new instances of it?
> > > >
> > > > Is there any better way of doing this?
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
> --
> -- Guozhang
>
