Hi Alex, what you describe (the application failing after the state store has been updated but before the offsets are committed) is one reason why records are processed multiple times under the at-least-once processing guarantee. That is, as you put it, a reality of life. Kafka Streams in exactly-once mode guarantees that these duplicate state updates do not happen.
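In case it helps, exactly-once is enabled through a single StreamsConfig setting. Here is a minimal sketch (the application id, bootstrap servers, and the way the topology is obtained are placeholders, not your actual setup):

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class ExactlyOnceConfigSketch {
    public static KafkaStreams start(Topology topology) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "counts-app");        // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Switch from the default at-least-once to exactly-once semantics.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
        return streams;
    }
}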
The exactly-once processing guarantee was implemented in Kafka Streams for use cases where correctness is of the highest importance.

Best,
Bruno

On Mon, Aug 19, 2019 at 9:24 PM Alex Brekken <[email protected]> wrote:
>
> Hi all, I have a (relatively) simple streams topology that is producing
> some counts, and while testing this code I'm seeing some occasional
> incorrect aggregation results. This seems to happen when a re-balance
> occurs - typically due to a timeout or communication hiccup with the Kafka
> broker. The topology is built with the DSL, and utilizes 2 KTables: the
> first is really just a de-dup table and the second is the result of the
> aggregation. So at a high level the topology consumes from a source topic,
> groupByKey() and then does a reduce() where we always return the
> newValue. Then it does a groupBy() on a new key, and finally an
> aggregate() call with an adder and subtractor. Because our source topic
> frequently contains duplicate messages, this seemed like a good way to
> handle the dupes: the subtractor gets invoked anytime we replace a value in
> the "upstream" KTable and removes it from the count, then adds it back
> again in the adder.
>
> In the happy-path scenario where we never see any exceptions or rebalances,
> this whole thing works great - the counts at the end are 100% correct. But
> when rebalancing is triggered we sometimes get bad counts. My theory is
> that when a timeout or connectivity problem happens during that second
> aggregation, the data ends up getting saved to the state store but the
> offsets don't get committed and the message(s) in the repartition topic
> that feed the aggregation get replayed after the stream task gets
> rebalanced, causing the counts to get incorrectly incremented or
> decremented (depending on whether the message was triggering the adder or
> the subtractor). I can simulate this problem (or something similar to this
> problem) when debugging the application in my IDE just by pausing execution
> on a breakpoint inside the aggregation's adder or subtractor method for a
> few seconds. The result of the adder or subtractor gets saved to the state
> store, which means that when the messages in the repartition topic get
> re-processed, the counts get doubled. If I enable "exactly_once"
> processing, I'm unable to recreate the problem and the counts are always
> accurate.
>
> My questions are:
>
> 1. Is this expected behavior? In a hostile application environment where
> connectivity problems and rebalances happen frequently, is some degree of
> incorrectly aggregated data just a reality of life?
>
> 2. Is exactly_once processing the right solution if correctness is of the
> highest importance? Or should I be looking at different ways of writing
> the topology?
>
> Thanks for any advice!
>
> Alex
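For anyone following the thread, here is a rough sketch of the reduce-then-aggregate pattern Alex describes, written against the Java DSL. The topic names, serdes, the way the new grouping key is extracted, and the fact that the adder/subtractor simply count one per record are all placeholders, not Alex's actual code:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class CountTopologySketch {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // 1. De-dup table: the latest value per key wins.
        KTable<String, String> deduped = builder
                .stream("source-topic", Consumed.with(Serdes.String(), Serdes.String()))
                .groupByKey()
                .reduce((oldValue, newValue) -> newValue);

        // 2. Re-key and count: the subtractor removes the old value's contribution
        //    whenever the upstream table is updated, and the adder adds the new one.
        KTable<String, Long> counts = deduped
                .groupBy((key, value) -> KeyValue.pair(extractNewKey(value), value),
                         Grouped.with(Serdes.String(), Serdes.String()))
                .aggregate(
                        () -> 0L,
                        (newKey, value, total) -> total + 1,   // adder
                        (newKey, value, total) -> total - 1,   // subtractor
                        Materialized.with(Serdes.String(), Serdes.Long()));

        counts.toStream().to("counts-topic", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }

    // Placeholder for however the new grouping key is derived from the value.
    private static String extractNewKey(String value) {
        return value;
    }
}

Under at-least-once, a crash between the state store update in step 2 and the offset commit means the repartition record is replayed and the adder or subtractor fires a second time, which matches the doubled counts Alex sees; under exactly-once the state update and the offset commit are part of one transaction, so the replayed update is not applied twice.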
