@Guozhang Wang

Thanks for the reply. Indeed, I am finding it difficult to explain this
state. I have checked the code many times. There may be a bug, but I fail to
see it. There are several things about Kafka Streams that I don't
understand, which makes it harder for me to reason about this.

1. What is the partition key for the changelog topics? Is it the same as
the input key or the state store key? Or does the thread specify the
partition itself, since it knows which input partition it is subscribed to? If
the input topic and the state store are partitioned differently, that would
explain the issue here.
2. Is there a background thread that persists the state store when caching
is disabled? When does the app commit the offsets for the input topic? Is it
when the sink writes to the output topic, or when the state store writes to
the changelog topic? If the app commits a record before the data is written
to the changelog topic, that would again explain this state.
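
To make the hypothesis in point 2 concrete, here is a toy model in Python. It
is only a sketch and not how Kafka Streams is implemented: `aggregate`, `run`,
`commit_first` and `crash_at` are names I made up, the store holds a single
key, and a "crash" simply drops the in-memory state between the two durable
steps (changelog append and input offset commit).

```python
from typing import List, Optional


def aggregate(state: Optional[str], value: Optional[str]) -> Optional[str]:
    """Toy aggregation: a tombstone (None) clears the key, a value is appended."""
    if value is None:
        return None
    return value if state is None else f"{state}+{value}"


def run(inputs: List[Optional[str]], commit_first: bool, crash_at: int) -> List[Optional[str]]:
    """Process `inputs` for one key, crashing once between the two durable
    steps of the record at index `crash_at`. Returns the resulting changelog."""
    changelog: List[Optional[str]] = []  # durable: survives the crash
    committed = 0                        # durable: committed input offset
    state: Optional[str] = None          # in-memory store: lost on crash
    offset = 0
    crashed = False
    while offset < len(inputs):
        state = aggregate(state, inputs[offset])
        crash_now = offset == crash_at and not crashed
        if commit_first:
            # Hypothetical ordering: commit the offset, then write the changelog.
            committed = offset + 1
            if not crash_now:
                changelog.append(state)
        else:
            # Hypothetical ordering: write the changelog, then commit the offset.
            changelog.append(state)
            if not crash_now:
                committed = offset + 1
        if crash_now:
            crashed = True
            # Restart: rebuild the store from the last changelog entry and
            # resume from the committed input offset.
            state = changelog[-1] if changelog else None
            offset = committed
            continue
        offset += 1
    return changelog


if __name__ == "__main__":
    records = ["V1", None, "V2"]
    print(run(records, commit_first=False, crash_at=1))
    print(run(records, commit_first=True, crash_at=1))
```

In this model, with the input `V1, null, V2` and a crash while handling the
tombstone, writing the changelog first only repeats the tombstone, whereas
committing first skips it and leaves `V1+V2` in the store, which resembles the
`V2+V1` entries I see.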

> You may also consider upgrading to 2.6.x or higher version and see if this
> issue goes away.
Do you mean the client or the Kafka broker? I will be upgrading the client
to 2.7.0 soon.

Sadly, looking at the timestamps will not help much, as we use a business
time field to set the record timestamp. If I am right, there is currently no
way to know when a producer wrote a record to a Kafka topic.

Regards,
Mangat



On Wed, Apr 7, 2021 at 6:22 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Mangat,
>
> With at-least-once, although some records may be processed multiple times,
> their processing order should not be violated, so what you observed is not
> expected. What caught my eye is this section in your output changelogs
> (highlighted):
>
> Key1, V1
> Key1, null
> Key1, V1
> Key1, null  (processed again)
> Key1, V2
> Key1, null
>
> *Key1, V1*
> *Key1,V2*
> Key1, V2+V1 (I guess we didn't process V2 tombstone yet but reprocessed V1
> again due to reassignment)
>
> They seem to be the result of first receiving a tombstone which removes V1
> and then a new record that adds V2. However, since caching is disabled you
> should get
>
> *Key1,V1*
> *Key1,null*
> *Key1,V2*
>
> instead; without the actual code snippet I cannot tell more about what's
> happening here. If you look into the logs, you can record each time a
> partition migrates, how many records from the changelog were replayed to
> restore the store, and from which offset on the input topic Streams
> resumes processing. You may also consider upgrading to 2.6.x or higher
> version and see if this issue goes away.
>
>
> Guozhang
>
> On Tue, Apr 6, 2021 at 8:38 AM mangat rai <mangatm...@gmail.com> wrote:
>
> > Hey,
> >
> > We have the following setup in our infrastructure.
> >
> >    1. Kafka - 2.5.1
> >    2. Apps use the Kafka Streams `org.apache.kafka` library, version 2.5.1
> >    3. The low-level Processor API is used with *at-least-once* semantics
> >    4. State stores are *in-memory* with *caching disabled* and *changelog
> >    enabled*
> >
> >
> > Is it possible that during state replication and partition reassignment,
> > the input data is not always applied to the state store?
> >
> > 1. Let's say the input topic has records like the following:
> >
> > ```
> > Key1, V1
> > Key1, null (tombstone)
> > Key1, V2
> > Key1, null
> > Key1, V3
> > Key1, V4
> > ```
> > 2. The app has an aggregation function which takes these records and
> > updates the state store so that the changelog should be
> >
> > ```
> > Key1, V1
> > Key1, null (tombstone)
> > Key1, V2
> > Key1, null
> > Key1, V3
> > Key1, V3 + V4
> > ```
> > Let's say the partition responsible for processing the above key was
> > reallocated several times to different threads due to some infra issues we
> > are having (in Kubernetes, where we run the app, not in the Kafka cluster).
> >
> > I see the following records in the changelogs
> >
> > ```
> > Key1, V1
> > Key1, null
> > Key1, V1
> > Key1, null  (processed again)
> > Key1, V2
> > Key1, null
> > Key1, V1
> > Key1,V2
> > Key1, V2+V1 (I guess we didn't process V2 tombstone yet but reprocessed V1
> > again due to reassignment)
> > Key1,V1 (V2 is gone as there was a tombstone, but then V1 tombstone should
> > have been applied also!!)
> > Key1, V2+V1 (it is back!!!)
> > Key1,V1
> > Key1, V1 + V2 + V3 (This is the final state)!
> > ```
> >
> > Looking at this, several things stand out:
> > 1. The state is always correctly applied locally (on a developer laptop),
> > where there were no reassignments.
> > 2. The records are processed multiple times, which is understandable as we
> > have at-least-once semantics here.
> > 3. As long as we re-apply the same events in the same order we are golden,
> > but it looks like some records are skipped. It looks as if we have
> > multiple consumers reading and updating the same topics, leading to race
> > conditions.
> >
> > Is there any way Kafka Streams' state replication could lead to such a
> > race condition?
> >
> > Regards,
> > Mangat
> >
>
>
> --
> -- Guozhang
>
