Hello Mangat, I think using a persistent store instead of the in-memory stores could help if the threads are from the same instance.
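Workaround 1 in the quoted message below hinges on the state-directory file lock that persistent stores use. The mechanism can be sketched without any Streams dependency; the lock file name here is hypothetical, and `FileChannel.tryLock` is the same OS-level primitive Streams relies on for its task state directories (note that within one JVM an overlapping lock attempt throws rather than returning null):

```java
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;

public class StateDirLockDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical stand-in for a task's state directory lock file.
        File lockFile = File.createTempFile("task-0_0", ".lock");
        lockFile.deleteOnExit();
        try (FileChannel owner = new RandomAccessFile(lockFile, "rw").getChannel();
             FileChannel other = new RandomAccessFile(lockFile, "rw").getChannel()) {
            FileLock held = owner.tryLock();            // first thread takes the lock
            System.out.println("owner acquired: " + (held != null));
            try {
                other.tryLock();                        // overlapping attempt in the same JVM
                System.out.println("second acquired");
            } catch (OverlappingFileLockException e) {
                System.out.println("second attempt rejected");
            }
        }
    }
}
```

If the two threads ran in different processes on the same node, the second `tryLock` would instead return null; either way the late-coming writer cannot touch the files, which is why this workaround trades availability for consistency.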
Guozhang

On Tue, Apr 20, 2021 at 12:54 AM mangat rai <mangatm...@gmail.com> wrote:

> Hey Guozhang,
>
> Thanks for creating the issue. Yes, you are right, this will happen only
> with consecutive rebalancing, as after some time the zombie thread will
> stop, re-join the group, and the new thread will always overwrite the
> state with the latest data. In our poor infra setup, the rebalancing was
> happening many times in a row.
>
> Now, we can't guarantee that consecutive rebalancing will not happen
> again (we reduced the fetch size, which fixed it in many ways). Will any
> of the following work as a workaround?
>
> 1. Use a persistent store instead of in-memory. The new thread will never
> get the lock, hence we will lose availability but keep things consistent.
> 2. Use exactly-once semantics. However, we might need to redesign our
> apps. It's a bigger change.
>
> Regards,
> Mangat
>
> On Tue, Apr 20, 2021 at 6:50 AM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Mangat,
> >
> > What you've encountered is a "zombie writer" issue; that is, Thread-2
> > did not know there was already a new rebalance and hence its partitions
> > had been migrated out, until it tried to commit and then got notified
> > of the illegal-generation error and realized it was the "zombie"
> > already. This case would still persist even with incremental
> > rebalancing.
> >
> > I've filed https://issues.apache.org/jira/browse/KAFKA-12693 to
> > summarize the situation. Please LMK if that explanation is clear to
> > you.
> >
> > On Mon, Apr 19, 2021 at 12:58 AM mangat rai <mangatm...@gmail.com>
> > wrote:
> >
> > > Thanks, Guozhang,
> > >
> > > I was knocking my head against Kafka's various consumer rebalancing
> > > algorithms for the last 2 days. Could I generalize this problem as:
> > >
> > > *Any in-memory state store backed by a changelog topic will always
> > > risk having interleaved writes from two different writers during
> > > rebalancing?*
> > >
> > > In our case, CPU throttling made it worse, as thread-2 didn't try to
> > > commit for a long time. Also,
> > >
> > > 1. Do you think that if we disable incremental rebalancing, we will
> > > not have this issue? Because, if I understood correctly, Thread-4
> > > will not start processing until the state is completely transferred
> > > from Thread-2.
> > > 2. If yes, how can we disable it without downgrading the client?
> > >
> > > Since we have a very low scale and no real-time computing
> > > requirement, we will be happy to sacrifice availability for
> > > consistency.
> > >
> > > Regards,
> > > Mangat
> > >
> > > On Sat, Apr 17, 2021 at 12:27 AM Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > >
> > > > Hi Mangat:
> > > >
> > > > I think I found the cause of your problem here.
> > > >
> > > > It seems thread-2's partition was assigned to thread-4 while
> > > > thread-2 was not aware (because it missed a rebalance; this is a
> > > > normal scenario); in other words, thread-2 became a "zombie". It
> > > > would stay in that zombie state until it tried to commit, at which
> > > > point it would get an error from the brokers, realize its zombie
> > > > identity, and re-join the group.
> > > >
> > > > During that period of time, before the commit was issued, it would
> > > > continue trying to write to its local states; here are several
> > > > scenarios:
> > > >
> > > > 1) If thread-2/4 belong to two different nodes, then that is fine,
> > > > since they will write to different local state stores.
> > > > 2) If they belong to the same node, and
> > > >    a) the state stores are persistent, then they would have risks
> > > > of contention; this is guarded by the state directory locks (as
> > > > file locks), in which case the new owner thread-4 should not be
> > > > able to get at the local state files.
> > > >    b) the state stores are in-memory, in which case that is fine,
> > > > since the in-memory stores are kept separate as well.
> > > >
> > > > In your case, 2.b), the issue is that the changelog would still be
> > > > shared between the two --- but note that this is the same as in
> > > > case 1) as well. And this means that at that time the changelog is
> > > > shared by two writers sending interleaved records. And if there's
> > > > a tombstone that was intended for a record A, but because of the
> > > > interleaving there's another record B in between, that tombstone
> > > > would effectively delete record B. The key here is that, when we
> > > > replay the changelogs, we replay them completely following offset
> > > > ordering.
> > > >
> > > > On Thu, Apr 15, 2021 at 2:28 AM mangat rai <mangatm...@gmail.com>
> > > > wrote:
> > > >
> > > > > Guozhang,
> > > > >
> > > > > Yes, you are correct. We have our own group processor. I have
> > > > > more information now.
> > > > >
> > > > > 1. I added the ThreadId in the data when the app persists into
> > > > > the changelog topic.
> > > > > 2. Thread-2, which was working with partition-0, had a timeout
> > > > > issue.
> > > > > 3. Thread-4 picked up this partition-0, as I can see its Id in
> > > > > the changelog.
> > > > > 4. *But then Thread-2 and Thread-4 both were writing into
> > > > > partition-0 of the changelog, that too for the same key.*
> > > > >
> > > > > So I was clearly able to see that two threads were overwriting
> > > > > data of one another into the state store, leading to a corrupted
> > > > > state.
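The interleaved-writer corruption described in the two messages above can be reproduced with a toy changelog replay (plain Java, not Streams internals; the thread attribution in the comments mirrors the scenario in this thread, and `null` stands for a tombstone):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InterleavedChangelogDemo {
    // Replay a changelog strictly in offset order; a null value is a tombstone.
    static Map<String, String> replay(List<SimpleEntry<String, String>> changelog) {
        Map<String, String> store = new HashMap<>();
        for (SimpleEntry<String, String> rec : changelog) {
            if (rec.getValue() == null) store.remove(rec.getKey());
            else store.put(rec.getKey(), rec.getValue());
        }
        return store;
    }

    public static void main(String[] args) {
        // Zombie thread-2 meant its tombstone to delete A=V1, but thread-4's
        // write of A=V2 landed in between (interleaved offsets).
        List<SimpleEntry<String, String>> interleaved = List.of(
                new SimpleEntry<>("A", "V1"),    // thread-2
                new SimpleEntry<>("A", "V2"),    // thread-4 (new owner)
                new SimpleEntry<>("A", null));   // thread-2's stale tombstone for V1
        System.out.println(replay(interleaved)); // prints {} — V2 was wiped too
    }
}
```

Replayed strictly by offset, the stale tombstone lands after thread-4's write and deletes it, matching the corrupted state seen after restoration.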
> > > > > This confirms my theory that it was an issue of concurrent
> > > > > updates. This was something totally unexpected. I suspect that
> > > > > Thread-2 continued to persist its in-memory state, maybe because
> > > > > it wasn't stopped after the timeout exception. Is there a
> > > > > configuration possible in Kafka Streams which could lead to
> > > > > this?
> > > > >
> > > > > There was no network issue; our CPU was highly throttled by
> > > > > Kubernetes. We gave it more resources and also decreased the
> > > > > fetch size, so we have a higher I/O to CPU time ratio than
> > > > > before, and then there was no timeout issue, hence no
> > > > > reassignment and hence no corrupted state.
> > > > >
> > > > > I really appreciate your help here...
> > > > > Thanks!
> > > > > Mangat
> > > > >
> > > > > On Wed, Apr 14, 2021 at 8:48 PM Guozhang Wang
> > > > > <wangg...@gmail.com> wrote:
> > > > >
> > > > > > Hey Mangat,
> > > > > >
> > > > > > A. With at-least-once, Streams does not ensure atomicity of
> > > > > > 1) / 2); with exactly-once, atomicity is indeed guaranteed
> > > > > > with transactional messaging.
> > > > > >
> > > > > > B. If you are using the processor API, then I'm assuming you
> > > > > > did your own group-by processor, right? In that case, the
> > > > > > partition key would just be the record key when you are
> > > > > > sending to the repartition topic.
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Thu, Apr 8, 2021 at 9:00 AM mangat rai
> > > > > > <mangatm...@gmail.com> wrote:
> > > > > >
> > > > > > > Thanks again, that makes things clear. I still have some
> > > > > > > questions here then.
> > > > > > >
> > > > > > > A. For each record we read, we do two updates:
> > > > > > > 1. Changelog topic of the state store.
> > > > > > > 2. Output topic, aka sink.
> > > > > > > Does the Kafka Streams app make sure that either both are
> > > > > > > committed or neither?
> > > > > > >
> > > > > > > B. Our input topic actually has the key as (a,b,c), but we
> > > > > > > partition with only (a). We do this because we have
> > > > > > > different compaction requirements than the partitions. It
> > > > > > > will still work, as all (a,b,c) records will go to the same
> > > > > > > partition. Now in the aggregation, we group by (a,b,c). In
> > > > > > > such a case, what will be the partition key for the
> > > > > > > changelog topic?
> > > > > > >
> > > > > > > Note that we use the low-level processor API and don't
> > > > > > > commit ourselves.
> > > > > > >
> > > > > > > Regards,
> > > > > > > Mangat
> > > > > > >
> > > > > > > On Thu, Apr 8, 2021 at 5:37 PM Guozhang Wang
> > > > > > > <wangg...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Mangat,
> > > > > > > >
> > > > > > > > Please see my replies inline below.
> > > > > > > >
> > > > > > > > On Thu, Apr 8, 2021 at 5:34 AM mangat rai
> > > > > > > > <mangatm...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > @Guozhang Wang
> > > > > > > > >
> > > > > > > > > Thanks for the reply. Indeed I am finding it difficult
> > > > > > > > > to explain this state. I checked the code many times.
> > > > > > > > > There can be a bug, but I fail to see it. There are
> > > > > > > > > several things about Kafka Streams that I don't
> > > > > > > > > understand, which makes it harder for me to reason.
> > > > > > > > >
> > > > > > > > > 1. What is the partition key for the changelog topics?
> > > > > > > > > Is it the same as the input key or the state store key?
> > > > > > > > > Or maybe the thread specifies the partition, as it
> > > > > > > > > knows the input partition it is subscribed to? If the
> > > > > > > > > input topic and state store are differently
> > > > > > > > > partitioned, then we can explain the issue here.
> > > > > > > >
> > > > > > > > In Kafka Streams' changelog, the "partition key" of Kafka
> > > > > > > > messages is the same as the "message key" itself. And the
> > > > > > > > message key is the same as the state store key.
> > > > > > > >
> > > > > > > > Since the state store here should be storing the running
> > > > > > > > aggregate, it means that the partition key is the same as
> > > > > > > > the aggregated key.
> > > > > > > >
> > > > > > > > If you are doing a group-by aggregation here, where the
> > > > > > > > group-by keys are different from the source input
> > > > > > > > topic's keys, then the state store keys would be
> > > > > > > > different from the input topic keys.
> > > > > > > >
> > > > > > > > > 2. Is there a background thread to persist the state
> > > > > > > > > store when caching is disabled? When will the app
> > > > > > > > > commit the log for the input topic? Is it when the sink
> > > > > > > > > writes into the output topic, or when the state store
> > > > > > > > > writes into the changelog topic? Because if the app
> > > > > > > > > commits the record before the data was written to the
> > > > > > > > > changelog topic, then we can again explain this state.
> > > > > > > >
> > > > > > > > The commit happens *after* the local state store, as well
> > > > > > > > as the changelog records sent by the Streams' producers,
> > > > > > > > have been flushed. I.e. if there's a failure in between,
> > > > > > > > you would re-process some source records and hence cause
> > > > > > > > duplicates, but no data loss (a.k.a. the at_least_once
> > > > > > > > semantics).
> > > > > > > >
> > > > > > > > > >You may also consider upgrading to 2.6.x or higher
> > > > > > > > > >version and see if this issue goes away.
> > > > > > > > > Do you mean the client or the Kafka broker? I will be
> > > > > > > > > upgrading the client to 2.7.0 soon.
> > > > > > > >
> > > > > > > > I meant the client.
> > > > > > > >
> > > > > > > > > Sadly, looking into the timestamp will not help much,
> > > > > > > > > as we use some business time field to set the record
> > > > > > > > > timestamp. If I am right, there is no way now to know
> > > > > > > > > when a Producer wrote a record into a Kafka topic.
> > > > > > > > >
> > > > > > > > > Regards,
> > > > > > > > > Mangat
> > > > > > > > >
> > > > > > > > > On Wed, Apr 7, 2021 at 6:22 PM Guozhang Wang
> > > > > > > > > <wangg...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hello Mangat,
> > > > > > > > > >
> > > > > > > > > > With at-least-once, although some records may be
> > > > > > > > > > processed multiple times, their processing order
> > > > > > > > > > should not be violated, so what you observed is not
> > > > > > > > > > expected. What caught my eye is this section in your
> > > > > > > > > > output changelogs (highlighted):
> > > > > > > > > >
> > > > > > > > > > Key1, V1
> > > > > > > > > > Key1, null
> > > > > > > > > > Key1, V1
> > > > > > > > > > Key1, null (processed again)
> > > > > > > > > > Key1, V2
> > > > > > > > > > Key1, null
> > > > > > > > > >
> > > > > > > > > > *Key1, V1*
> > > > > > > > > > *Key1, V2*
> > > > > > > > > > Key1, V2+V1 (I guess we didn't process the V2
> > > > > > > > > > tombstone yet but reprocessed V1 again due to
> > > > > > > > > > reassignment)
> > > > > > > > > >
> > > > > > > > > > They seem to be the result of first receiving a
> > > > > > > > > > tombstone which removes V1 and then a new record
> > > > > > > > > > that adds V2. However, since caching is disabled,
> > > > > > > > > > you should get
> > > > > > > > > >
> > > > > > > > > > *Key1, V1*
> > > > > > > > > > *Key1, null*
> > > > > > > > > > *Key1, V2*
> > > > > > > > > >
> > > > > > > > > > instead; without the actual code snippet I cannot
> > > > > > > > > > tell more about what's happening here. If you can,
> > > > > > > > > > look into the logs and record, each time a partition
> > > > > > > > > > migrates, how many records from the changelog were
> > > > > > > > > > replayed to restore the store, and from which offset
> > > > > > > > > > on the input topic Streams resumes processing. You
> > > > > > > > > > may also consider upgrading to 2.6.x or a higher
> > > > > > > > > > version and see if this issue goes away.
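The point above, that at-least-once only duplicates records without reordering them, can be checked with the same kind of toy replay; a minimal sketch (plain Java, not Streams internals; assumed semantics: a null value is a tombstone, and re-processing restarts from an earlier offset but preserves order):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AtLeastOnceReplayDemo {
    // Apply key/value records to a store in order; a null value deletes the key.
    static Map<String, String> apply(Map<String, String> store, List<String[]> records) {
        for (String[] r : records) {
            if (r[1] == null) store.remove(r[0]);
            else store.put(r[0], r[1]);
        }
        return store;
    }

    public static void main(String[] args) {
        List<String[]> input = List.of(
                new String[]{"Key1", "V1"},
                new String[]{"Key1", null},
                new String[]{"Key1", "V2"});

        Map<String, String> once = apply(new HashMap<>(), input);

        // At-least-once: after a failed commit, the last records are re-read,
        // but still in their original offset order.
        Map<String, String> withDuplicates = apply(new HashMap<>(), input);
        apply(withDuplicates, input.subList(1, 3)); // re-process a suffix, in order

        System.out.println(once.equals(withDuplicates)); // prints "true"
    }
}
```

A single in-order writer converges to the same state no matter how often a suffix is re-processed; the corruption discussed in this thread therefore requires a second, interleaved writer.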
> > > > > > > > > >
> > > > > > > > > > Guozhang
> > > > > > > > > >
> > > > > > > > > > On Tue, Apr 6, 2021 at 8:38 AM mangat rai
> > > > > > > > > > <mangatm...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hey,
> > > > > > > > > > >
> > > > > > > > > > > We have the following setup in our infrastructure.
> > > > > > > > > > >
> > > > > > > > > > > 1. Kafka - 2.5.1
> > > > > > > > > > > 2. Apps use the kafka streams `org.apache.kafka`
> > > > > > > > > > > version 2.5.1 library
> > > > > > > > > > > 3. The low-level processor API is used with
> > > > > > > > > > > *at-least-once* semantics
> > > > > > > > > > > 4. State stores are *in-memory* with *caching
> > > > > > > > > > > disabled* and *changelog enabled*
> > > > > > > > > > >
> > > > > > > > > > > Is it possible that during state replication and
> > > > > > > > > > > partition reassignment, the input data is not
> > > > > > > > > > > always applied to the state store?
> > > > > > > > > > >
> > > > > > > > > > > 1. Let's say the input topic has records like the
> > > > > > > > > > > following:
> > > > > > > > > > >
> > > > > > > > > > > ```
> > > > > > > > > > > Key1, V1
> > > > > > > > > > > Key1, null (tombstone)
> > > > > > > > > > > Key1, V2
> > > > > > > > > > > Key1, null
> > > > > > > > > > > Key1, V3
> > > > > > > > > > > Key1, V4
> > > > > > > > > > > ```
> > > > > > > > > > >
> > > > > > > > > > > 2. The app has an aggregation function which takes
> > > > > > > > > > > these records and updates the state store, so that
> > > > > > > > > > > the changelog shall be
> > > > > > > > > > >
> > > > > > > > > > > ```
> > > > > > > > > > > Key1, V1
> > > > > > > > > > > Key1, null (tombstone)
> > > > > > > > > > > Key1, V2
> > > > > > > > > > > Key1, null
> > > > > > > > > > > Key1, V3
> > > > > > > > > > > Key1, V3 + V4
> > > > > > > > > > > ```
> > > > > > > > > > >
> > > > > > > > > > > Let's say the partition responsible for processing
> > > > > > > > > > > the above key was reallocated several times to
> > > > > > > > > > > different threads due to some infra issues we are
> > > > > > > > > > > having (in Kubernetes, where we run the app, not
> > > > > > > > > > > the Kafka cluster).
> > > > > > > > > > >
> > > > > > > > > > > I see the following records in the changelog:
> > > > > > > > > > >
> > > > > > > > > > > ```
> > > > > > > > > > > Key1, V1
> > > > > > > > > > > Key1, null
> > > > > > > > > > > Key1, V1
> > > > > > > > > > > Key1, null (processed again)
> > > > > > > > > > > Key1, V2
> > > > > > > > > > > Key1, null
> > > > > > > > > > > Key1, V1
> > > > > > > > > > > Key1, V2
> > > > > > > > > > > Key1, V2+V1 (I guess we didn't process the V2
> > > > > > > > > > > tombstone yet but reprocessed V1 again due to
> > > > > > > > > > > reassignment)
> > > > > > > > > > > Key1, V1 (V2 is gone as there was a tombstone, but
> > > > > > > > > > > then the V1 tombstone should have been applied
> > > > > > > > > > > also!!)
> > > > > > > > > > > Key1, V2+V1 (it is back!!!)
> > > > > > > > > > > Key1, V1
> > > > > > > > > > > Key1, V1 + V2 + V3 (This is the final state!)
> > > > > > > > > > > ```
> > > > > > > > > > >
> > > > > > > > > > > If you see this, it means several things:
> > > > > > > > > > > 1. The state is always correctly applied locally
> > > > > > > > > > > (on a developer laptop), where there were no
> > > > > > > > > > > reassignments.
> > > > > > > > > > > 2. The records are processed multiple times, which
> > > > > > > > > > > is understandable, as we have at-least-once
> > > > > > > > > > > semantics here.
> > > > > > > > > > > 3. As long as we re-apply the same events in the
> > > > > > > > > > > same order, we are golden, but it looks like some
> > > > > > > > > > > records are skipped; here it looks as if we have
> > > > > > > > > > > multiple consumers reading and updating the same
> > > > > > > > > > > topics, leading to race conditions.
> > > > > > > > > > >
> > > > > > > > > > > Is there any way Kafka Streams' state replication
> > > > > > > > > > > could lead to such a race condition?
> > > > > > > > > > >
> > > > > > > > > > > Regards,
> > > > > > > > > > > Mangat
> > > > > > > > > >
> > > > > > > > > > --
> > > > > > > > > > -- Guozhang

--
-- Guozhang
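For reference, workaround 2 discussed earlier in the thread (exactly-once) is a one-line Streams config change. A minimal sketch using the literal config key; the `application.id` and broker address are placeholders, and in a real app the properties would be passed to the `KafkaStreams` constructor (exactly-once also requires sufficiently recent brokers):

```java
import java.util.Properties;

public class EosConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "my-app");          // hypothetical app id
        props.put("bootstrap.servers", "broker:9092");  // hypothetical broker
        // Switch from the default at_least_once to exactly-once processing;
        // Streams then commits changelog and output records transactionally.
        props.put("processing.guarantee", "exactly_once");
        System.out.println(props.getProperty("processing.guarantee"));
    }
}
```

With this guarantee, the zombie thread's final transaction is fenced and aborted on commit, so its interleaved changelog writes never become visible to the restoring thread.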