[
https://issues.apache.org/jira/browse/KAFKA-17582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17885971#comment-17885971
]
Justine Olshan commented on KAFKA-17582:
----------------------------------------
Hi [Kyle
Kingsbury|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=aphyr].
Thanks for taking a look at transactions through Jepsen testing! I’m familiar
with the framework and think it is super useful for testing distributed
systems.
Can you clarify the test setup? It’s a little unclear from the ticket
description. Is there an input topic that you are copying to an output topic?
In other words, does the test application read the input data, commit offsets,
and write the data to the same (or a different) topic using a Kafka
transaction?
To answer your question:
> What... *should* Kafka do with respect to consumer offsets when a transaction
> aborts?
Just as a quick summary:
Transactions in Kafka operate via commands from the Kafka producer (starting
the transaction, writing data and offsets, and deciding to abort or commit). On
aborted transactions, records are still written to the data partitions and the
consumer offsets topic, but the read-committed consumer will skip over the
aborted data records and the group coordinator won’t materialize the aborted
offset commits.
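For concreteness, a read-process-write loop built on those commands looks
roughly like this. This is only a sketch: "output-topic", the running flag, and
the currentOffsets() helper are made up for illustration, and producer/consumer
are assumed to be an already-configured transactional KafkaProducer and a
KafkaConsumer with auto-commit disabled.
{code:java}
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

// Sketch of a transactional copy loop; 'running', 'producer', and 'consumer'
// are assumed to be set up elsewhere.
producer.initTransactions();
while (running) {
    ConsumerRecords<Long, Long> records = consumer.poll(Duration.ofSeconds(1));
    if (records.isEmpty()) continue;

    producer.beginTransaction();
    for (ConsumerRecord<Long, Long> r : records)
        producer.send(new ProducerRecord<>("output-topic", r.key(), r.value()));

    // The consumed offsets are written through the *producer*, inside the
    // same transaction as the data records.
    producer.sendOffsetsToTransaction(currentOffsets(records), consumer.groupMetadata());

    // commitTransaction() writes commit markers; abortTransaction() writes
    // abort markers instead, so the already-written data and offset records
    // are skipped by read_committed consumers and never materialized by the
    // group coordinator.
    producer.commitTransaction();
}

// Hypothetical helper: the next offset to consume, per partition.
static Map<TopicPartition, OffsetAndMetadata> currentOffsets(ConsumerRecords<Long, Long> records) {
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (TopicPartition tp : records.partitions()) {
        List<ConsumerRecord<Long, Long>> rs = records.records(tp);
        offsets.put(tp, new OffsetAndMetadata(rs.get(rs.size() - 1).offset() + 1));
    }
    return offsets;
}
{code}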
However, there is no native logic in the consumer to reset offsets in the case
of an abort. If you want to “roll back” the offset to reread and reprocess, you
will need to explicitly fetch the consumer’s offsets again. Usually, for this
to work, the application must control both the producer and the consumer. See
this example in the code, an application that simply reads from an input topic
and copies it to an output topic transactionally:
[https://github.com/apache/kafka/blob/1854d4b8a11461b53b59fa109b95f2a4f5003997/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java#L284]
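The reset step there looks roughly like the following (a sketch, not
necessarily the exact code behind that link):
{code:java}
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Seek every assigned partition back to its last committed offset, or to the
// beginning of the partition if nothing has ever been committed for it.
static void resetToLastCommittedPositions(KafkaConsumer<?, ?> consumer) {
    Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
    for (TopicPartition tp : consumer.assignment()) {
        OffsetAndMetadata offset = committed.get(tp);
        if (offset != null)
            consumer.seek(tp, offset.offset());
        else
            consumer.seekToBeginning(Collections.singleton(tp));
    }
}
{code}
Calling something like this after producer.abortTransaction(), before the next
poll, is what produces the “rewind” you sometimes observed: the next poll
rereads the records from the aborted transaction.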
Here the application controls the producer and consumer, chooses when to abort,
and can reset accordingly. In the real world, transactions may encounter
different errors: some are “abortable”, meaning the state can be reset in
memory, while others are fatal and require a complete restart of the
application (producer and consumer). Kafka Streams takes the approach of
triggering a rebalance for the consumer on most errors; the rebalance fetches
the offsets from the group coordinator, much as an already running consumer
would when explicitly re-fetching them, or a restarted consumer would on
startup.
This is consistent with what you observed.
> I also suspect that maybe advancing *is* the default behavior (uh oh) and
> rewinding is actually a consequence of a rebalance.
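To make the abortable/fatal distinction concrete, here is an illustrative
skeleton. The exception split follows the KafkaProducer javadoc; it is not the
Kafka Streams implementation, and it reuses the hypothetical
resetToLastCommittedPositions() sketched above.
{code:java}
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

try {
    producer.beginTransaction();
    // ... send data records and offsets as in the loop above ...
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal: the producer state cannot be recovered; close it and restart
    // the application (producer and consumer) completely.
    producer.close();
} catch (KafkaException e) {
    // Abortable: abort the transaction, rewind the consumer (e.g. with the
    // resetToLastCommittedPositions sketch above), and retry.
    producer.abortTransaction();
    resetToLastCommittedPositions(consumer);
}
{code}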
More details on these errors, and future plans to make it easier to handle them
correctly, can be found in
[KIP-1050|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1050%3A+Consistent+error+handling+for+Transactions].
Let me know if you have further questions, or if I missed something in your
description and this didn’t address your concern.
> Unpredictable consumer position after transaction abort
> -------------------------------------------------------
>
> Key: KAFKA-17582
> URL: https://issues.apache.org/jira/browse/KAFKA-17582
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer, documentation
> Affects Versions: 3.8.0
> Reporter: Kyle Kingsbury
> Priority: Critical
> Labels: abort, offset, transaction
> Attachments: 20240919T124411.740-0500(1).zip, Screenshot from
> 2024-09-19 18-45-34.png
>
>
> With the official Kafka Java client, version 3.8.0, the position of consumers
> after a transaction aborts appears unpredictable. Sometimes the consumer
> moves on, skipping over the records it polled in the aborted transaction.
> Sometimes it rewinds to read them again. Sometimes it rewinds *further* than
> the most recent transaction.
> Since the goal of transactions is to enable "exactly-once semantics", it
> seems sensible that the consumer should rewind on abort, such that any
> subsequent transactions would start at the same offsets. Not rewinding leads
> to data loss, since messages are consumed but their effects are not
> committed. Rewinding too far is... just weird.
> I'm seeing this issue in Jepsen tests of Kafka 3.0.0 and other
> Kafka-compatible systems. It occurs without faults, and with a single
> producer and consumer; no other concurrent processes. Here's the producer and
> consumer config:
>
> Producer config: {"socket.connection.setup.timeout.max.ms" 1000,
> "transactional.id" "jt1", "bootstrap.servers" "n3:9092", "request.timeout.ms"
> 3000, "enable.idempotence" true, "max.block.ms" 10000, "value.serializer"
> "org.apache.kafka.common.serialization.LongSerializer", "retries" 1000,
> "key.serializer" "org.apache.kafka.common.serialization.LongSerializer",
> "socket.connection.setup.timeout.ms" 500, "reconnect.backoff.max.ms" 1000,
> "delivery.timeout.ms" 10000, "acks" "all", "transaction.timeout.ms" 1000}
>
> Consumer config: {"socket.connection.setup.timeout.max.ms" 1000,
> "bootstrap.servers" "n5:9092", "request.timeout.ms" 10000,
> "connections.max.idle.ms" 60000, "session.timeout.ms" 6000,
> "heartbeat.interval.ms" 300, "key.deserializer"
> "org.apache.kafka.common.serialization.LongDeserializer", "group.id"
> "jepsen-group", "metadata.max.age.ms" 60000, "auto.offset.reset" "earliest",
> "isolation.level" "read_committed", "socket.connection.setup.timeout.ms" 500,
> "value.deserializer"
> "org.apache.kafka.common.serialization.LongDeserializer",
> "enable.auto.commit" false, "default.api.timeout.ms" 10000}
>
> Attached is a test run that shows this behavior, as well as a visualization
> of the reads (polls) and writes (sends) of a single topic-partition.
> In this plot, time flows down, and offsets run left to right. Each
> transaction is a single horizontal line. `w1` denotes a send of value 1, and
> `r2` denotes a poll of value 2. All operations here are performed by the sole
> process in the system, which has a single Kafka consumer and a single Kafka
> client. First, a transaction writes 35 and commits. Second, a transaction
> reads 35 and aborts. Third, a transaction reads 35 and aborts: the consumer
> has clearly rewound to show the same record twice.
> Then a transaction writes 37. Immediately thereafter a transaction reads 37
> and 38. Unlike before, it did *not* rewind. This transaction also aborts.
> Finally, a transaction writes 39 and 40. Then a transaction reads 39 and 40.
> This transaction commits! Values 35, 37, and 38 have been lost!
> It doesn't seem possible that this is the effect of a consumer rebalance:
> rebalancing should start off the consumer at the last *committed* offset, and
> the last committed offset in this history was actually value 31; it should
> have picked up at 35, 37, etc. This test uses auto.offset.reset=earliest, so
> if the commit were somehow missing, it should have rewound to the start of
> the topic-partition.
> What... *should* Kafka do with respect to consumer offsets when a transaction
> aborts? And is there any sort of documentation for this? I've been digging
> into this problem for almost a week (it manifested as write loss in a Jepsen
> test), and I'm baffled as to how to proceed.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)