Hello Jeffrey,

I’m sorry for the trouble, and I appreciate your diligence in tracking this 
down. Nothing in your description jumps out at me as problematic, so I’m a 
bit at a loss as to what the problem may have been. 


>    - Is there a realistic scenario (e.g. crash, rebalance) which you can
>    think of where the offset has been committed, but the repartition was not?
I don’t think so. Streams waits for the producer acks on the repartition 
records before committing the corresponding input offsets, so it should be 
safe. Still, maybe double-check that the producer is configured with acks=all?
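For reference, something like this in the Streams configuration would forward 
that setting to the internal producer (just a sketch; these are the standard 
client config names, adjust to your setup):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    // Forward acks=all to the producer that Streams uses to write the
    // repartition and changelog topics:
    props.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
    // equivalently: props.put("producer.acks", "all");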

>    - Similarly, is there a realistic scenario (e.g. crash, rebalance) which
>    you can think of where the offset has been committed, but the changelog was
>    not?
Changelogs ought to go through exactly the same mechanism as repartition 
topics, so the same answer applies. 

>    - When caching the output of a KTable, are the corresponding offsets
>    committed when the cache is flushed, or are they eligible to be committed
>    as soon as the record is added to the cache?
The order of operations is to flush the caches, wait for acks, and then commit 
the offsets, so a cached record only becomes eligible for commit once it has 
actually been flushed and acked. 
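In pseudo-code, the per-task commit in at-least-once mode looks roughly like 
this (illustrative only; these are not the real internal classes or method 
names):

    // Simplified sketch of the at-least-once commit path. Only
    // producer.flush() and consumer.commitSync() are real client APIs;
    // the rest is illustrative.
    void commitTask(StreamTask task) {
        task.flushCaches();           // push cached KTable results downstream / to the producer
        producer.flush();             // block until every in-flight record is acked by the brokers
        consumer.commitSync(offsets); // only then commit the consumed input offsets
    }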

>    - Could it be a race-condition on the update of the state store? I.e.
>    given 2 back-to-back messages for the same key, could an aggregation
>    function handle both based on the old value?
This really shouldn’t be possible. Each task processes its records 
single-threaded, so the second update always sees the value written by the 
first.
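To illustrate, within a task the aggregation is a sequential read-modify-write 
against the state store (simplified; the real path also goes through the 
cache):

    // Simplified view of how the aggregator is applied, one record at a
    // time on the task's single processing thread:
    V oldAgg = store.get(key);                       // value left by the previous record
    V newAgg = aggregator.apply(key, value, oldAgg); // your aggregation function
    store.put(key, newAgg);                          // visible before the next record is handled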

Of course, that is all the intent of the code; there may be a bug that 
invalidates one of those answers. But I don’t think we know of other reports 
of this happening.

It sounds like you don’t need general advice, but I feel compelled to offer it 
anyway. The best I can suggest is to keep a really close eye on the app. If 
you can catch the problem right away, you might be able to correlate it with 
the application logs, inspect the affected topic partitions, etc. 
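For example, a small throwaway consumer along these lines would let you dump a 
changelog partition and diff it against the local store. This is only a 
sketch: the bootstrap server and the "myapp" prefix are placeholders, and the 
changelog topic name follows the <application.id>-<store name>-changelog 
pattern.

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    Properties cfg = new Properties();
    cfg.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
    cfg.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    cfg.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    // "myapp" stands in for your application.id:
    TopicPartition tp = new TopicPartition("myapp-meteringpointsbyzipcode-changelog", 0);
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(cfg)) {
        consumer.assign(Collections.singletonList(tp));
        consumer.seekToBeginning(Collections.singletonList(tp));
        for (ConsumerRecord<byte[], byte[]> rec : consumer.poll(Duration.ofSeconds(5))) {
            System.out.printf("offset=%d key=%s value=%s%n",
                rec.offset(), Arrays.toString(rec.key()), Arrays.toString(rec.value()));
        }
    }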

I hope this helps,
John

On Fri, Nov 20, 2020, at 14:39, Jeffrey Goderie wrote:
> Hi all,
> 
> We recently started using Kafka Streams and we encountered an unexpected
> issue with our Streams application. Using the following topology we ran
> into data loss:
> 
> Topologies:
>    Sub-topology: 0
>     Source: meteringpoints-source (topics: [serving.meteringpoints.mad-meteringpoint])
>       --> meteringpoints
>     Processor: meteringpoints (stores: [meteringpoints])
>       --> KTABLE-SELECT-0000000002
>       <-- meteringpoints-source
>     Processor: KTABLE-SELECT-0000000002 (stores: [])
>       --> KSTREAM-SINK-0000000003
>       <-- meteringpoints
>     Sink: KSTREAM-SINK-0000000003 (topic: meteringpointsbyzipcode-repartition)
>       <-- KTABLE-SELECT-0000000002
> 
>   Sub-topology: 1
>     Source: KSTREAM-SOURCE-0000000004 (topics: [meteringpointsbyzipcode-repartition])
>       --> KTABLE-AGGREGATE-0000000005
>     Processor: KTABLE-AGGREGATE-0000000005 (stores: [meteringpointsbyzipcode])
>       --> none
>       <-- KSTREAM-SOURCE-0000000004
> 
> The topology is optimized, so the 'meteringpoints' store does not have an
> additional changelog. The 'meteringpointsbyzipcode' store does have one.
> 
> During the aggregation (KTABLE-AGGREGATE-0000000005) we build up a set of
> objects that we encountered for that specific key. Upon inspection we
> noticed that some of our keys did not have all of the expected objects in
> their associated value.
> 
> Configuration-wise: our cluster consists of 3 brokers, and the topics
> (regular and internal) are replicated across all of them. We don't have EOS
> enabled, as our aggregation is idempotent. Our consumers' isolation level
> is 'read_uncommitted', which we thought was irrelevant since 'at_least_once'
> delivery doesn't seem to use Kafka transactions. The number of consumers is
> equal to the number of partitions in each topic, so each consumer deals
> with a single partition of each topic.
> 
> Unfortunately, both the repartition topic and the changelog topic were
> wiped before we were able to investigate what caused the issue. Because of
> this we are unable to determine whether the problem originated in the
> changelog or the repartition topic. Resetting the application (and its
> offsets) caused all data to be reprocessed after which the issue was gone.
> We tried reproducing the erroneous scenario, but have not yet succeeded,
> mostly because we don't know what events caused it in the first place.
> 
> Since we are seemingly unable to reproduce the behaviour, and didn't find
> any recent accounts of similar problems, we decided to look into the Kafka
> Streams source code to determine how things function. While insightful, it
> didn't help in determining the cause. As such, we were wondering whether
> you could aid us by answering some of the following questions:
> 
>    - Is there a realistic scenario (e.g. crash, rebalance) which you can
>    think of where the offset has been committed, but the repartition was not?
>    - Similarly, is there a realistic scenario (e.g. crash, rebalance) which
>    you can think of where the offset has been committed, but the changelog was
>    not?
>    - When caching the output of a KTable, are the corresponding offsets
>    committed when the cache is flushed, or are they eligible to be committed
>    as soon as the record is added to the cache?
>    - Could it be a race-condition on the update of the state store? I.e.
>    given 2 back-to-back messages for the same key, could an aggregation
>    function handle both based on the old value?
> 
> Kind regards,
> Jeffrey G
>
