Hi Mangat,

Back to work now. I've configured our Streams application to use
exactly-once semantics, but to no avail. After some more
investigation, I've come to suspect that the issue is somehow related
to rebalancing.

The initially shown topology lives inside a Quarkus Kafka Streams
application that is rolled out as part of a K8s StatefulSet with three
replicas. When initially fired up, the StatefulSet controller starts
the Pods sequentially, each start subsequently triggering a rebalance.
The topics then show the aforementioned message loss.

As soon as I reduce the StatefulSet to a single replica, the changelog
topics as well as the final output topic show the expected message
count.
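For completeness, this is essentially what I set to enable EOS (a
sketch of my application.properties; I'm assuming here the Quarkus
Kafka Streams convention of forwarding kafka-streams.* entries to the
Streams configuration, and the topic list is elided):

    # application.properties (sketch; key names assume the Quarkus
    # Kafka Streams extension)
    quarkus.kafka-streams.application-id=kstreams-folder-aggregator
    quarkus.kafka-streams.topics=<input and intermediate topics>
    # exactly-once semantics, v2 (requires brokers >= 2.5)
    kafka-streams.processing.guarantee=exactly_once_v2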

So - can the issue somehow be related to rebalancing?

Best wishes
Karsten

On Thu, Mar 28, 2024 at 08:25, Karsten Stöckmann
<karsten.stoeckm...@gmail.com> wrote:
>
> Hi Mangat,
>
> thank you for the clarification. I'll try the suggested configuration as soon as
> I get back to work. I'll keep you posted.
>
> Best wishes
> Karsten
>
>
> mangat rai <mangatm...@gmail.com> wrote on Wed, Mar 27, 2024, 11:07:
>>
>> Hey Karsten,
>>
>> You don't need to do any other configuration to enable EOS. See here -
>> https://docs.confluent.io/platform/current/streams/concepts.html#processing-guarantees
>> It mentions that the producer will be idempotent. That also means acks=all
>> will be considered. Note that if you have any other acks setting in the
>> config, it will be ignored in favour of exactly-once.
>>
>> Do let me know if that solves your problem. I am curious. If yes, then I
>> would ask you to create an issue.
>>
>> Regards,
>> Mangat
>>
>> On Wed, Mar 27, 2024 at 10:49 AM Karsten Stöckmann <
>> karsten.stoeckm...@gmail.com> wrote:
>>
>> > Hi Mangat,
>> >
>> > thanks for the clarification. So to my knowledge exactly-once is configured
>> > using the 'processing.guarantee=exactly_once_v2' setting? Is the
>> > configuration setting 'acks=all' somehow related and would you advise
>> > setting that as well?
>> >
>> > Best wishes
>> > Karsten
>> >
>> >
>> > mangat rai <mangatm...@gmail.com> wrote on Tue, Mar 26, 2024, 15:44:
>> >
>> > > Hey Karsten,
>> > >
>> > > So if a topic has not been created yet, the Streams app will keep the data
>> > > in memory and write it later once the topic is available. If your app is
>> > > restarted (or a thread is killed), you may lose data, but it depends on
>> > > whether the app has committed in the source topics. If there are no
>> > > errors, it should be persisted eventually.
>> > >
>> > > However, overall exactly-once provides much tighter and better commit
>> > > control. If you don't have scaling issues, I would strongly advise you to
>> > > use EOS.
>> > >
>> > > Thanks,
>> > > Mangat
>> > >
>> > >
>> > > On Tue, Mar 26, 2024 at 3:33 PM Karsten Stöckmann <
>> > > karsten.stoeckm...@gmail.com> wrote:
>> > >
>> > > > Hi Mangat,
>> > > >
>> > > > thanks for your thoughts. I had actually considered exactly-once
>> > > > semantics already, was unsure whether it would help, and set it aside
>> > > > for the time being. I'll try it immediately when I get back to work.
>> > > >
>> > > > About snapshots and deserialization - I doubt the issue is caused by
>> > > > deserialization failures, because when taking another snapshot (i.e. at
>> > > > a later point in time) of the exact same data, all messages fed into the
>> > > > input topic pass through the pipeline as expected.
>> > > >
>> > > > Logs of both Kafka and Kafka Streams show no signs of notable issues as
>> > > > far as I can tell, apart from these (when initially starting up,
>> > > > intermediate topics not existing yet):
>> > > >
>> > > > 2024-03-22 22:36:11,386 WARN [org.apa.kaf.cli.NetworkClient]
>> > > > (kstreams-folder-aggregator-a38397c2-d30a-437e-9817-baa605d49e23-StreamThread-4)
>> > > > [Consumer
>> > > > clientId=kstreams-folder-aggregator-a38397c2-d30a-437e-9817-baa605d49e23-StreamThread-4-consumer,
>> > > > groupId=kstreams-folder-aggregator] Error while fetching metadata with
>> > > > correlation id 69 :
>> > > > {kstreams-folder-aggregator-folder-to-agency-subscription-response-topic=UNKNOWN_TOPIC_OR_PARTITION,
>> > > > <various other intermediate topics>}
>> > > >
>> > > > Best wishes
>> > > > Karsten
>> > > >
>> > > >
>> > > >
>> > > > mangat rai <mangatm...@gmail.com> wrote on Tue, Mar 26, 2024, 11:06:
>> > > >
>> > > > > Hey Karsten,
>> > > > >
>> > > > > There could be several reasons why this happens.
>> > > > > 1. Did you check the error logs? There are several reasons why the
>> > > > > Kafka Streams app may drop incoming messages. Use exactly-once
>> > > > > semantics to limit such cases.
>> > > > > 2. Are you sure there was no error when deserializing the records from
>> > > > > `folderTopicName`? You mentioned that it happens only when you start
>> > > > > processing and that the other table snapshot works fine. This gives me
>> > > > > a feeling that the first records in the topic might not be
>> > > > > deserialized properly.
>> > > > >
>> > > > > Regards,
>> > > > > Mangat
>> > > > >
>> > > > > On Tue, Mar 26, 2024 at 8:45 AM Karsten Stöckmann <
>> > > > > karsten.stoeckm...@gmail.com> wrote:
>> > > > >
>> > > > > > Hi,
>> > > > > >
>> > > > > > thanks for getting back. I'll try and illustrate the issue.
>> > > > > >
>> > > > > > I've got an input topic 'folderTopicName' fed by a database CDC
>> > > > > > system. Messages then pass a series of FK left joins and are
>> > > > > > eventually sent to an output topic like this ('agencies' and
>> > > > > > 'documents' being KTables):
>> > > > > >
>> > > > > >
>> > > > > >             streamsBuilder //
>> > > > > >                     .table( //
>> > > > > >                             folderTopicName, //
>> > > > > >                             Consumed.with( //
>> > > > > >                                     folderKeySerde, //
>> > > > > >                                     folderSerde)) //
>> > > > > >                     .leftJoin( //
>> > > > > >                             agencies, //
>> > > > > >                             Folder::agencyIdValue, //
>> > > > > >                             AggregateFolder::new, //
>> > > > > >                             TableJoined.as("folder-to-agency"), //
>> > > > > >                             Materialized //
>> > > > > >                                     .<FolderId, AggregateFolder>as("folder-to-agency-materialized") //
>> > > > > >                                     .withKeySerde(folderKeySerde) //
>> > > > > >                                     .withValueSerde(aggregateFolderSerde)) //
>> > > > > >                     .leftJoin( //
>> > > > > >                             documents, //
>> > > > > >                     .toStream(...
>> > > > > >                     .to(...
>> > > > > >
>> > > > > > ...
>> > > > > >
>> > > > > > As far as I understand, left join semantics should be similar to
>> > > > > > those of relational databases, i.e. the left-hand value always
>> > > > > > passes the join, with the right-hand value set to <null> if not
>> > > > > > present. Whereas what I am observing is this: lots of messages on
>> > > > > > the input topic are absent even from the first left join changelog
>> > > > > > topic ('folder-to-agency-materialized-changelog'). But: this seems
>> > > > > > to happen only in case the Streams application is fired up for the
>> > > > > > first time, i.e. intermediate topics do not yet exist. When
>> > > > > > streaming another table snapshot to the input topic, things seem
>> > > > > > (!) to work as expected...
>> > > > > >
>> > > > > > Best wishes,
>> > > > > > Karsten
>> > > > > >
>> > > > > > Bruno Cadonna <cado...@apache.org> wrote on Mon, Mar 25, 2024,
>> > > > 17:01:
>> > > > > >
>> > > > > > > Hi,
>> > > > > > >
>> > > > > > > That sounds worrisome!
>> > > > > > >
>> > > > > > > Could you please provide us with a minimal example that shows the
>> > > > issue
>> > > > > > > you describe?
>> > > > > > >
>> > > > > > > That would help a lot!
>> > > > > > >
>> > > > > > > Best,
>> > > > > > > Bruno
>> > > > > > >
>> > > > > > > On 3/25/24 4:07 PM, Karsten Stöckmann wrote:
>> > > > > > > > Hi,
>> > > > > > > >
>> > > > > > > > are there circumstances that might lead to messages silently
>> > > > > > > > (i.e. without any logged warnings or errors) disappearing from a
>> > > > > > > > topology?
>> > > > > > > >
>> > > > > > > > Specifically, I've got a rather simple topology doing a series
>> > > > > > > > of FK left joins and notice severe message loss in case the
>> > > > > > > > application is fired up for the first time, i.e. intermediate
>> > > > > > > > topics not existing yet. I'd generally expect the message count
>> > > > > > > > on the output topic to resemble that of the input topic, yet it
>> > > > > > > > doesn't (only about half).
>> > > > > > > >
>> > > > > > > > Any hints on this?
>> > > > > > > >
>> > > > > > > > Best wishes
>> > > > > > > > Karsten
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
