[ https://issues.apache.org/jira/browse/KAFKA-17635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17895432#comment-17895432 ]
Bill Bejeck commented on KAFKA-17635:
-------------------------------------

[~herbert.wespi] - I've pushed a new PR

> Lost events on internal repartition topic when exactly_once_v2 is set and producer is fenced
> ---------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-17635
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17635
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.7.1
>            Reporter: Herbert Wespi
>            Assignee: Bill Bejeck
>            Priority: Major
>              Labels: exactly-once, streams
>         Attachments: screenshot-1.png
>
>
> In some of our Kafka Streams applications we observed that events are missed during processing when the processing guarantee is set to exactly_once_v2.
>
> It happened in different Kafka Streams applications at different places. The common pattern is that an internal repartition topic is always involved (e.g. FK joins and aggregations on a new key).
> We could reproduce the problem with the following simplified example:
> {code:java}
> inputStream
>     .groupBy((k, v) -> v, Grouped.with(String(), String()).withName("group"))
>     .count(Materialized.as("count").withKeySerde(String()).withValueSerde(Long()));
> {code}
> The analysis showed the following:
>  * the event exists in the input topic
>  * after repartitioning, the changelog topic does not always contain all aggregated events
> It happens only occasionally in the production environment while processing millions of events on the initial load.
> We were only seldom able to reproduce the problem in a local environment in debugging mode.
> Our assumption is that there is a problem with the purging of events from the repartition topic (see the simplified sketch at the end of this description):
> The StreamTask holds a list of consumedOffsets (used for purging internal repartition topics).
> After a TaskMigratedException (e.g. a transaction timeout or similar), the stream task is migrated and closed dirty.
> When the task is restored, the consumedOffsets list is not cleared, so it may still contain offsets from aborted transactions.
> On the next purge cycle, offsets that have not yet been committed might get deleted from the repartition topic.
> {code:java}
> 2024-09-27T11:35:10.021+02:00 WARN 38644 --- [sandbox] [-StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] Detected that the thread is being fenced. This implies that this thread missed a rebalance and dropped out of the consumer group. Will close out all assigned tasks and rejoin the consumer group.
> org.apache.kafka.streams.errors.TaskMigratedException: Producer got fenced trying to commit a transaction [stream-thread [main]]; it means all tasks belonging to this thread should be migrated.
>     at org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:304) ~[kafka-streams-3.7.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.TaskExecutor.commitOffsetsOrTransaction(TaskExecutor.java:203) ~[kafka-streams-3.7.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.TaskExecutor.commitTasksAndMaybeUpdateCommittableOffsets(TaskExecutor.java:154) ~[kafka-streams-3.7.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.TaskManager.commitTasksAndMaybeUpdateCommittableOffsets(TaskManager.java:1875) ~[kafka-streams-3.7.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1842) ~[kafka-streams-3.7.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1337) ~[kafka-streams-3.7.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:986) ~[kafka-streams-3.7.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686) ~[kafka-streams-3.7.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645) ~[kafka-streams-3.7.1.jar:na]
> Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Transaction offset Commit failed due to consumer group metadata mismatch: The coordinator is not aware of this member.
>     at org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1689) ~[kafka-clients-3.7.1.jar:na]
>     at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1236) ~[kafka-clients-3.7.1.jar:na]
>     at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154) ~[kafka-clients-3.7.1.jar:na]
>     at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608) ~[kafka-clients-3.7.1.jar:na]
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:600) ~[kafka-clients-3.7.1.jar:na]
>     at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:463) ~[kafka-clients-3.7.1.jar:na]
>     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:339) ~[kafka-clients-3.7.1.jar:na]
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:253) ~[kafka-clients-3.7.1.jar:na]
>     at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
> 2024-09-27T11:35:10.021+02:00 INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.s.processor.internals.StreamTask : stream-thread [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] task [1_3] Suspended from RUNNING
> 2024-09-27T11:35:11.420+02:00 INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.s.processor.internals.StreamTask : stream-thread [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] task [1_3] Closed dirty
> 2024-09-27T11:37:06.782+02:00 INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.s.p.i.ProcessorStateManager : stream-thread [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] stream-task [1_3] State store count did not find checkpoint offset, hence would default to the starting offset at changelog processTest-1-count-changelog-3
> 2024-09-27T11:37:06.783+02:00 INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.s.processor.internals.StreamTask : stream-thread [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] task [1_3] Initialized
> 2024-09-27T11:37:06.787+02:00 INFO 38644 --- [sandbox] [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store count in regular mode
> 2024-09-27T11:37:06.843+02:00 INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.s.p.i.StoreChangelogReader : stream-thread [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] End offset for changelog processTest-1-count-changelog-3 initialized as 916.
> 2024-09-27T11:37:06.843+02:00 INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.c.c.internals.LegacyKafkaConsumer : [Consumer clientId=processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4-restore-consumer, groupId=null] Assigned to partition(s): processTest-1-count-changelog-3, processTest-1-count-changelog-1
> 2024-09-27T11:37:06.843+02:00 INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4-restore-consumer, groupId=null] Seeking to earliest offset of partition processTest-1-count-changelog-1
> 2024-09-27T11:37:06.844+02:00 INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4-restore-consumer, groupId=null] Resetting offset for partition processTest-1-count-changelog-3 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9093 (id: 2 rack: null)], epoch=0}}.
> 2024-09-27T11:37:06.850+02:00 INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.s.p.i.StoreChangelogReader : stream-thread [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] Finished restoring changelog processTest-1-count-changelog-3 to store count with a total number of 456 records
> 2024-09-27T11:37:06.851+02:00 INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.s.processor.internals.StreamTask : stream-thread [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] task [1_3] Restored and ready to run
> 2024-09-27T11:37:06.854+02:00 INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] Restoration took 334 ms for all active tasks [0_3, 1_3, 0_1, 1_1]
> 2024-09-27T11:37:06.854+02:00 INFO 38644 --- [sandbox] [-StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7-StreamThread-4] State transition from PARTITIONS_ASSIGNED to RUNNING
> 2024-09-27T11:37:06.854+02:00 INFO 38644 --- [sandbox] [-StreamThread-4] org.apache.kafka.streams.KafkaStreams : stream-client [processTest-1-c6756e6e-e134-481c-8875-e26a77d684e7] State transition from REBALANCING to RUNNING
> {code}
> In our test we produced the same number of events to each of the (4) partitions.
> In the sample test we just count the events, so eventually all 4 partitions should show the same count.
> !screenshot-1.png!
> Our current workaround would be to temporarily increase transaction.timeout.ms to a very high value; this should reduce the probability of tasks getting migrated. However, this is not really a solution.
> Another option would be to increase repartition.purge.interval.ms to a very high value in order to disable the purging of repartition topics during the initial load, as sketched below.
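>
> As a rough sketch of this workaround configuration (the concrete values are illustrative only, not recommendations; note that transaction.timeout.ms is also capped by the broker-side transaction.max.timeout.ms):
> {code:java}
> import java.util.Properties;
>
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.streams.StreamsConfig;
>
> public class WorkaroundConfig {
>
>     // Streams properties for the workaround described above; values are illustrative.
>     public static Properties streamsProperties() {
>         Properties props = new Properties();
>         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
>
>         // Raise the producer's transaction.timeout.ms so that transaction timeouts (and the
>         // resulting TaskMigratedException / dirty close) become less likely during the initial load.
>         props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 900_000);
>
>         // Effectively disable purging of internal repartition topics during the initial load
>         // by setting repartition.purge.interval.ms to a very large value.
>         props.put(StreamsConfig.REPARTITION_PURGE_INTERVAL_MS_CONFIG, Long.MAX_VALUE);
>
>         return props;
>     }
> }
> {code}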
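>
> For illustration, a minimal, self-contained model of the failure mode we suspect above; the class and method names here are made up for this sketch and are not the actual StreamTask internals:
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
>
> public class RepartitionPurgeModel {
>
>     static class RepartitionTask {
>         // Offsets consumed within the current (possibly still uncommitted) transaction;
>         // used later to decide how far the internal repartition topic can be purged.
>         private final Map<String, Long> consumedOffsets = new HashMap<>();
>
>         void recordConsumed(String partition, long offset) {
>             consumedOffsets.put(partition, offset);
>         }
>
>         // Dirty close after a TaskMigratedException: the transaction is aborted, but the
>         // consumed offsets are NOT cleared, mirroring the behaviour described above.
>         void closeDirty() {
>             // consumedOffsets.clear(); // clearing here would avoid purging uncommitted data
>         }
>
>         // Offsets the next purge cycle would use to delete records from the repartition topic.
>         Map<String, Long> purgeableOffsets() {
>             return new HashMap<>(consumedOffsets);
>         }
>     }
>
>     public static void main(String[] args) {
>         RepartitionTask task = new RepartitionTask();
>
>         // Records were read inside a transaction that is later aborted (producer fenced).
>         task.recordConsumed("repartition-0", 456L);
>         task.closeDirty();
>
>         // After the task is revived, processing resumes from the last committed offset
>         // (say 100), but the stale entry (456) from the aborted transaction is still present.
>         long lastCommitted = 100L;
>         long purgeUpTo = task.purgeableOffsets().get("repartition-0");
>
>         if (purgeUpTo > lastCommitted) {
>             System.out.printf("Purge would delete records up to offset %d although only offset %d "
>                     + "is committed -> the uncommitted events are lost%n", purgeUpTo, lastCommitted);
>         }
>     }
> }
> {code}
> In this model, clearing consumedOffsets on the dirty close (the commented-out line) would prevent stale, uncommitted offsets from being used for purging.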
-- This message was sent by Atlassian Jira (v8.20.10#820010)