[ 
https://issues.apache.org/jira/browse/KAFKA-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846363#comment-17846363
 ] 

Ayoub Omari commented on KAFKA-16700:
-------------------------------------

[~stoeckmk] KIP-962 is not about null foreign keys; it only concerns 
null keys of the left topic. Null foreign keys therefore still behave the same 
way as before the KIP.

 

> Kafka Streams: possible message loss on KTable-KTable FK Left Join
> ------------------------------------------------------------------
>
>                 Key: KAFKA-16700
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16700
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.7.0
>         Environment: Apache Kafka 3.7.0 cluster in KRaft mode, 3 brokers and 
> 3 controllers, on managed Kubernetes 1.28.7, operated by Strimzi Kafka 
> Operators
>            Reporter: Karsten Stöckmann
>            Priority: Major
>              Labels: dsl, joins, streams
>
> We are experiencing significant, yet intermittent / non-deterministic / 
> unexplainable message loss on a Kafka Streams topology while performing a 
> *KTable-KTable* {*}FK Left Join{*}.
> Assume the following snippet:
> {code:java}
> streamsBuilder
>     .table(
>         folderTopicName,
>         Consumed.with(
>             folderKeySerde,
>             folderSerde))
>         .leftJoin(
>             agencies, // KTable<AgencyId, AggregateAgency>
>             Folder::agencyIdValue,
>             AggregateFolder::new,
>             TableJoined.as("folder-to-agency"),
>             Materialized
>                 .as("folder-to-agency-materialized")
>                 .withKeySerde(folderKeySerde)
>                 .withValueSerde(aggregateFolderSerde))
>         .leftJoin(
>             documents,
> {code}
> The setup is as follows:
> A Debezium Connector for PostgreSQL streams database changes into various 
> Kafka topics. A series of Quarkus Kafka Streams applications then performs 
> aggregation operations on those topics to create index documents later to be 
> sent into an OpenSearch system.
> When firing up the Kafka Streams infrastructure to work on initially 
> populated Kafka Topics (i.e. a snapshot of all relevant table data has been 
> streamed to Kafka), the above shown KTable-KTable FK Left Join seems to 
> produce message loss on the first of a series of FK Left Joins; the right 
> hand {{KTable<AgencyId, AggregateAgency>}} is consumed from an aggregated 
> topic fed from another Kafka Streams topology / application.
> On a (heavily reduced) test data set of 6828 messages in the 
> {{folderTopicName}} Topic, we observe the following results:
>  * {{{}folder-to-agency-subscription-registration{}}}: *6967* messages
>  * {{{}folder-to-agency-subscription-response{}}}: *3048* messages
>  * {{{}folder-to-agency-subscription-store-changelog{}}}: *6359* messages
>  * {{{}folder-to-agency-materialized-changelog{}}}: *4644* messages.
> Given the nature of an (FK) Left Join, I'd expect every message from the 
> left hand topic to produce an aggregate, even if no matching message is 
> found in the right hand topic.
> Message loss unpredictably varies across tests and seems not to be bound to 
> specific keys or messages.
> As it seems, this can only be observed when initially firing up the Streams 
> infrastructure to process the message 'backlog' that had been snapshotted by 
> Debezium. A manual snapshot triggered later (i.e. Streams applications 
> already running) seems not to show this behaviour. Additionally, as of yet we 
> observed this kind of message loss only when running multiple replicas of the 
> affected application. When carrying out the tests with only one replica, 
> everything seems to work as expected. We've tried to leverage 
> {{group.initial.rebalance.delay.ms}} in order to rule out possible 
> rebalancing issues, but to no avail.
> Our Kafka configuration:
> {code:yaml}
>     offsets.topic.replication.factor: 3
>     transaction.state.log.replication.factor: 3
>     transaction.state.log.min.isr: 2
>     default.replication.factor: 3
>     min.insync.replicas: 2
>     message.max.bytes: "20971520"
> {code}
> Our Kafka Streams application configuration:
> {code:yaml}
>     kafka-streams.num.stream.threads: 5
>     kafka-streams.num.standby.replicas: 1
>     kafka-streams.auto.offset.reset: earliest
>     kafka-streams.cache.max.bytes.buffering: "20971520"
>     kafka-streams.commit.interval.ms: 100
>     kafka-streams.fetch.max.bytes: "10485760"
>     kafka-streams.max.request.size: "10485760"
>     kafka-streams.max.partition.fetch.bytes: "10485760"
>     kafka-streams.metadata.max.age.ms: 300000
>     kafka-streams.statestore.cache.max.bytes: "20971520"
>     kafka-streams.topology.optimization: all
>     kafka-streams.processing.guarantee: exactly_once_v2
>     # Kafka Streams Intermediate Topics
>     kafka-streams.topic.compression.type: lz4
>     kafka-streams.topic.segment.ms: "43200000" # 12h
>     kafka-streams.topic.max.compaction.lag.ms: "86400000" # 24h
>     kafka-streams.topic.delete.retention.ms: "86400000" # 24h
>     kafka-streams.producer.max.request.size: "20971520" # 20MiB
>     # Should match commit.interval.ms, set close to 100ms for exactly_once_v2
>     kafka-streams.producer.transaction.timeout.ms: 100
>     kafka-streams.consumer.group.instance.id: ${HOSTNAME}
>     kafka-streams.consumer.heartbeat.interval.ms: 100
>     kafka-streams.consumer.session.timeout.ms: 45000
> {code}
> All input (and aggregate) topics feature 15 partitions and share this 
> configuration:
> {code:yaml}
> cleanup.policy: compact          
> compression.type: lz4
> segment.ms: "43200000" # 12h
> max.compaction.lag.ms: "86400000" # 24h
> delete.retention.ms: "86400000" # 24h
> {code}
> Logs show no indication of where or why this happens.
> The issue was discussed on the Kafka [mailing 
> list|https://lists.apache.org/thread/l50pwmwhobt73db97n0r5v36mydo15rs] as 
> well as on 
> [StackOverflow|https://stackoverflow.com/questions/78210993/kafka-streams-topology-initially-dropping-messages-to-intermediate-topics],
>  but neither thread led to an explanation. In the end, it was suggested 
> that I file a bug on the Kafka JIRA. I can't rule out that this is caused 
> entirely by some setting in our Kafka environment, but there are 
> other 
> [indications|https://stackoverflow.com/questions/75886729/missing-records-in-kafka-streams-foreign-key-join]
>  of similar message loss on FK Join operations. For the time being, I'd 
> consider this a bug, perhaps emerging only under certain conditions.
> At the moment I have no test case to reproduce the issue locally.
> If any additional information is needed, I'd be happy to provide it.
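
For illustration, the left-join expectation stated in the description (every left-hand record produces a result, even without a right-hand match) can be sketched in plain Java without any Kafka dependency. This is only a simplified model over in-memory maps, not how Kafka Streams implements the join; the class name FkLeftJoinSketch, the method fkLeftJoin, and the sample data are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

public class FkLeftJoinSketch {

    /**
     * Simplified model of a foreign-key left join over two key/value snapshots.
     * Every left entry yields exactly one result; the right value passed to the
     * joiner is null when no right entry exists for the extracted foreign key.
     */
    public static <K, V, FK, VR, R> Map<K, R> fkLeftJoin(
            Map<K, V> left,
            Map<FK, VR> right,
            Function<V, FK> foreignKeyExtractor,
            BiFunction<V, VR, R> joiner) {
        Map<K, R> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : left.entrySet()) {
            FK foreignKey = foreignKeyExtractor.apply(entry.getValue());
            VR rightValue = right.get(foreignKey); // null if there is no match
            result.put(entry.getKey(), joiner.apply(entry.getValue(), rightValue));
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical data: folder id -> agency id (the foreign key).
        Map<String, String> folders = new LinkedHashMap<>();
        folders.put("folder-1", "agency-A");
        folders.put("folder-2", "agency-B"); // no such agency on the right side

        Map<String, String> agencies = new LinkedHashMap<>();
        agencies.put("agency-A", "Agency A");

        Map<String, String> joined = fkLeftJoin(
                folders, agencies,
                agencyId -> agencyId,
                (agencyId, agency) -> agencyId + " -> " + agency);

        // Under left-join semantics the output has one entry per left record.
        System.out.println(joined.size() == folders.size());
        System.out.println(joined);
    }
}
```

Under this model, an output count lower than the left-hand input count (as with the changelog counts reported above) would not be explained by missing right-hand matches alone.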



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
