Guozhang,

What does this mean if the changelog topic is disabled? If thread 2 and
thread 4 are running on two different nodes and a rebalance occurs, thread 2
will not realize it is a zombie without the write to the changelog topic,
right? I am trying to understand the cases in which the changelog topic can
ever be disabled.

Thanks
Mohan


On 4/21/21, 10:22 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:

    Hello Mangat,
    
    I think using persistent stores rather than in-memory stores could help
    if the threads are from the same instance.
    
    Guozhang
    
    On Tue, Apr 20, 2021 at 12:54 AM mangat rai <mangatm...@gmail.com> wrote:
    
    > Hey Guozhang,
    >
    > Thanks for creating the issue. Yes, you are right, this will happen only
    > with consecutive rebalancing, since after some time the zombie thread
    > will stop and re-join the group, and the new thread will always overwrite
    > the state with the latest data. In our poor infra setup, the rebalancing
    > was happening many times in a row.
    >
    > Now, we can't guarantee that consecutive rebalancing will not happen
    > again (we reduced the fetch-size, which fixed it in many ways). Would any
    > of the following work as a workaround?
    >
    > 1. Use a persistent store instead of in-memory. The new thread will never
    > get the lock, hence we will lose availability but keep things consistent.
    > 2. Use exactly-once semantics. However, we might need to redesign our
    > apps. It's a bigger change.
    >
    > Regards,
    > Mangat
    >
    > On Tue, Apr 20, 2021 at 6:50 AM Guozhang Wang <wangg...@gmail.com> wrote:
    >
    > > Hello Mangat,
    > >
    > > What you've encountered is a "zombie writer" issue: Thread-2 did not
    > > know there had already been a new rebalance, and hence that its
    > > partitions had been migrated out, until it tried to commit, got
    > > notified of the illegal-generation error, and realized it was already
    > > the "zombie". This case would still persist even with incremental
    > > rebalancing.
    > >
    > > I've filed https://issues.apache.org/jira/browse/KAFKA-12693 to
    > > summarize the situation. Please LMK if that explanation is clear to you.
    > >
    > > On Mon, Apr 19, 2021 at 12:58 AM mangat rai <mangatm...@gmail.com> wrote:
    > >
    > > > Thanks, Guozhang,
    > > >
    > > > I have been wrestling with Kafka's various consumer rebalancing
    > > > algorithms for the last 2 days. Could I generalize this problem as
    > > >
    > > > *Any in-memory state store backed by a changelog topic will always
    > > > risk having interleaved writes from two different writers during
    > > > rebalancing?*
    > > >
    > > > In our case, CPU throttling made it worse as thread-2 didn't try to
    > > > commit for a long time. Also,
    > > >
    > > > 1. Do you think that if we disable incremental rebalancing, we will
    > > > not have this issue? If I understood correctly, Thread-4 will not
    > > > start processing until the state is completely transferred from
    > > > Thread-2.
    > > > 2. If yes, how can we disable it without downgrading the client?
    > > >
    > > > Since we have a very low scale and no real-time computing requirement,
    > > > we will be happy to sacrifice availability to have consistency.
    > > >
    > > > Regards,
    > > > Mangat
    > > >
    > > >
    > > >
    > > > On Sat, Apr 17, 2021 at 12:27 AM Guozhang Wang <wangg...@gmail.com> wrote:
    > > >
    > > > > Hi Mangat:
    > > > >
    > > > > I think I found the cause of your problem here.
    > > > >
    > > > > It seems thread-2's partition was assigned to thread-4 while
    > > > > thread-2 was not aware (because it missed a rebalance; this is a
    > > > > normal scenario); in other words, thread-2 became a "zombie". It
    > > > > would stay in that zombie state until it tried to commit, at which
    > > > > point it would get an error from the brokers, realize its zombie
    > > > > identity, and re-join the group.
    > > > >
    > > > > During that period of time, before the commit was issued, it would
    > > > > continue trying to write to its local states; here are several
    > > > > scenarios:
    > > > >
    > > > > 1) if thread-2/4 belong to two different nodes, then that is fine,
    > > > > since they will write to different local state stores.
    > > > > 2) if they belong to the same node, and
    > > > >    a) the state stores are persistent, then they would risk
    > > > > contention; this is guarded by the state directory locks (as file
    > > > > locks), in which case the new owner thread-4 should not be able to
    > > > > get hold of the local state files.
    > > > >    b) the state stores are in-memory, in which case that is fine
    > > > > since the in-memory stores are kept separate as well.
    > > > >
    > > > > In your case, 2.b), the issue is that the changelog would still be
    > > > > shared between the two --- but note that this is the same in case 1)
    > > > > as well. This means that at that time the changelog is shared by two
    > > > > writers sending interleaved records. And if there's a tombstone that
    > > > > was intended for a record A, but it was written interleaved so that
    > > > > another record B landed in between, that tombstone would effectively
    > > > > delete record B. The key here is that, when we replay the changelog,
    > > > > we replay it completely following offset ordering.
    > > > >
    > > > >
    > > > >
    > > > > On Thu, Apr 15, 2021 at 2:28 AM mangat rai <mangatm...@gmail.com> wrote:
    > > > >
    > > > > > Guozhang,
    > > > > >
    > > > > > Yes, you are correct. We have our own group processor. I have more
    > > > > > information now.
    > > > > >
    > > > > > 1. I added the ThreadId to the data when the app persists into the
    > > > > > changelog topic.
    > > > > > 2. Thread-2, which was working with partition-0, had a timeout
    > > > > > issue.
    > > > > > 3. Thread-4 picked up this partition-0, as I can see its Id in the
    > > > > > changelog.
    > > > > > 4. *But then Thread-2 and Thread-4 were both writing into
    > > > > > partition-0 of the changelog, and for the same key at that.*
    > > > > >
    > > > > > So I was clearly able to see that the two threads were overwriting
    > > > > > each other's data in the state store, leading to a corrupted
    > > > > > state. This confirms my theory that it was an issue of concurrent
    > > > > > updates. This was something totally unexpected. I suspect that
    > > > > > Thread-2 continues to persist its in-memory state, maybe because
    > > > > > it wasn't stopped after the timeout exception. Is there a
    > > > > > configuration in Kafka Streams which could lead to this?
    > > > > >
    > > > > > There was no network issue; our CPU was highly throttled by
    > > > > > Kubernetes. We gave it more resources and also decreased the
    > > > > > fetch-size so that we have a higher I/O to CPU time ratio than
    > > > > > before. After that there was no timeout issue, hence no
    > > > > > reassignment and hence no corrupted state.
    > > > > >
    > > > > > I really appreciate your help here...
    > > > > > Thanks!
    > > > > > Mangat
    > > > > >
    > > > > >
    > > > > > On Wed, Apr 14, 2021 at 8:48 PM Guozhang Wang <wangg...@gmail.com> wrote:
    > > > > >
    > > > > > > Hey Mangat,
    > > > > > >
    > > > > > > A. With at-least-once, Streams does not ensure atomicity of
    > > > > > > 1) / 2); with exactly-once, atomicity is indeed guaranteed with
    > > > > > > transactional messaging.
    > > > > > >
    > > > > > > B. If you are using the processor API, then I'm assuming you
    > > > > > > wrote your own group-by processor, right? In that case, the
    > > > > > > partition key would just be the record key when you are sending
    > > > > > > to the repartition topic.
    > > > > > >
    > > > > > >
    > > > > > > Guozhang
    > > > > > >
    > > > > > >
    > > > > > >
    > > > > > >
    > > > > > > On Thu, Apr 8, 2021 at 9:00 AM mangat rai <mangatm...@gmail.com> wrote:
    > > > > > >
    > > > > > > > Thanks again, that makes things clear. I still have some
    > > > > > > > questions here, then.
    > > > > > > >
    > > > > > > > A.  For each record we read, we do two updates:
    > > > > > > >       1. Changelog topic of the state store.
    > > > > > > >       2. Output topic, aka sink.
    > > > > > > >       Does the Kafka Streams app make sure that either both
    > > > > > > >       are committed or neither?
    > > > > > > >
    > > > > > > > B.  Our input topic actually has the key as (a,b,c), but we
    > > > > > > > partition with only (a). We do this because we have different
    > > > > > > > compaction requirements than the partitions. It will still
    > > > > > > > work as all (a,b,c) records will go to the same partition. Now
    > > > > > > > in aggregation, we group by (a,b,c). In such a case, what will
    > > > > > > > be the partition key for the changelog topic?
    > > > > > > >
    > > > > > > > Note that we use the low-level processor API and don't commit
    > > > > > > > ourselves.
    > > > > > > >
    > > > > > > > Regards,
    > > > > > > > Mangat
    > > > > > > >
    > > > > > > >
    > > > > > > >
    > > > > > > >
    > > > > > > > On Thu, Apr 8, 2021 at 5:37 PM Guozhang Wang <wangg...@gmail.com> wrote:
    > > > > > > >
    > > > > > > > > Hi Mangat,
    > > > > > > > >
    > > > > > > > > Please see my replies inline below.
    > > > > > > > >
    > > > > > > > > On Thu, Apr 8, 2021 at 5:34 AM mangat rai <mangatm...@gmail.com> wrote:
    > > > > > > > >
    > > > > > > > > > @Guozhang Wang
    > > > > > > > > >
    > > > > > > > > > Thanks for the reply. Indeed, I am finding it difficult to
    > > > > > > > > > explain this state. I checked the code many times. There
    > > > > > > > > > could be a bug, but I fail to see it. There are several
    > > > > > > > > > things about Kafka Streams that I don't understand, which
    > > > > > > > > > makes it harder for me to reason.
    > > > > > > > > >
    > > > > > > > > > 1. What is the partition key for the changelog topics? Is
    > > > > > > > > > it the same as the input key or the state store key? Or
    > > > > > > > > > maybe the thread specifies the partition as it knows the
    > > > > > > > > > input partition it is subscribed to? If the input topic
    > > > > > > > > > and state store are partitioned differently, then we can
    > > > > > > > > > explain the issue here.
    > > > > > > > > >
    > > > > > > > >
    > > > > > > > > In Kafka Streams' changelog, the "partition key" of Kafka
    > > > > > > > > messages is the same as the "message key" itself. And the
    > > > > > > > > message key is the same as the state store key.
    > > > > > > > >
    > > > > > > > > Since the state store here should be storing the running
    > > > > > > > > aggregate, it means that the partition key is the same as
    > > > > > > > > the aggregated key.
    > > > > > > > >
    > > > > > > > > If you are doing a group-by aggregation here, where the
    > > > > > > > > group-by keys are different from the source input topic's
    > > > > > > > > keys, then the state store keys would be different from the
    > > > > > > > > input topic keys.
    > > > > > > > >
    > > > > > > > >
    > > > > > > > > > 2. Is there a background thread to persist the state store
    > > > > > > > > > when caching is disabled? When will the app commit the log
    > > > > > > > > > for the input topic? Is it when the sink writes into the
    > > > > > > > > > output topic or when the state store writes into the
    > > > > > > > > > changelog topic? Because, if the app commits the record
    > > > > > > > > > before the data was written to the changelog topic, then
    > > > > > > > > > we can again explain this state.
    > > > > > > > > >
    > > > > > > > > The commit happens *after* the local state store, as well as
    > > > > > > > > the changelog records sent by the Streams' producers, have
    > > > > > > > > been flushed. I.e. if there's a failure in between, you
    > > > > > > > > would re-process some source records and hence cause
    > > > > > > > > duplicates, but no data loss (a.k.a. the at_least_once
    > > > > > > > > semantics).
    > > > > > > > >
    > > > > > > > >
    > > > > > > > >
    > > > > > > > > > > You may also consider upgrading to 2.6.x or higher
    > > > > > > > > > > version and see if this issue goes away.
    > > > > > > > > >
    > > > > > > > > > Do you mean the client or the Kafka broker? I will be
    > > > > > > > > > upgrading the client to 2.7.0 soon.
    > > > > > > > > >
    > > > > > > > > I meant the client.
    > > > > > > > >
    > > > > > > > >
    > > > > > > > > > Sadly, looking into the timestamp will not help much as we
    > > > > > > > > > use some business time field to set the record timestamp.
    > > > > > > > > > If I am right, there is now no way to know when a producer
    > > > > > > > > > wrote a record to a Kafka topic.
    > > > > > > > > >
    > > > > > > > > > Regards,
    > > > > > > > > > Mangat
    > > > > > > > > >
    > > > > > > > > >
    > > > > > > > > >
    > > > > > > > > > On Wed, Apr 7, 2021 at 6:22 PM Guozhang Wang <wangg...@gmail.com> wrote:
    > > > > > > > > >
    > > > > > > > > > > Hello Mangat,
    > > > > > > > > > >
    > > > > > > > > > > With at-least-once, although some records may be
    > > > > > > > > > > processed multiple times, their processing order should
    > > > > > > > > > > not be violated, so what you observed is not expected.
    > > > > > > > > > > What caught my eye is this section in your output
    > > > > > > > > > > changelogs (highlighted):
    > > > > > > > > > >
    > > > > > > > > > > Key1, V1
    > > > > > > > > > > Key1, null
    > > > > > > > > > > Key1, V1
    > > > > > > > > > > Key1, null  (processed again)
    > > > > > > > > > > Key1, V2
    > > > > > > > > > > Key1, null
    > > > > > > > > > >
    > > > > > > > > > > *Key1, V1*
    > > > > > > > > > > *Key1,V2*
    > > > > > > > > > > Key1, V2+V1 (I guess we didn't process V2 tombstone yet
    > > > > > > > > > > but reprocessed V1 again due to reassignment)
    > > > > > > > > > >
    > > > > > > > > > > They seem to be the result of first receiving a
    > > > > > > > > > > tombstone which removes V1 and then a new record that
    > > > > > > > > > > adds V2. However, since caching is disabled you should
    > > > > > > > > > > get
    > > > > > > > > > >
    > > > > > > > > > > *Key1,V1*
    > > > > > > > > > > *Key1,null*
    > > > > > > > > > > *Key1,V2*
    > > > > > > > > > >
    > > > > > > > > > > instead; without the actual code snippet I cannot tell
    > > > > > > > > > > more about what's happening here. If you can look into
    > > > > > > > > > > the logs, you can record, each time a partition
    > > > > > > > > > > migrates, how many records from the changelog were
    > > > > > > > > > > replayed to restore the store, and from which offset on
    > > > > > > > > > > the input topic Streams resumes processing. You may also
    > > > > > > > > > > consider upgrading to 2.6.x or a higher version to see
    > > > > > > > > > > if this issue goes away.
    > > > > > > > > > >
    > > > > > > > > > >
    > > > > > > > > > > Guozhang
    > > > > > > > > > >
    > > > > > > > > > > On Tue, Apr 6, 2021 at 8:38 AM mangat rai <mangatm...@gmail.com> wrote:
    > > > > > > > > > >
    > > > > > > > > > > > Hey,
    > > > > > > > > > > >
    > > > > > > > > > > > We have the following setup in our infrastructure.
    > > > > > > > > > > >
    > > > > > > > > > > >    1. Kafka - 2.5.1
    > > > > > > > > > > >    2. Apps use the Kafka Streams `org.apache.kafka`
    > > > > > > > > > > >    library, version 2.5.1
    > > > > > > > > > > >    3. Low-level processor API is used with
    > > > > > > > > > > >    *at-least-once* semantics
    > > > > > > > > > > >    4. State stores are *in-memory* with *caching
    > > > > > > > > > > >    disabled* and *changelog enabled*
    > > > > > > > > > > >
    > > > > > > > > > > >
    > > > > > > > > > > > Is it possible that during state replication and
    > > > > > > > > > > > partition reassignment, the input data is not always
    > > > > > > > > > > > applied to the state store?
    > > > > > > > > > > >
    > > > > > > > > > > > 1. Let's say the input topic has records like the
    > > > > > > > > > > > following
    > > > > > > > > > > >
    > > > > > > > > > > > ```
    > > > > > > > > > > > Key1, V1
    > > > > > > > > > > > Key1, null (tombstone)
    > > > > > > > > > > > Key1, V2
    > > > > > > > > > > > Key1, null
    > > > > > > > > > > > Key1, V3
    > > > > > > > > > > > Key1, V4
    > > > > > > > > > > > ```
    > > > > > > > > > > > 2. The app has an aggregation function which takes
    > > > > > > > > > > > these records and updates the state store so that the
    > > > > > > > > > > > changelog shall be
    > > > > > > > > > > >
    > > > > > > > > > > > ```
    > > > > > > > > > > > Key1, V1
    > > > > > > > > > > > Key1, null (tombstone)
    > > > > > > > > > > > Key1, V2
    > > > > > > > > > > > Key1, null
    > > > > > > > > > > > Key1, V3
    > > > > > > > > > > > Key1, V3 + V4
    > > > > > > > > > > > ```
    > > > > > > > > > > > Let's say the partition responsible for processing the
    > > > > > > > > > > > above key was reallocated several times to different
    > > > > > > > > > > > threads due to some infra issues we are having (in
    > > > > > > > > > > > Kubernetes where we run the app, not the Kafka
    > > > > > > > > > > > cluster).
    > > > > > > > > > > >
    > > > > > > > > > > > I see the following records in the changelogs
    > > > > > > > > > > >
    > > > > > > > > > > > ```
    > > > > > > > > > > > Key1, V1
    > > > > > > > > > > > Key1, null
    > > > > > > > > > > > Key1, V1
    > > > > > > > > > > > Key1, null  (processed again)
    > > > > > > > > > > > Key1, V2
    > > > > > > > > > > > Key1, null
    > > > > > > > > > > > Key1, V1
    > > > > > > > > > > > Key1,V2
    > > > > > > > > > > > Key1, V2+V1 (I guess we didn't process V2 tombstone yet
    > > > > > > > > > > > but reprocessed V1 again due to reassignment)
    > > > > > > > > > > > Key1,V1 (V2 is gone as there was a tombstone, but then
    > > > > > > > > > > > V1 tombstone should have been applied also!!)
    > > > > > > > > > > > Key1, V2+V1 (it is back!!!)
    > > > > > > > > > > > Key1,V1
    > > > > > > > > > > > Key1, V1 + V2 + V3 (This is the final state)!
    > > > > > > > > > > > ```
    > > > > > > > > > > >
    > > > > > > > > > > > If you see, this means several things:
    > > > > > > > > > > > 1. The state is always correctly applied locally (on a
    > > > > > > > > > > > developer laptop), where there were no reassignments.
    > > > > > > > > > > > 2. The records are processed multiple times, which is
    > > > > > > > > > > > understandable as we have at-least-once semantics here.
    > > > > > > > > > > > 3. As long as we re-apply the same events in the same
    > > > > > > > > > > > order we are golden, but it looks like some records are
    > > > > > > > > > > > skipped; here it looks as if we have multiple consumers
    > > > > > > > > > > > reading and updating the same topics, leading to race
    > > > > > > > > > > > conditions.
    > > > > > > > > > > >
    > > > > > > > > > > > Is there any way Kafka Streams' state replication could
    > > > > > > > > > > > lead to such a race condition?
    > > > > > > > > > > >
    > > > > > > > > > > > Regards,
    > > > > > > > > > > > Mangat
    > > > > > > > > > > >
    > > > > > > > > > >
    > > > > > > > > > >
    > > > > > > > > > > --
    > > > > > > > > > > -- Guozhang
    > > > > > > > > > >
    > > > > > > > > >
    > > > > > > > >
    > > > > > > > >
    > > > > > > > > --
    > > > > > > > > -- Guozhang
    > > > > > > > >
    > > > > > > >
    > > > > > >
    > > > > > >
    > > > > > > --
    > > > > > > -- Guozhang
    > > > > > >
    > > > > >
    > > > >
    > > > >
    > > > > --
    > > > > -- Guozhang
    > > > >
    > > >
    > >
    > >
    > > --
    > > -- Guozhang
    > >
    >
    
    
    -- 
    -- Guozhang
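
For illustration only, here is a minimal sketch of the interleaved-writer case
described in Guozhang's Apr 17 reply in the quoted thread: a zombie thread-2
and the new owner thread-4 both append to the same changelog partition, and
the changelog is later replayed strictly in offset order. This is a
plain-Python simulation, not Kafka Streams code. The `replay` helper, the
record tuples, and the writer labels are hypothetical names introduced just
for this example.

```python
from typing import Dict, List, Optional, Tuple

# One changelog record: (key, value, writer). A value of None is a tombstone.
# The writer label is informational only; replay never looks at it.
Record = Tuple[str, Optional[str], str]


def replay(changelog: List[Record]) -> Dict[str, str]:
    """Rebuild a key-value store by applying records in offset (list) order."""
    store: Dict[str, str] = {}
    for key, value, _writer in changelog:
        if value is None:
            # A tombstone deletes whatever is currently stored for the key,
            # regardless of which value the writer meant to delete.
            store.pop(key, None)
        else:
            store[key] = value
    return store


# Single writer: V1 is written, deleted, and then V2 is written.
single_writer: List[Record] = [
    ("Key1", "V1", "thread-2"),
    ("Key1", None, "thread-2"),
    ("Key1", "V2", "thread-2"),
]

# Two writers: thread-4 (new owner) writes V2 before zombie thread-2 flushes
# the tombstone it intended for V1.
interleaved: List[Record] = [
    ("Key1", "V1", "thread-2"),
    ("Key1", "V2", "thread-4"),
    ("Key1", None, "thread-2"),
]

print(replay(single_writer))  # V2 survives
print(replay(interleaved))    # the tombstone meant for V1 removes V2
```

With a single writer the restored store ends up holding V2; with the
interleaved log the same three records restore to an empty store, because
replay only sees offset order and has no notion of which writer a tombstone
came from.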
    
