Karsten Stöckmann created KAFKA-16700:
-----------------------------------------
Summary: Kafka Streams: possible message loss on KTable-KTable FK Left Join
Key: KAFKA-16700
URL: https://issues.apache.org/jira/browse/KAFKA-16700
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 3.7.0
Environment: Apache Kafka 3.7.0 cluster in KRaft mode, 3 brokers and 3
controllers, on managed Kubernetes 1.28.7, operated by Strimzi Kafka Operators
Reporter: Karsten Stöckmann
We are experiencing significant, yet intermittent / non-deterministic / so far
unexplained message loss on a Kafka Streams topology while performing a
*KTable-KTable FK Left Join*.
Assume the following snippet:
{code:java}
streamsBuilder
    .table(
        folderTopicName,
        Consumed.with(
            folderKeySerde,
            folderSerde))
    .leftJoin(
        agencies, // KTable<AgencyId, AggregateAgency>
        Folder::agencyIdValue,
        AggregateFolder::new,
        TableJoined.as("folder-to-agency"),
        Materialized
            .as("folder-to-agency-materialized")
            .withKeySerde(folderKeySerde)
            .withValueSerde(aggregateFolderSerde))
    .leftJoin(
        documents,
{code}
The setup is as follows:
A Debezium Connector for PostgreSQL streams database changes into various Kafka
topics. A series of Quarkus Kafka Streams applications then performs
aggregation operations on those topics to build index documents that are later
sent to an OpenSearch system.
When the Kafka Streams infrastructure is first fired up to work on already
populated Kafka topics (i.e. a snapshot of all relevant table data has been
streamed to Kafka), the KTable-KTable FK Left Join shown above seems to produce
message loss on the first of a series of FK Left Joins; the right-hand
{{KTable<AgencyId, AggregateAgency>}} is consumed from an aggregated topic fed
by another Kafka Streams topology / application.
On a (heavily reduced) test data set of 6828 messages in the
{{folderTopicName}} topic, we observe the following message counts:
* {{folder-to-agency-subscription-registration}}: *6967* messages
* {{folder-to-agency-subscription-response}}: *3048* messages
* {{folder-to-agency-subscription-store-changelog}}: *6359* messages
* {{folder-to-agency-materialized-changelog}}: *4644* messages
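For reference, counts like the above can be gathered with a plain consumer that
reads a topic to its end. A minimal sketch (bootstrap address and the
application-id-prefixed internal topic name are placeholders; {{read_committed}}
is used because of {{exactly_once_v2}}):
{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class TopicCounter {

    // Counts all records currently readable from the given topic.
    static long count(String bootstrapServers, String topic) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "topic-counter-" + System.nanoTime());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        // Skip aborted transactional records written under exactly_once_v2.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        long total = 0;
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of(topic));
            int emptyPolls = 0;
            while (emptyPolls < 5) { // stop after a few consecutive empty polls
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(2));
                if (records.isEmpty()) {
                    emptyPolls++;
                } else {
                    emptyPolls = 0;
                    total += records.count();
                }
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Placeholder names; internal topics are prefixed with the application.id.
        System.out.println(count("localhost:9092",
                "my-app-folder-to-agency-subscription-response-topic"));
    }
}
{code}
Note that counts on compacted topics can shift once log compaction kicks in, so
such numbers are only meaningful shortly after a test run.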
Given the nature of a (FK) Left Join, I'd expect every message from the
left-hand topic to produce an aggregate, even if no matching message is found
in the right-hand topic.
The message loss varies unpredictably across test runs and does not seem to be
bound to specific keys or messages.
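The expected left-join semantics can be checked in isolation with
{{TopologyTestDriver}} (requires the {{kafka-streams-test-utils}} artifact):
even with an empty right-hand table, every left-hand record yields a join
result with a {{null}} right-hand value. A minimal sketch against a simplified
String-typed topology, not the actual application classes:
{code:java}
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TableJoined;
import org.apache.kafka.streams.state.KeyValueStore;

public class FkLeftJoinSemanticsSketch {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Simplified stand-ins: the folder value is just the agency id (the FK).
        KTable<String, String> folders = builder.table(
                "folders", Consumed.with(Serdes.String(), Serdes.String()));
        KTable<String, String> agencies = builder.table(
                "agencies", Consumed.with(Serdes.String(), Serdes.String()));

        folders.leftJoin(
                        agencies,
                        folderValue -> folderValue,                 // foreign-key extractor
                        (folder, agency) -> folder + "->" + agency, // joiner
                        TableJoined.as("folder-to-agency"),
                        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(
                                        "folder-to-agency-materialized")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(Serdes.String()))
                .toStream()
                .to("joined-output");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fk-left-join-semantics");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> folderInput = driver.createInputTopic(
                    "folders", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> output = driver.createOutputTopic(
                    "joined-output", new StringDeserializer(), new StringDeserializer());

            // No agency record exists, yet the left join emits a result with a
            // null right-hand side for the folder record.
            folderInput.pipeInput("folder-1", "agency-42");
            System.out.println(output.readKeyValuesToMap());
            // expected: {folder-1=agency-42->null}
        }
    }
}
{code}
Such a single-instance, in-memory test of course does not reproduce the
multi-replica startup scenario described below, where the loss actually occurs.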
So far, this can only be observed when the Streams infrastructure is initially
fired up to process the message 'backlog' that had been snapshotted by
Debezium. A manual snapshot triggered later (i.e. with the Streams applications
already running) does not seem to show this behaviour. Additionally, we have so
far observed this kind of message loss only when running multiple replicas of
the affected application; when carrying out the tests with only one replica,
everything seems to work as expected. We've tried to leverage
{{group.initial.rebalance.delay.ms}} in order to rule out possible rebalancing
issues, but to no avail.
Our Kafka configuration:
{code:yaml}
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
default.replication.factor: 3
min.insync.replicas: 2
message.max.bytes: "20971520"
{code}
Our Kafka Streams application configuration:
{code:yaml}
kafka-streams.num.stream.threads: 5
kafka-streams.num.standby.replicas: 1
kafka-streams.auto.offset.reset: earliest
kafka-streams.cache.max.bytes.buffering: "20971520"
kafka-streams.commit.interval.ms: 100
kafka-streams.fetch.max.bytes: "10485760"
kafka-streams.max.request.size: "10485760"
kafka-streams.max.partition.fetch.bytes: "10485760"
kafka-streams.metadata.max.age.ms: 300000
kafka-streams.statestore.cache.max.bytes: "20971520"
kafka-streams.topology.optimization: all
kafka-streams.processing.guarantee: exactly_once_v2
# Kafka Streams Intermediate Topics
kafka-streams.topic.compression.type: lz4
kafka-streams.topic.segment.ms: "43200000" # 12h
kafka-streams.topic.max.compaction.lag.ms: "86400000" # 24h
kafka-streams.topic.delete.retention.ms: "86400000" # 24h
kafka-streams.producer.max.request.size: "20971520" # 20MiB
kafka-streams.producer.transaction.timeout.ms: 100 # Should match commit.interval.ms, set close to 100ms for exactly_once_v2
kafka-streams.consumer.group.instance.id: ${HOSTNAME}
kafka-streams.consumer.heartbeat.interval.ms: 100
kafka-streams.consumer.session.timeout.ms: 45000
{code}
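For clarity, the {{kafka-streams.*}} keys above are Quarkus configuration
properties that are passed through to the embedded Kafka Streams engine (the
{{producer.}}, {{consumer.}} and {{topic.}} prefixes are then handled by
Streams itself). A rough plain-Java sketch of the most relevant settings
(application id and bootstrap servers are placeholders):
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsPropertiesSketch {

    // Rough plain-Java equivalent of the Quarkus 'kafka-streams.*' properties above.
    static Properties build() {
        Properties p = new Properties();
        p.put(StreamsConfig.APPLICATION_ID_CONFIG, "folder-aggregator"); // placeholder
        p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");     // placeholder
        p.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);
        p.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
        p.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 20_971_520L);
        p.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
        p.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        // Overrides for internal (repartition/changelog) topics:
        p.put(StreamsConfig.topicPrefix(TopicConfig.COMPRESSION_TYPE_CONFIG), "lz4");
        p.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG), "43200000");
        // Overrides for the embedded producer / consumer clients:
        p.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), 20_971_520);
        p.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 100);
        p.put(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG),
                System.getenv("HOSTNAME"));
        p.put(StreamsConfig.consumerPrefix(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), 100);
        p.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 45_000);
        p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return p;
    }
}
{code}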
All input (and aggregate) topics feature 15 partitions and share this
configuration:
{code:yaml}
cleanup.policy: compact
compression.type: lz4
segment.ms: "43200000" # 12h
max.compaction.lag.ms: "86400000" # 24h
delete.retention.ms: "86400000" # 24h
{code}
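For completeness, the same topic settings expressed via the Admin client,
purely illustrative (topic name and bootstrap address are placeholders;
partition count and replication factor as described above):
{code:java}
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class TopicSetupSketch {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092"); // placeholder

        // 15 partitions, replication factor 3, matching the setup above.
        NewTopic folderTopic = new NewTopic("folder-topic", 15, (short) 3)
                .configs(Map.of(
                        TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT,
                        TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4",
                        TopicConfig.SEGMENT_MS_CONFIG, "43200000",            // 12h
                        TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG, "86400000", // 24h
                        TopicConfig.DELETE_RETENTION_MS_CONFIG, "86400000")); // 24h

        try (Admin admin = Admin.create(props)) {
            admin.createTopics(List.of(folderTopic)).all().get();
        }
    }
}
{code}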
Logs show no indication of where or why this happens.
The issue was discussed on the Kafka [mailing
list|https://lists.apache.org/thread/l50pwmwhobt73db97n0r5v36mydo15rs] as well
as on
[StackOverflow|https://stackoverflow.com/questions/78210993/kafka-streams-topology-initially-dropping-messages-to-intermediate-topics],
but neither thread led to a further explanation. In the end, I was advised to
file a bug in the Kafka JIRA. I can't rule out that this is ultimately caused
by some setting in our Kafka environment, but there are other
[indications|https://stackoverflow.com/questions/75886729/missing-records-in-kafka-streams-foreign-key-join]
of similar message loss on FK Join operations. For the time being, I'd
consider this a bug, perhaps emerging only under certain conditions.
At the moment I have no test case to reproduce the issue locally.
In case any additional information is needed, I'd be happy to provide it.