Guozhang,

Yes, you are correct. We have our own group processor. I have more
information now.

1. I added the ThreadId to the data when the app persists into the changelog
topic (see the sketch below this list).
2. Thread-2, which was working on partition-0, had a timeout issue.
3. Thread-4 picked up partition-0, as I can see its Id in the changelog.
4. *But then Thread-2 and Thread-4 were both writing into partition-0
of the changelog, and for the same key.*
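
For reference, the tagging is roughly like the sketch below; the store name,
the value format, and the processor itself are illustrative placeholders, not
our real code:

```java
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Minimal sketch: tag every value written to the changelog-backed store with
// the name of the stream thread that wrote it, purely for debugging.
public class TaggingAggregator extends AbstractProcessor<String, String> {

    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        super.init(context);
        // "agg-store" is an assumed store name.
        store = (KeyValueStore<String, String>) context.getStateStore("agg-store");
    }

    @Override
    public void process(final String key, final String value) {
        final String threadId = Thread.currentThread().getName();
        // The tag ends up in the changelog record, so we can see which thread
        // produced each update for a given key.
        store.put(key, threadId + "|" + value);
        context().forward(key, value);
    }
}
```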

So I could clearly see that two threads were overwriting each other's data in
the state store, leading to a corrupted state. This confirms my theory that it
was a concurrent-update issue, which was something totally unexpected. I
suspect that Thread-2 kept persisting its in-memory state, maybe because it
wasn't stopped after the timeout exception. Is there any Kafka Streams
configuration that could lead to this?

There was no network issue; our CPU was being heavily throttled by Kubernetes.
We gave the app more resources and also decreased the fetch size, so the
I/O-to-CPU-time ratio is higher than before. After that there were no timeout
issues, hence no reassignments, and hence no corrupted state.
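
For context, these are the kinds of consumer settings involved; the exact
knobs and values below are illustrative, not our actual production
configuration:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsTuning {
    public static Properties props() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-aggregation-app"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        // Smaller fetches and fewer records per poll mean less CPU work per
        // poll loop, so a CPU-throttled pod is less likely to exceed
        // max.poll.interval.ms and trigger a rebalance.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 262144);
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 200);
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 300000);
        return props;
    }
}
```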

I really appreciate your help here...
Thanks!
Mangat


On Wed, Apr 14, 2021 at 8:48 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hey Mangat,
>
> A. With at-least-once, Streams does not guarantee atomicity of 1) / 2);
> with exactly-once, atomicity is indeed guaranteed via transactional
> messaging.
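>
> For illustration only, exactly-once is a single config switch; the app id
> and bootstrap servers below are placeholders:
>
> ```java
> import java.util.Properties;
> import org.apache.kafka.streams.StreamsConfig;
>
> public class ExactlyOnceConfig {
>     public static Properties props() {
>         final Properties props = new Properties();
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-aggregation-app"); // placeholder
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
>         // With exactly_once, the changelog write, the sink write and the
>         // offset commit are all part of one Kafka transaction.
>         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
>         return props;
>     }
> }
> ```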
>
> B. If you are using processor API, then I'm assuming you did your own
> group-by processor right? In that case, the partition key would just be the
> record key when you are sending to the repartition topic.
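>
> As an illustration only (the topic names and the key-building logic below
> are made up), such a hand-rolled group-by boils down to re-keying each
> record and writing it to a repartition topic through a sink node:
>
> ```java
> import org.apache.kafka.streams.Topology;
> import org.apache.kafka.streams.processor.AbstractProcessor;
>
> public class GroupBySketch {
>
>     // Re-keys each record with the group-by key before it hits the sink.
>     static class ReKeyProcessor extends AbstractProcessor<String, String> {
>         @Override
>         public void process(final String key, final String value) {
>             // Hypothetical derivation of the group-by key, e.g. (a,b,c).
>             final String groupKey = key + "|" + value;
>             // Whatever key we forward here becomes the record key, and hence
>             // the partition key, of the repartition topic written by the sink.
>             context().forward(groupKey, value);
>         }
>     }
>
>     public static Topology build() {
>         final Topology topology = new Topology();
>         topology.addSource("source", "input-topic");                          // placeholder topic
>         topology.addProcessor("re-key", ReKeyProcessor::new, "source");
>         topology.addSink("repartition-sink", "my-app-repartition", "re-key"); // placeholder topic
>         return topology;
>     }
> }
> ```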
>
>
> Guozhang
>
>
>
>
> On Thu, Apr 8, 2021 at 9:00 AM mangat rai <mangatm...@gmail.com> wrote:
>
> > Thanks again, that makes things clear. I still have some questions here
> > then.
> >
> > A.  For each record we read, we do two updates:
> >       1. Changelog topic of the state store.
> >       2. Output topic aka sink.
> >       Does the Kafka Streams app make sure that either both are
> >       committed or neither?
> >
> > B.  Our input topic actually has the key as (a,b,c), but we partition with
> > only (a). We do this because we have different compaction requirements
> > than partitioning requirements. It will still work, as all (a,b,c) records
> > will go to the same partition. Now in the aggregation we group by (a,b,c).
> > In such a case, what will be the partition key for the changelog topic?
> >
> > Note that we use the low-level Processor API and don't commit ourselves.
> >
> > Regards,
> > Mangat
> >
> >
> >
> >
> > On Thu, Apr 8, 2021 at 5:37 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hi Mangat,
> > >
> > > Please see my replies inline below.
> > >
> > > On Thu, Apr 8, 2021 at 5:34 AM mangat rai <mangatm...@gmail.com>
> wrote:
> > >
> > > > @Guozhang Wang
> > > >
> > > > Thanks for the reply. Indeed I am finding it difficult to explain this
> > > > state. I checked the code many times. There could be a bug, but I fail
> > > > to see it. There are several things about Kafka Streams that I don't
> > > > understand, which makes it harder for me to reason.
> > > >
> > > > 1. What is the partition key for the changelog topics? Is it the same
> > as
> > > > the Input key or the state store key? Or maybe the thread specifies
> the
> > > > partition as it knows the input partition it is subscribed to? If the
> > > input
> > > > topic and state store are differently partitioned then we can explain
> > the
> > > > issue here.
> > > >
> > >
> > > In Kafka Streams' changelogs, the "partition key" of Kafka messages is
> > > the same as the "message key" itself. And the message key is the same
> > > as the state store key.
> > >
> > > Since the state store here should be storing the running aggregate, it
> > > means that the partition key is the same as the aggregated key.
> > >
> > > If you are doing a group-by aggregation here, where the group-by keys
> > > are different from the source input topic's keys, then the state store
> > > keys would be different from the input topic keys.
> > >
> > >
> > > > 2. Is there a background thread that persists the state store when
> > > > caching is disabled? When will the app commit the offsets for the
> > > > input topic? Is it when the sink writes into the output topic, or when
> > > > the state store writes into the changelog topic? Because, if the app
> > > > commits the record before the data was written to the changelog topic,
> > > > then we can again explain this state.
> > > >
> > > The commit happens *after* the local state store, as well as the
> > > changelog records sent by the Streams' producers, have been flushed.
> > > I.e. if there's a failure in between, you would re-process some source
> > > records and hence cause duplicates, but no data loss (a.k.a. the
> > > at_least_once semantics).
> > >
> > >
> > >
> > > > > You may also consider upgrading to 2.6.x or higher version and see
> > > > > if this issue goes away.
> > > > Do you mean the client or the Kafka broker? I will be upgrading the
> > > > client to 2.7.0 soon.
> > > >
> > > I meant the client.
> > >
> > >
> > > > Sadly, looking into the timestamp will not help much, as we use a
> > > > business time field to set the record timestamp. If I am right, there
> > > > is now no way to know when a producer wrote a record to a Kafka topic.
> > > >
> > > > Regards,
> > > > Mangat
> > > >
> > > >
> > > >
> > > > On Wed, Apr 7, 2021 at 6:22 PM Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > > >
> > > > > Hello Mangat,
> > > > >
> > > > > With at-least-once, although some records may be processed multiple
> > > > > times, their processing order should not be violated, so what you
> > > > > observed is not expected. What caught my eye is this section in your
> > > > > output changelogs (highlighted):
> > > > >
> > > > > Key1, V1
> > > > > Key1, null
> > > > > Key1, V1
> > > > > Key1, null  (processed again)
> > > > > Key1, V2
> > > > > Key1, null
> > > > >
> > > > > *Key1, V1*
> > > > > *Key1,V2*
> > > > > Key1, V2+V1 (I guess we didn't process V2 tombstone yet but
> > > > > reprocessed V1 again due to reassignment)
> > > > >
> > > > > They seem to be the result of first receiving a tombstone which
> > removes
> > > > V1
> > > > > and then a new record that adds V2. However, since caching is
> > disabled
> > > > you
> > > > > should get
> > > > >
> > > > > *Key1,V1*
> > > > > *Key1,null*
> > > > > *Key1,V2*
> > > > >
> > > > > instead; without the actual code snippet I cannot tell more about
> > > > > what's happening here. If you look into the logs, you can record,
> > > > > each time a partition migrates, how many records from the changelog
> > > > > were replayed to restore the store and from which offset on the
> > > > > input topic Streams resumes processing. You may also consider
> > > > > upgrading to 2.6.x or a higher version and see if this issue goes
> > > > > away.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Tue, Apr 6, 2021 at 8:38 AM mangat rai <mangatm...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Hey,
> > > > > >
> > > > > > We have the following setup in our infrastructure.
> > > > > >
> > > > > >    1. Kafka - 2.5.1
> > > > > >    2. Apps use the Kafka Streams (`org.apache.kafka`) library,
> > > > > >    version 2.5.1
> > > > > >    3. The low-level Processor API is used with *at-least-once*
> > > > > >    semantics
> > > > > >    4. State stores are *in-memory* with *caching disabled* and
> > > > > >    *changelog enabled* (see the sketch after this list)
> > > > > >
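> > > > > > For reference, the store is wired up roughly like the sketch
> > > > > > below; the store name, serdes and changelog config are
> > > > > > placeholders, not our real ones:
> > > > > >
> > > > > > ```java
> > > > > > import java.util.Collections;
> > > > > > import org.apache.kafka.common.serialization.Serdes;
> > > > > > import org.apache.kafka.streams.state.KeyValueStore;
> > > > > > import org.apache.kafka.streams.state.StoreBuilder;
> > > > > > import org.apache.kafka.streams.state.Stores;
> > > > > >
> > > > > > public class StoreSetup {
> > > > > >     public static StoreBuilder<KeyValueStore<String, String>> aggStore() {
> > > > > >         // In-memory store, caching disabled, changelog (logging) enabled.
> > > > > >         return Stores.keyValueStoreBuilder(
> > > > > >                 Stores.inMemoryKeyValueStore("agg-store"),   // placeholder name
> > > > > >                 Serdes.String(),
> > > > > >                 Serdes.String())
> > > > > >             .withCachingDisabled()
> > > > > >             .withLoggingEnabled(Collections.emptyMap());
> > > > > >     }
> > > > > > }
> > > > > > ```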
> > > > > >
> > > > > > Is it possible that during state replication and partition
> > > > reassignment,
> > > > > > the input data is not always applied to the state store?
> > > > > >
> > > > > > 1. Let's say the input topic is having records like following
> > > > > >
> > > > > > ```
> > > > > > Key1, V1
> > > > > > Key1, null (tombstone)
> > > > > > Key1, V2
> > > > > > Key1, null
> > > > > > Key1, V3
> > > > > > Key1, V4
> > > > > > ```
> > > > > > 2. The app has an aggregation function which takes these records
> > > > > > and updates the state store, so the changelog shall be
> > > > > >
> > > > > > ```
> > > > > > Key1, V1
> > > > > > Key1, null (tombstone)
> > > > > > Key1, V2
> > > > > > Key1, null
> > > > > > Key1, V3
> > > > > > Key1, V3 + V4
> > > > > > ```
> > > > > > Let's say the partition responsible for processing the above key
> > > > > > was reallocated to different threads several times due to some
> > > > > > infra issues we are having (in Kubernetes, where we run the app,
> > > > > > not the Kafka cluster).
> > > > > >
> > > > > > I see the following record in the changelogs
> > > > > >
> > > > > > ```
> > > > > > Key1, V1
> > > > > > Key1, null
> > > > > > Key1, V1
> > > > > > Key1, null  (processed again)
> > > > > > Key1, V2
> > > > > > Key1, null
> > > > > > Key1, V1
> > > > > > Key1,V2
> > > > > > Key1, V2+V1 (I guess we didn't process V2 tombstone yet but
> > > > > > reprocessed V1 again due to reassignment)
> > > > > > Key1,V1 (V2 is gone as there was a tombstone, but then V1 tombstone
> > > > > > should have been applied also!!)
> > > > > > Key1, V2+V1 (it is back!!!)
> > > > > > Key1,V1
> > > > > > Key1, V1 + V2 + V3 (This is the final state)!
> > > > > > ```
> > > > > >
> > > > > > As you can see, this means several things:
> > > > > > 1. The state is always correctly applied locally (on a developer
> > > > > > laptop), where there were no reassignments.
> > > > > > 2. The records are processed multiple times, which is
> > > > > > understandable as we have at-least-once semantics here.
> > > > > > 3. As long as we re-apply the same events in the same order we are
> > > > > > golden, but it looks like some records are skipped; here it looks
> > > > > > as if we have multiple consumers reading and updating the same
> > > > > > topics, leading to race conditions.
> > > > > >
> > > > > > Is there any way Kafka Streams' state replication could lead to
> > > > > > such a race condition?
> > > > > >
> > > > > > Regards,
> > > > > > Mangat
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
> --
> -- Guozhang
>
