[
https://issues.apache.org/jira/browse/KAFKA-17635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17911275#comment-17911275
]
A. Sophie Blee-Goldman edited comment on KAFKA-17635 at 1/9/25 1:03 AM:
------------------------------------------------------------------------
Nice find! Seems like a pretty bad bug D:
I know the Affects Version is set to 3.7.1, but I'm assuming that's just the
version you saw this on, and not necessarily the minimum affected version? I'm
trying to help some users figure out when/if they might hit this.
First, my understanding is that there are two ways to trigger this bug:
# Hitting a TimeoutException during commit, since we will swallow this
(unless/until task.timeout.ms is hit) and proceed on to purging the repartition
topic events
# Hitting any kind of non-fatal exception (e.g. TaskMigratedException) after
processing some N records in one poll loop, dropping out & rejoining the group,
and then resuming but only processing some M < N records before getting to the
next commit/purge.
I assume the first case was introduced as part of KIP-572 and therefore affects
2.8+, but the second case probably impacts all versions (prior to this fix),
right? Should we change the "Affects Version" of the ticket to reflect this so
users know this may be present in earlier versions as well (and hopefully
encourage them to upgrade for the fix!)?
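To make the second trigger concrete, here is a toy model in plain Java. It is not Kafka Streams code: the class, fields, and methods are all made up for illustration. It assumes the task keeps a list of consumed offsets, purges up to the highest offset in that list, and rewinds to the last committed offset after being closed dirty. The only difference between the two runs is whether the list is cleared on the dirty close.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, NOT Kafka Streams code: every name here is hypothetical.
class StalePurgeSketch {
    private final boolean clearOnDirtyClose; // false models the suspected bug
    private final List<Long> consumedOffsets = new ArrayList<>();
    private long committedOffset = 0; // next offset to read after the last successful commit
    private long position = 0;        // next offset the task will read
    private long logStartOffset = 0;  // records below this offset have been purged

    StalePurgeSketch(boolean clearOnDirtyClose) {
        this.clearOnDirtyClose = clearOnDirtyClose;
    }

    // Process `count` records from the current position and remember their offsets.
    void process(int count) {
        for (int i = 0; i < count; i++) {
            consumedOffsets.add(position++);
        }
    }

    // Successful commit: everything processed so far becomes durable.
    void commit() {
        committedOffset = position;
    }

    // Failed commit: the transaction is aborted, the task is closed dirty
    // and later resumes from the last committed offset.
    void closeDirtyAndResume() {
        position = committedOffset;
        if (clearOnDirtyClose) {
            consumedOffsets.clear();
        }
    }

    // Purge everything up to and including the highest remembered offset.
    void purge() {
        long max = -1;
        for (long offset : consumedOffsets) {
            max = Math.max(max, offset);
        }
        logStartOffset = Math.max(logStartOffset, max + 1);
        consumedOffsets.clear();
    }

    // Records that were deleted without ever being part of a committed transaction.
    long lostRecords() {
        return Math.max(0, logStartOffset - committedOffset);
    }

    // Scenario from the comment: N records, failure, then only M < N records before commit/purge.
    static long run(boolean clearOnDirtyClose, int n, int m) {
        StalePurgeSketch task = new StalePurgeSketch(clearOnDirtyClose);
        task.process(n);
        task.closeDirtyAndResume();
        task.process(m);
        task.commit();
        task.purge();
        return task.lostRecords();
    }

    public static void main(String[] args) {
        System.out.println("lost without clearing: " + run(false, 5, 2));
        System.out.println("lost with clearing:    " + run(true, 5, 2));
    }
}
```

With N = 5 and M = 2, the run that keeps the stale list purges offsets 2 to 4 before they are committed, while the run that clears it loses nothing. How closely this matches the real bookkeeping in StreamTask is an assumption on my part.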
In fact, I'm guessing this bug was actually even worse for EOS apps in older
versions, specifically those prior to 3.2, since we used to purge on every
commit. At least the new repartition purge interval config from KAFKA-13549
makes it so that EOS apps aren't purging every 100ms by default, decreasing the
chance of hitting this.
Lastly, I guess this primarily affects EOS, since uncommitted records in the
repartition topic can't be processed, whereas under ALOS it's possible the
downstream task managed to consume/process the repartition records before they
were prematurely deleted. So it's probably rarer to hit this
with ALOS than EOS, though both are affected by the bug in theory, yes?
> Lost events on internal repartition topic when exactly_once_v2 is set and
> producer is fenced
> --------------------------------------------------------------------------------------------
>
> Key: KAFKA-17635
> URL: https://issues.apache.org/jira/browse/KAFKA-17635
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.7.1
> Reporter: Herbert Wespi
> Assignee: Bill Bejeck
> Priority: Major
> Labels: exactly-once, streams
> Fix For: 4.0.0, 3.7.2, 3.9.1, 3.8.2
>
> Attachments: screenshot-1.png
>
>
> In some of our Kafka Streams applications we observed that some events are
> missed during processing when the processing guarantee is set to
> exactly_once_v2.
>
> It happened in different Kafka Streams applications at different places. The
> common pattern is that an internal repartition topic was always
> involved (e.g. FK joins and aggregations on a new key).
> With the following simplified example we could reproduce the problem:
> {code:java}
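> // String() and Long() are presumably static imports from
> // org.apache.kafka.common.serialization.Serdes.
> inputStream
>     .groupBy((k, v) -> v, Grouped.with(String(), String()).withName("group"))
>     .count(Materialized.as("count").withKeySerde(String()).withValueSerde(Long()));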
> {code}
> The analysis showed the following:
> * the event exists in the input topic
> * after the repartition, the changelog topic does not always have all events
> aggregated.
> It happens only occasionally in the production environment while processing
> millions of events during the initial load.
> We were only rarely able to reproduce the problem in a local environment in
> debugging mode.
> Our assumption is that there is a problem with the purging of events for the
> repartition topic.
> The StreamTask holds a list of consumedOffsets (used for purging internal
> repartition topics).
> After we get a TaskMigratedException (e.g. transaction timeout or similar),
> the stream task is migrated and closed dirty.
> When the task is restored, the consumedOffsets list is not cleared.
> The consumedOffsets list may contain offsets from aborted transactions.
> On the next purge cycle some not-yet-committed offsets might get deleted from
> the repartition topic.
> {code:java}
> 2024-09-27T11:35:10.021+02:00 WARN 38644 --- [sandbox] [-StreamThread-4]
> o.a.k.s.p.internals.StreamThread : stream-thread
> [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] Detected
> that the thread is being fenced. This implies that this thread missed a
> rebalance and dropped out of the consumer group. Will close out all assigned
> tasks and rejoin the consumer group.
> org.apache.kafka.streams.errors.TaskMigratedException: Producer got fenced
> trying to commit a transaction [stream-thread [main]]; it means all tasks
> belonging to this thread should be migrated.
> at
> org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:304)
> ~[kafka-streams-3.7.1.jar:na]
> at
> org.apache.kafka.streams.processor.internals.TaskExecutor.commitOffsetsOrTransaction(TaskExecutor.java:203)
> ~[kafka-streams-3.7.1.jar:na]
> at
> org.apache.kafka.streams.processor.internals.TaskExecutor.commitTasksAndMaybeUpdateCommittableOffsets(TaskExecutor.java:154)
> ~[kafka-streams-3.7.1.jar:na]
> at
> org.apache.kafka.streams.processor.internals.TaskManager.commitTasksAndMaybeUpdateCommittableOffsets(TaskManager.java:1875)
> ~[kafka-streams-3.7.1.jar:na]
> at
> org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1842)
> ~[kafka-streams-3.7.1.jar:na]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1337)
> ~[kafka-streams-3.7.1.jar:na]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:986)
> ~[kafka-streams-3.7.1.jar:na]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686)
> ~[kafka-streams-3.7.1.jar:na]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645)
> ~[kafka-streams-3.7.1.jar:na]
> Caused by: org.apache.kafka.clients.consumer.CommitFailedException:
> Transaction offset Commit failed due to consumer group metadata mismatch: The
> coordinator is not aware of this member.
> at
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1689)
> ~[kafka-clients-3.7.1.jar:na]
> at
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1236)
> ~[kafka-clients-3.7.1.jar:na]
> at
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
> ~[kafka-clients-3.7.1.jar:na]
> at
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608)
> ~[kafka-clients-3.7.1.jar:na]
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:600)
> ~[kafka-clients-3.7.1.jar:na]
> at
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:463)
> ~[kafka-clients-3.7.1.jar:na]
> at
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:339)
> ~[kafka-clients-3.7.1.jar:na]
> at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:253)
> ~[kafka-clients-3.7.1.jar:na]
> at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
> 2024-09-27T11:35:10.021+02:00 INFO 38644 --- [sandbox] [-StreamThread-4]
> o.a.k.s.processor.internals.StreamTask : stream-thread
> [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] task
> [1_3] Suspended from RUNNING
> 2024-09-27T11:35:11.420+02:00 INFO 38644 --- [sandbox] [-StreamThread-4]
> o.a.k.s.processor.internals.StreamTask : stream-thread
> [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] task
> [1_3] Closed dirty
> 2024-09-27T11:37:06.782+02:00 INFO 38644 --- [sandbox] [-StreamThread-4]
> o.a.k.s.p.i.ProcessorStateManager : stream-thread
> [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4]
> stream-task [1_3] State store count did not find checkpoint offset, hence
> would default to the starting offset at changelog
> processTest-1-count-changelog-3
> 2024-09-27T11:37:06.783+02:00 INFO 38644 --- [sandbox] [-StreamThread-4]
> o.a.k.s.processor.internals.StreamTask : stream-thread
> [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] task
> [1_3] Initialized
> 2024-09-27T11:37:06.787+02:00 INFO 38644 --- [sandbox] [-StreamThread-1]
> o.a.k.s.s.i.RocksDBTimestampedStore : Opening store count in regular mode
> 2024-09-27T11:37:06.843+02:00 INFO 38644 --- [sandbox] [-StreamThread-4]
> o.a.k.s.p.i.StoreChangelogReader : stream-thread
> [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] End
> offset for changelog processTest-1-count-changelog-3 initialized as 916.
> 2024-09-27T11:37:06.843+02:00 INFO 38644 --- [sandbox] [-StreamThread-4]
> o.a.k.c.c.internals.LegacyKafkaConsumer : [Consumer
> clientId=processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4-restore-consumer,
> groupId=null] Assigned to partition(s): processTest-1-count-changelog-3,
> processTest-1-count-changelog-1
> 2024-09-27T11:37:06.843+02:00 INFO 38644 --- [sandbox] [-StreamThread-4]
> o.a.k.c.c.internals.SubscriptionState : [Consumer
> clientId=processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4-restore-consumer,
> groupId=null] Seeking to earliest offset of partition
> processTest-1-count-changelog-1
> 2024-09-27T11:37:06.844+02:00 INFO 38644 --- [sandbox] [-StreamThread-4]
> o.a.k.c.c.internals.SubscriptionState : [Consumer
> clientId=processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4-restore-consumer,
> groupId=null] Resetting offset for partition processTest-1-count-changelog-3
> to position FetchPosition{offset=0, offsetEpoch=Optional.empty,
> currentLeader=LeaderAndEpoch{leader=Optional[localhost:9093 (id: 2 rack:
> null)], epoch=0}}.
> 2024-09-27T11:37:06.850+02:00 INFO 38644 --- [sandbox] [-StreamThread-4]
> o.a.k.s.p.i.StoreChangelogReader : stream-thread
> [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] Finished
> restoring changelog processTest-1-count-changelog-3 to store count with a
> total number of 456 records
> 2024-09-27T11:37:06.851+02:00 INFO 38644 --- [sandbox] [-StreamThread-4]
> o.a.k.s.processor.internals.StreamTask : stream-thread
> [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] task
> [1_3] Restored and ready to run
> 2024-09-27T11:37:06.854+02:00 INFO 38644 --- [sandbox] [-StreamThread-4]
> o.a.k.s.p.internals.StreamThread : stream-thread
> [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4]
> Restoration took 334 ms for all active tasks [0_3, 1_3, 0_1, 1_1]
> 2024-09-27T11:37:06.854+02:00 INFO 38644 --- [sandbox] [-StreamThread-4]
> o.a.k.s.p.internals.StreamThread : stream-thread
> [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] State
> transition from PARTITIONS_ASSIGNED to RUNNING
> 2024-09-27T11:37:06.854+02:00 INFO 38644 --- [sandbox] [-StreamThread-4]
> org.apache.kafka.streams.KafkaStreams : stream-client
> [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7] State transition from
> REBALANCING to RUNNING
> {code}
> In our test we produced the same number of events to each of the 4 partitions.
> In the sample test we just count the events, so all 4 partitions should
> have the same count eventually.
> !screenshot-1.png!
> Our current workaround would be to temporarily increase
> transaction.timeout.ms to a very high value.
> This should reduce the probability of the tasks being migrated.
> However, this is not really a solution.
> Another option would be to increase repartition.purge.interval.ms to a
> very high value in order to effectively disable the purging of repartition
> topics during the initial load.
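The two workarounds from the report could be expressed as configuration overrides roughly like this. It is only a sketch: the class and method names are made up, the concrete values (10 minutes, 24 hours) are illustrative assumptions rather than recommendations, and plain string keys are used so the snippet has no Kafka dependency. The "producer." prefix is how Kafka Streams routes a setting to its embedded producer, and the broker's transaction.max.timeout.ms has to permit the chosen transaction timeout.

```java
import java.util.Properties;

// Hypothetical helper; values are illustrative assumptions, not recommendations.
class WorkaroundConfigSketch {
    static Properties workaroundOverrides() {
        Properties props = new Properties();
        props.put("processing.guarantee", "exactly_once_v2");

        // Workaround 1: a longer transaction timeout for the embedded producer,
        // to make fencing (and thus task migration) less likely. Here: 10 minutes.
        props.put("producer.transaction.timeout.ms", Integer.toString(10 * 60 * 1000));

        // Workaround 2: purge repartition topics very rarely during the initial load.
        // Here: once every 24 hours.
        props.put("repartition.purge.interval.ms", Long.toString(24L * 60 * 60 * 1000));
        return props;
    }

    public static void main(String[] args) {
        // Print the overrides; in an application they would be merged into the
        // Properties passed to KafkaStreams alongside application.id etc.
        workaroundOverrides().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Both settings only reduce the chance of hitting the problem, as the report itself notes. Upgrading to one of the fix versions listed on the ticket is the actual remedy.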