Hey Mangat,

A. With at-least-once, Streams does not guarantee atomicity of 1) and 2);
with exactly-once, atomicity is indeed guaranteed via transactional
messaging.
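
For reference, a minimal sketch of the setting that switches between the two
modes. It uses plain string keys so it compiles without the Kafka jars
(StreamsConfig exposes constants for the same keys). The application id and
bootstrap address are placeholder values, not taken from your setup.

```java
import java.util.Properties;

// Sketch: Streams settings that select the processing guarantee.
final class EosConfigSketch {

    // Builds the configuration with string keys; in a real app these are the
    // keys behind the StreamsConfig constants.
    static Properties streamsProps(String appId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", appId);
        props.put("bootstrap.servers", bootstrapServers);
        // The default is "at_least_once". With "exactly_once", the changelog
        // writes, the sink writes and the input offset commit go into one
        // transaction, so they are committed together or not at all.
        props.put("processing.guarantee", "exactly_once");
        return props;
    }

    public static void main(String[] args) {
        // "my-aggregator" and "localhost:9092" are hypothetical placeholders.
        Properties props = streamsProps("my-aggregator", "localhost:9092");
        System.out.println(props.getProperty("processing.guarantee"));
    }
}
```

Note that downstream consumers only see the atomic behaviour if they read
with isolation.level=read_committed.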

B. If you are using the Processor API, I assume you implemented your own
group-by processor, right? In that case, the partition key is simply the
record key you use when sending to the repartition topic.
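
To illustrate, here is a rough sketch of how a key-hash partitioner maps a
record key to a partition. It is not the Streams or producer code: the
default partitioner hashes the serialized key with murmur2, while this sketch
uses Arrays.hashCode as a stand-in to stay dependency-free. The "a|b|c" key
layout and the helper names are made up for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Sketch: the partition is a pure function of whatever key is hashed.
final class PartitionKeySketch {

    // Stand-in for a key-hash partitioner (NOT Kafka's murmur2): hash the
    // serialized key, clear the sign bit, and map it onto the partition count.
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return (Arrays.hashCode(bytes) & 0x7fffffff) % numPartitions;
    }

    // Hypothetical composite key layout "a|b|c"; returns only the "a" part.
    static String prefixA(String compositeKey) {
        return compositeKey.split("\\|", 2)[0];
    }

    public static void main(String[] args) {
        int numPartitions = 6;
        String[] keys = {"a1|b1|c1", "a1|b2|c7", "a2|b1|c1"};
        for (String key : keys) {
            int byA = partitionFor(prefixA(key), numPartitions);
            int byFullKey = partitionFor(key, numPartitions);
            System.out.println(key + " -> by (a): " + byA + ", by (a,b,c): " + byFullKey);
        }
    }
}
```

Records that share (a) always map to the same partition when only (a) is
hashed. When the full (a,b,c) key is hashed, records with the same (a) are
not guaranteed to stay together.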


Guozhang




On Thu, Apr 8, 2021 at 9:00 AM mangat rai <mangatm...@gmail.com> wrote:

> Thanks again, that makes things clear. I still have some questions here
> then.
>
> A.  For each record we read, we do two updates
>       1. Changelog topic of the state store.
>       2. Output topic aka sink.
>       Does the Kafka stream app make sure that either both are committed or
> neither?
>
> B.  Our input topic actually has the key (a,b,c), but we partition with only
> (a). We do this because we have different compaction requirements than the
> partitions. It will still work as all (a,b,c) records will go to the same
> partition. Now in aggregation, we group by (a,b,c). In such case what will
> be the partition key for the changelog topic?
>
> Note that we use low-level processor API and don't commit ourselves.
>
> Regards,
> Mangat
>
>
>
>
> On Thu, Apr 8, 2021 at 5:37 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Mangat,
> >
> > Please see my replies inline below.
> >
> > On Thu, Apr 8, 2021 at 5:34 AM mangat rai <mangatm...@gmail.com> wrote:
> >
> > > @Guozhang Wang
> > >
> > > Thanks for the reply.  Indeed I am finding it difficult to explain this
> > > state. I checked the code many times. There can be a bug but I fail to
> > see
> > > it. There are several things about the Kafka streams that I don't
> > > understand, which makes it harder for me to reason.
> > >
> > > 1. What is the partition key for the changelog topics? Is it the same
> as
> > > the Input key or the state store key? Or maybe the thread specifies the
> > > partition as it knows the input partition it is subscribed to? If the
> > input
> > > topic and state store are differently partitioned then we can explain
> the
> > > issue here.
> > >
> >
> > In Kafka Streams' changelog, the "partition key" of Kafka messages is the
> > same as the "message key" itself. And the message key is the same as the
> > state store key.
> >
> > Since the state store here should be storing the running aggregate, it
> > means that the partition key is the same as the aggregated key.
> >
> > If you are doing a group-by aggregation here, where the group-by keys are
> > different from the source input topic's keys, then the state store keys
> > will differ from the input topic keys.
> >
> >
> > > 2. Is there a background thread to persist in the state store when
> > caching
> > > is disabled? When will the app commit the log for the input topic? Is
> it
> > > when sink writes into the output topic or when the state store writes
> > into
> > > the changelog topic? Because, if the app commits the record before the
> > data
> > > was written to changelog topic then we can again explain this state
> > >
> > The commit happens *after* the local state store, as well as the
> > changelog records sent by the Streams' producers, have been flushed. I.e.
> > if there's a failure in between, you would re-process some source records
> > and hence cause duplicates, but no data loss (a.k.a. the at_least_once
> > semantics).
> >
> >
> >
> > > >You may also consider upgrading to 2.6.x or higher version and see if
> > this
> > > issue goes away.
> > > Do you mean the client or the Kafka broker? I will be upgrading the
> > client
> > > to 2.7.0 soon.
> > >
> > I meant the client.
> >
> >
> > > Sadly looking into the timestamp will not help much as we use some business
> > > time field to set the record timestamp. If I am right, there is currently no
> > > way to know when a producer wrote a record to a Kafka topic.
> > >
> > > Regards,
> > > Mangat
> > >
> > >
> > >
> > > On Wed, Apr 7, 2021 at 6:22 PM Guozhang Wang <wangg...@gmail.com>
> wrote:
> > >
> > > > Hello Mangat,
> > > >
> > > > With at-least-once, although some records may be processed multiple times,
> > > > their processing order should not be violated, so what you observed is not
> > > > expected. What caught my eye is this section in your output changelogs
> > > > (highlighted):
> > > >
> > > > Key1, V1
> > > > Key1, null
> > > > Key1, V1
> > > > Key1, null  (processed again)
> > > > Key1, V2
> > > > Key1, null
> > > >
> > > > *Key1, V1*
> > > > *Key1, V2*
> > > > Key1, V2+V1 (I guess we didn't process V2 tombstone yet but reprocessed V1
> > > > again due to reassignment)
> > > >
> > > > They seem to be the result of first receiving a tombstone which
> removes
> > > V1
> > > > and then a new record that adds V2. However, since caching is
> disabled
> > > you
> > > > should get
> > > >
> > > > *Key1,V1*
> > > > *Key1,null*
> > > > *Key1,V2*
> > > >
> > > > instead; without the actual code snippet I cannot tell more about what's
> > > > happening here. If you can look into the logs, you can record each time a
> > > > partition migrates, how many records from the changelog were replayed to
> > > > restore the store, and from which offset on the input topic Streams
> > > > resumes processing. You may also consider upgrading to 2.6.x or a higher
> > > > version to see if this issue goes away.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Tue, Apr 6, 2021 at 8:38 AM mangat rai <mangatm...@gmail.com>
> > wrote:
> > > >
> > > > > Hey,
> > > > >
> > > > > We have the following setup in our infrastructure.
> > > > >
> > > > >    1. Kafka - 2.5.1
> > > > >    2. Apps use kafka streams `org.apache.kafka` version 2.5.1
> library
> > > > >    3. Low level processor API is used with *atleast-once* semantics
> > > > >    4. State stores are *in-memory* with *caching disabled* and
> > > *changelog
> > > > >    enabled*
> > > > >
> > > > >
> > > > > Is it possible that during state replication and partition
> > > reassignment,
> > > > > the input data is not always applied to the state store?
> > > > >
> > > > > 1. Let's say the input topic is having records like following
> > > > >
> > > > > ```
> > > > > Key1, V1
> > > > > Key1, null (tombstone)
> > > > > Key1, V2
> > > > > Key1, null
> > > > > Key1, V3
> > > > > Key1, V4
> > > > > ```
> > > > > 2. The app has an aggregation function which takes these records and updates
> > > > > the state store so that the changelog shall be
> > > > >
> > > > > ```
> > > > > Key1, V1
> > > > > Key1, null (tombstone)
> > > > > Key1, V2
> > > > > Key1, null
> > > > > Key1, V3
> > > > > Key1, V3 + V4
> > > > > ```
> > > > > Let's say the partition responsible for processing the above key
> was
> > > > > several times reallocated to different threads due to some infra
> > issues
> > > > we
> > > > > are having(in Kubernetes where we run the app, not the Kafka
> > cluster).
> > > > >
> > > > > I see the following record in the changelogs
> > > > >
> > > > > ```
> > > > > Key1, V1
> > > > > Key1, null
> > > > > Key1, V1
> > > > > Key1, null  (processed again)
> > > > > Key1, V2
> > > > > Key1, null
> > > > > Key1, V1
> > > > > Key1,V2
> > > > > Key1, V2+V1 (I guess we didn't process V2 tombstone yet but
> > reprocessed
> > > > V1
> > > > > again due to reassignment)
> > > > > Key1,V1 (V2 is gone as there was a tombstone, but then V1 tombstone
> > > > should
> > > > > have been applied also!!)
> > > > > Key1, V2+V1 (it is back!!!)
> > > > > Key1,V1
> > > > > Key1, V1 + V2 + V3 (This is the final state)!
> > > > > ```
> > > > >
> > > > > Looking at this, several things stand out:
> > > > > 1. The state is always correctly applied locally (on a developer laptop),
> > > > > where there were no reassignments.
> > > > > 2. The records are processed multiple times, which is understandable as we
> > > > > have at-least-once semantics here.
> > > > > 3. As long as we re-apply the same events in the same order we are golden,
> > > > > but it looks like some records are skipped. It looks as if we have
> > > > > multiple consumers reading and updating the same topics, leading to race
> > > > > conditions.
> > > > >
> > > > > Is there any way Kafka Streams' state replication could lead to such a
> > > > > race condition?
> > > > >
> > > > > Regards,
> > > > > Mangat
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang
