Hi Mangat,

back to work now. I've configured our Streams application to use exactly-once semantics, but to no avail. Actually, after some more investigation I've come to suspect that the issue is somehow related to rebalancing.
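For reference, the exactly-once part of the setup boils down to a single property; here it is sketched as plain Streams config (in the Quarkus application the equivalent lives in application.properties under the kafka-streams. prefix). The group.instance.id part is only an idea I have not tried yet - static membership, to give each pod a stable consumer identity and make restarts less rebalance-prone:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstreams-folder-aggregator");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092"); // placeholder

// Exactly-once v2: implies an idempotent producer and acks=all, as
// discussed further down in this thread; no separate acks setting needed.
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

// Untested idea: static membership. POD_NAME would be injected via the
// K8s downward API so that each StatefulSet replica keeps a stable
// identity across restarts instead of triggering an immediate rebalance.
props.put(
        StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG),
        System.getenv("POD_NAME"));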
The initially shown topology lives inside a Quarkus Kafka Streams application that is rolled out as part of a K8s StatefulSet with three replicas. When initially fired up, the StatefulSet controller starts the Pods sequentially, each start triggering a rebalance operation. The topics then show the mentioned message loss. As soon as I reduce the StatefulSet to a single replica, the changelog topics as well as the eventual output topic show the expected message count. So - can the issue somehow be related to rebalancing?

Best wishes
Karsten

On Thu, 28 Mar 2024 at 08:25, Karsten Stöckmann <karsten.stoeckm...@gmail.com> wrote:
>
> Hi Mangat,
>
> thank you for the clarification. I'll try the suggested configuration as
> soon as I get back to work. Will keep you posted then.
>
> Best wishes
> Karsten
>
>
> mangat rai <mangatm...@gmail.com> wrote on Wed, 27 Mar 2024, 11:07:
>>
>> Hey Karsten,
>>
>> You don't need to do any other configuration to enable EOS. See here -
>> https://docs.confluent.io/platform/current/streams/concepts.html#processing-guarantees
>> It mentions that the producer will be idempotent. That also means acks=all
>> will be used. Note that if you have any other acks setting in the config,
>> it will be ignored in favour of exactly-once.
>>
>> Do let me know if that solves your problem. I am curious. If yes, then I
>> would ask you to create an issue.
>>
>> Regards,
>> Mangat
>>
>> On Wed, Mar 27, 2024 at 10:49 AM Karsten Stöckmann <
>> karsten.stoeckm...@gmail.com> wrote:
>>
>> > Hi Mangat,
>> >
>> > thanks for the clarification. So to my knowledge exactly-once is
>> > configured using the 'processing.guarantee=exactly_once_v2' setting? Is
>> > the configuration setting 'acks=all' somehow related, and would you
>> > advise setting that as well?
>> >
>> > Best wishes
>> > Karsten
>> >
>> >
>> > mangat rai <mangatm...@gmail.com> wrote on Tue, 26 Mar 2024, 15:44:
>> >
>> > > Hey Karsten,
>> > >
>> > > So if a topic has not been created yet, the Streams app will keep the
>> > > data in memory and write it later once the topic is available. If your
>> > > app is restarted (or a thread is killed), you may lose data, but it
>> > > depends on whether the app has committed in the source topics. If there
>> > > are no errors, it should be persisted eventually.
>> > >
>> > > However, overall exactly-once provides much tighter and better commit
>> > > control. If you don't have scaling issues, I would strongly advise you
>> > > to use EOS.
>> > >
>> > > Thanks,
>> > > Mangat
>> > >
>> > >
>> > > On Tue, Mar 26, 2024 at 3:33 PM Karsten Stöckmann <
>> > > karsten.stoeckm...@gmail.com> wrote:
>> > >
>> > > > Hi Mangat,
>> > > >
>> > > > thanks for your thoughts. I had actually considered exactly-once
>> > > > semantics already, was unsure whether it would help, and set it aside
>> > > > for the time being. I'll try that immediately when I get back to work.
>> > > >
>> > > > About snapshots and deserialization - I doubt that the issue is caused
>> > > > by deserialization failures, because when taking another snapshot of
>> > > > the exact same data (i.e. at a later point in time), all messages fed
>> > > > into the input topic pass the pipeline as expected.
>> > > >
>> > > > Logs of both Kafka and Kafka Streams show no signs of notable issues
>> > > > as far as I can tell, apart from these (when initially starting up,
>> > > > intermediate topics not existing yet):
>> > > >
>> > > > 2024-03-22 22:36:11,386 WARN [org.apa.kaf.cli.NetworkClient]
>> > > > (kstreams-folder-aggregator-a38397c2-d30a-437e-9817-baa605d49e23-StreamThread-4)
>> > > > [Consumer clientId=kstreams-folder-aggregator-a38397c2-d30a-437e-9817-baa605d49e23-StreamThread-4-consumer,
>> > > > groupId=kstreams-folder-aggregator] Error while fetching metadata with
>> > > > correlation id 69 :
>> > > > {kstreams-folder-aggregator-folder-to-agency-subscription-response-topic=UNKNOWN_TOPIC_OR_PARTITION,
>> > > > <various other intermediate topics>}
>> > > >
>> > > > Best wishes
>> > > > Karsten
>> > > >
>> > > >
>> > > > mangat rai <mangatm...@gmail.com> wrote on Tue, 26 Mar 2024, 11:06:
>> > > >
>> > > > > Hey Karsten,
>> > > > >
>> > > > > There could be several reasons this could happen.
>> > > > > 1. Did you check the error logs? There are several reasons why the
>> > > > > Kafka Streams app may drop incoming messages. Use exactly-once
>> > > > > semantics to limit such cases.
>> > > > > 2. Are you sure there was no error when deserializing the records
>> > > > > from `folderTopicName`? You mentioned that it happens only when you
>> > > > > start processing and the other table snapshot works fine. This gives
>> > > > > me a feeling that the first records in the topic might not be
>> > > > > deserialized properly.
>> > > > >
>> > > > > Regards,
>> > > > > Mangat
>> > > > >
>> > > > > On Tue, Mar 26, 2024 at 8:45 AM Karsten Stöckmann <
>> > > > > karsten.stoeckm...@gmail.com> wrote:
>> > > > >
>> > > > > > Hi,
>> > > > > >
>> > > > > > thanks for getting back. I'll try and illustrate the issue.
>> > > > > >
>> > > > > > I've got an input topic 'folderTopicName' fed by a database CDC
>> > > > > > system. Messages then pass a series of FK left joins and are
>> > > > > > eventually sent to an output topic like this ('agencies' and
>> > > > > > 'documents' being KTables):
>> > > > > >
>> > > > > > streamsBuilder //
>> > > > > >     .table( //
>> > > > > >         folderTopicName, //
>> > > > > >         Consumed.with( //
>> > > > > >             folderKeySerde, //
>> > > > > >             folderSerde)) //
>> > > > > >     .leftJoin( //
>> > > > > >         agencies, //
>> > > > > >         Folder::agencyIdValue, //
>> > > > > >         AggregateFolder::new, //
>> > > > > >         TableJoined.as("folder-to-agency"), //
>> > > > > >         Materialized //
>> > > > > >             .<FolderId, AggregateFolder>as("folder-to-agency-materialized") //
>> > > > > >             .withKeySerde(folderKeySerde) //
>> > > > > >             .withValueSerde(aggregateFolderSerde)) //
>> > > > > >     .leftJoin( //
>> > > > > >         documents, //
>> > > > > >         ...
>> > > > > >     .toStream(...
>> > > > > >     .to(...
>> > > > > >
>> > > > > > ...
>> > > > > >
>> > > > > > As far as I understand, left join semantics should be similar to
>> > > > > > those of relational databases, i.e. the left-hand value always
>> > > > > > passes the join, with the right-hand value set to <null> if not
>> > > > > > present. Whereas what I am observing is this: lots of messages on
>> > > > > > the input topic are even absent on the first left join changelog
>> > > > > > topic ('folder-to-agency-materialized-changelog').
>> > > > > > But: this seems to happen only in case the Streams application is
>> > > > > > fired up for the first time, i.e. the intermediate topics do not
>> > > > > > yet exist. When streaming another table snapshot to the input
>> > > > > > topic, things seem (!) to work as expected...
>> > > > > >
>> > > > > > Best wishes,
>> > > > > > Karsten
>> > > > > >
>> > > > > > Bruno Cadonna <cado...@apache.org> wrote on Mon, 25 Mar 2024, 17:01:
>> > > > > >
>> > > > > > > Hi,
>> > > > > > >
>> > > > > > > That sounds worrisome!
>> > > > > > >
>> > > > > > > Could you please provide us with a minimal example that shows
>> > > > > > > the issue you describe?
>> > > > > > >
>> > > > > > > That would help a lot!
>> > > > > > >
>> > > > > > > Best,
>> > > > > > > Bruno
>> > > > > > >
>> > > > > > > On 3/25/24 4:07 PM, Karsten Stöckmann wrote:
>> > > > > > > > Hi,
>> > > > > > > >
>> > > > > > > > are there circumstances that might lead to messages silently
>> > > > > > > > (i.e. without any logged warnings or errors) disappearing from
>> > > > > > > > a topology?
>> > > > > > > >
>> > > > > > > > Specifically, I've got a rather simple topology doing a series
>> > > > > > > > of FK left joins and notice severe message loss in case the
>> > > > > > > > application is fired up for the first time, i.e. intermediate
>> > > > > > > > topics not existing yet. I'd generally expect the message
>> > > > > > > > count on the output topic to resemble that of the input topic,
>> > > > > > > > yet it doesn't (about half only).
>> > > > > > > >
>> > > > > > > > Any hints on this?
>> > > > > > > >
>> > > > > > > > Best wishes
>> > > > > > > > Karsten
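(For completeness, here is the topology from this thread as a compilable sketch. The domain types - Folder, FolderId, Agency, AggregateFolder - their serdes, and the 'agencies' and 'documents' KTables come from the original application and are assumed to be in scope; 'outputTopicName' and the Produced serdes are placeholders for the lines elided above.)

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TableJoined;
import org.apache.kafka.streams.state.KeyValueStore;

StreamsBuilder streamsBuilder = new StreamsBuilder();

// Source table, fed by the CDC system.
KTable<FolderId, Folder> folders = streamsBuilder.table(
        folderTopicName,
        Consumed.with(folderKeySerde, folderSerde));

// Foreign-key left join: every Folder should pass the join, with a null
// Agency handed to the joiner when no matching right-hand record exists.
KTable<FolderId, AggregateFolder> folderToAgency = folders.leftJoin(
        agencies,               // KTable<AgencyId, Agency>
        Folder::agencyIdValue,  // foreign-key extractor: Folder -> AgencyId
        AggregateFolder::new,   // joiner: (Folder, Agency) -> AggregateFolder
        TableJoined.as("folder-to-agency"),
        Materialized.<FolderId, AggregateFolder, KeyValueStore<Bytes, byte[]>>as(
                "folder-to-agency-materialized")
            .withKeySerde(folderKeySerde)
            .withValueSerde(aggregateFolderSerde));

// The further FK joins (e.g. against 'documents') follow the same pattern;
// the final result is streamed to the output topic.
folderToAgency
        .toStream()
        .to(outputTopicName, Produced.with(folderKeySerde, aggregateFolderSerde));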