Karsten Stöckmann created KAFKA-16700:
-----------------------------------------
             Summary: Kafka Streams: possible message loss on KTable-KTable FK Left Join
                 Key: KAFKA-16700
                 URL: https://issues.apache.org/jira/browse/KAFKA-16700
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.7.0
         Environment: Apache Kafka 3.7.0 cluster in KRaft mode, 3 brokers and 3 controllers, on managed Kubernetes 1.28.7, operated by Strimzi Kafka Operators
            Reporter: Karsten Stöckmann

We are experiencing significant, yet intermittent / non-deterministic / unexplained message loss in a Kafka Streams topology while performing a *KTable-KTable FK Left Join*. Assume the following snippet:

{code:java}
streamsBuilder
    .table(
        folderTopicName,
        Consumed.with(
            folderKeySerde,
            folderSerde))
    .leftJoin(
        agencies, // KTable<AgencyId, AggregateAgency>
        Folder::agencyIdValue,
        AggregateFolder::new,
        TableJoined.as("folder-to-agency"),
        Materialized
            .as("folder-to-agency-materialized")
            .withKeySerde(folderKeySerde)
            .withValueSerde(aggregateFolderSerde))
    .leftJoin(
        documents,
{code}

The setup is as follows: a Debezium Connector for PostgreSQL streams database changes into various Kafka topics. A series of Quarkus Kafka Streams applications then performs aggregation operations on those topics to create index documents that are later sent to an OpenSearch system.

When the Kafka Streams infrastructure is fired up against initially populated Kafka topics (i.e. a snapshot of all relevant table data has already been streamed to Kafka), the KTable-KTable FK Left Join shown above appears to lose messages on the first of a series of FK Left Joins; the right-hand {{KTable<AgencyId, AggregateAgency>}} is consumed from an aggregated topic fed by another Kafka Streams topology / application.
On a (heavily reduced) test data set of 6828 messages in the {{folderTopicName}} topic, we observe the following message counts:

* {{folder-to-agency-subscription-registration}}: *6967* messages
* {{folder-to-agency-subscription-response}}: *3048* messages
* {{folder-to-agency-subscription-store-changelog}}: *6359* messages
* {{folder-to-agency-materialized-changelog}}: *4644* messages

Given the nature of a (FK) Left Join, I'd expect every message from the left-hand topic to produce an aggregate, even if no matching message is found in the right-hand topic. The message loss varies unpredictably across test runs and does not seem to be bound to specific keys or messages. Apparently it can only be observed when the Streams infrastructure is initially fired up to process the message 'backlog' snapshotted by Debezium; a manual snapshot triggered later (i.e. with the Streams applications already running) does not seem to show this behaviour. Additionally, we have so far observed this kind of message loss only when running multiple replicas of the affected application; with only one replica, everything seems to work as expected. We tried to leverage {{group.initial.rebalance.delay.ms}} in order to rule out possible rebalancing issues, but to no avail.
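Since the FK join result in the materialized changelog is keyed by the left-hand key, one way to narrow the loss down is to dump the keys of the input topic and of the changelog (e.g. via {{kafka-console-consumer}}) and diff the two sets. A minimal sketch of that comparison in plain Java; the class name and the sample keys are hypothetical stand-ins for the dumped topic keys:

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class JoinLossCheck {

    /** Returns the left-hand keys that never produced a join result in the changelog. */
    static Set<String> missingKeys(Set<String> leftTopicKeys, Set<String> changelogKeys) {
        Set<String> missing = new LinkedHashSet<>(leftTopicKeys); // keep input order for readability
        missing.removeAll(changelogKeys);
        return missing;
    }

    public static void main(String[] args) {
        // Hypothetical keys dumped from folderTopicName and folder-to-agency-materialized-changelog
        Set<String> left = new LinkedHashSet<>(List.of("folder-1", "folder-2", "folder-3"));
        Set<String> changelog = Set.of("folder-1", "folder-3");
        System.out.println("Dropped keys: " + missingKeys(left, changelog)); // prints [folder-2]
    }
}
```

Running this against full key dumps would at least show whether the dropped keys cluster on particular partitions, which would point towards a rebalance- or task-assignment-related cause.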
Our Kafka configuration:

{code:yaml}
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
default.replication.factor: 3
min.insync.replicas: 2
message.max.bytes: "20971520"
{code}

Our Kafka Streams application configuration:

{code:yaml}
kafka-streams.num.stream.threads: 5
kafka-streams.num.standby.replicas: 1
kafka-streams.auto.offset.reset: earliest
kafka-streams.cache.max.bytes.buffering: "20971520"
kafka-streams.commit.interval.ms: 100
kafka-streams.fetch.max.bytes: "10485760"
kafka-streams.max.request.size: "10485760"
kafka-streams.max.partition.fetch.bytes: "10485760"
kafka-streams.metadata.max.age.ms: 300000
kafka-streams.statestore.cache.max.bytes: "20971520"
kafka-streams.topology.optimization: all
kafka-streams.processing.guarantee: exactly_once_v2
# Kafka Streams Intermediate Topics
kafka-streams.topic.compression.type: lz4
kafka-streams.topic.segment.ms: "43200000" # 12h
kafka-streams.topic.max.compaction.lag.ms: "86400000" # 24h
kafka-streams.topic.delete.retention.ms: "86400000" # 24h
kafka-streams.producer.max.request.size: "20971520" # 20MiB
kafka-streams.producer.transaction.timeout.ms: 100 # Should match commit.interval.ms, set close to 100ms for exactly_once_v2
kafka-streams.consumer.group.instance.id: ${HOSTNAME}
kafka-streams.consumer.heartbeat.interval.ms: 100
kafka-streams.consumer.session.timeout.ms: 45000
{code}

All input (and aggregate) topics feature 15 partitions and share this configuration:

{code:yaml}
cleanup.policy: compact
compression.type: lz4
segment.ms: "43200000" # 12h
max.compaction.lag.ms: "86400000" # 24h
delete.retention.ms: "86400000" # 24h
{code}

The logs show no indication of where or why this happens.
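For reference, the {{kafka-streams.}} prefix above is the Quarkus convention; in a plain Kafka Streams application the same settings would be passed to {{KafkaStreams}} as a {{java.util.Properties}} object with the prefix stripped. A minimal sketch of the most relevant subset (class name, bootstrap address and instance id are placeholders; string keys are used instead of {{StreamsConfig}} constants to keep the snippet dependency-free):

```java
import java.util.Properties;

class StreamsProps {

    /** Builds the plain-Kafka-Streams equivalent of the Quarkus-prefixed config above. */
    static Properties build(String bootstrapServers, String instanceId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("num.stream.threads", "5");
        props.put("num.standby.replicas", "1");
        props.put("processing.guarantee", "exactly_once_v2");
        props.put("commit.interval.ms", "100");
        props.put("topology.optimization", "all");
        // Static membership: one stable id per replica (derived from $HOSTNAME in our setup)
        props.put("consumer.group.instance.id", instanceId);
        props.put("consumer.session.timeout.ms", "45000");
        return props;
    }
}
```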
The issue was discussed on the Kafka [mailing list|https://lists.apache.org/thread/l50pwmwhobt73db97n0r5v36mydo15rs] as well as on [StackOverflow|https://stackoverflow.com/questions/78210993/kafka-streams-topology-initially-dropping-messages-to-intermediate-topics], but neither thread led to an explanation. In the end, it was suggested that I file a bug in the Kafka JIRA. I can't rule out that this is caused entirely by some setting in our Kafka environment, but there are other [indications|https://stackoverflow.com/questions/75886729/missing-records-in-kafka-streams-foreign-key-join] of similar message loss in FK Join operations. For the time being, I'd consider this a bug, perhaps one that only emerges under certain conditions. At the moment I have no test case that reproduces the issue locally. Should any additional information be needed, I'd be happy to provide it.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)