Hey,

We have the following setup in our infrastructure.

   1. Kafka - 2.5.1
   2. Apps use the Kafka Streams `org.apache.kafka` library, version 2.5.1
   3. The low-level Processor API is used with *at-least-once* semantics
   4. State stores are *in-memory* with *caching disabled* and *changelog
   enabled*


Is it possible that during state replication and partition reassignment,
the input data is not always applied to the state store?

1. Let's say the input topic has records like the following:

```
Key1, V1
Key1, null (tombstone)
Key1, V2
Key1, null
Key1, V3
Key1, V4
```
2. The app has an aggregation function which takes these records and updates
the state store, so that the changelog should be:

```
Key1, V1
Key1, null (tombstone)
Key1, V2
Key1, null
Key1, V3
Key1, V3 + V4
```
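
To make the expected behaviour concrete, here is a minimal plain-Java sketch of
the aggregation, with no Kafka APIs involved. The class `AggregationSketch`, the
`HashMap` standing in for the state store and the `List` standing in for the
changelog are all hypothetical; the assumed rule is that a null value deletes
the key and any other value is appended to whatever is already stored.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the processor: a HashMap plays the in-memory
// state store and a List plays the changelog topic.
final class AggregationSketch {
    private final Map<String, String> store = new HashMap<>();
    private final List<String> changelog = new ArrayList<>();

    // Assumed semantics: null deletes the key, anything else is appended.
    void process(String key, String value) {
        if (value == null) {
            store.remove(key);
            changelog.add(key + ", null");
            return;
        }
        // merge() stores the value as-is when the key is absent,
        // otherwise combines it with the existing aggregate.
        String merged = store.merge(key, value, (old, v) -> old + " + " + v);
        changelog.add(key + ", " + merged);
    }

    List<String> changelog() {
        return changelog;
    }

    // Feeds the six input records from the example, in order.
    static List<String> run() {
        AggregationSketch app = new AggregationSketch();
        for (String v : Arrays.asList("V1", null, "V2", null, "V3", "V4")) {
            app.process("Key1", v);
        }
        return app.changelog();
    }
}
```

Calling `AggregationSketch.run()` with a single owner and no replays yields the
six changelog lines listed above, ending in `Key1, V3 + V4`.
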
Let's say the partition responsible for processing the above key was
reassigned several times to different threads due to some infra issues we
are having (in Kubernetes, where we run the app, not in the Kafka cluster).

I see the following records in the changelog:

```
Key1, V1
Key1, null
Key1, V1
Key1, null  (processed again)
Key1, V2
Key1, null
Key1, V1
Key1,V2
Key1, V2+V1 (I guess we didn't process V2 tombstone yet but reprocessed V1
again due to reassignment)
Key1,V1 (V2 is gone as there was a tombstone, but then V1 tombstone should
have been applied also!!)
Key1, V2+V1 (it is back!!!)
Key1,V1
Key1, V1 + V2 + V3 (This is the final state)!
```

Looking at this, a few things stand out:
1. The state is always applied correctly locally (on a developer laptop),
where there are no reassignments.
2. The records are processed multiple times, which is understandable as we
have at-least-once semantics here.
3. As long as we re-apply the same events in the same order we are golden,
but it looks like some records are skipped. It looks as if we have
multiple consumers reading and updating the same topics, leading to race
conditions.
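
As a rough illustration of point 2 only, the plain-Java sketch below models one
way a replay could produce an entry like `V2 + V1`. It is not a diagnosis and
uses no Kafka APIs: `ReplaySketch`, the map used as a store and the list used as
a changelog are hypothetical. It assumes the changelog writes were persisted but
the input offsets were not committed before the task moved, so the new owner
restores a store that is ahead of the position it resumes reading from.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, no Kafka APIs: the list is the changelog topic and the
// map is one task's in-memory store.
final class ReplaySketch {
    // Assumed aggregation: null deletes the key, anything else is appended.
    static void process(Map<String, String> store,
                        List<Map.Entry<String, String>> changelog,
                        String key, String value) {
        String next = (value == null) ? null
                : store.containsKey(key) ? store.get(key) + " + " + value : value;
        if (next == null) {
            store.remove(key);
        } else {
            store.put(key, next);
        }
        changelog.add(new SimpleEntry<>(key, next));
    }

    static List<String> run() {
        String[] input = {"V1", null, "V2"};
        List<Map.Entry<String, String>> changelog = new ArrayList<>();

        // First owner processes everything; changelog writes land, but we
        // assume the input offsets are never committed before the task moves.
        Map<String, String> first = new HashMap<>();
        for (String v : input) {
            process(first, changelog, "Key1", v);
        }

        // New owner restores the store by replaying the changelog...
        Map<String, String> restored = new HashMap<>();
        for (Map.Entry<String, String> e : new ArrayList<>(changelog)) {
            if (e.getValue() == null) {
                restored.remove(e.getKey());
            } else {
                restored.put(e.getKey(), e.getValue());
            }
        }
        // ...then re-reads the input from the last committed offset (0 here).
        for (String v : input) {
            process(restored, changelog, "Key1", v);
        }

        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, String> e : changelog) {
            lines.add(e.getKey() + ", " + e.getValue());
        }
        return lines;
    }
}
```

In this model the changelog ends up as `V1`, `null`, `V2`, `V2 + V1`, `null`,
`V2`: the replayed `V1` is appended to the restored `V2`, yet everything happens
in order with a single writer at a time. It does not explain entries that look
like skipped records.
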

Is there any way Kafka Streams' state replication could lead to such a
race condition?

Regards,
Mangat
