Kyle Kingsbury created KAFKA-17754:
--------------------------------------
Summary: A delayed EndTxn message can cause aborted read, lost
writes, atomicity violation
Key: KAFKA-17754
URL: https://issues.apache.org/jira/browse/KAFKA-17754
Project: Kafka
Issue Type: Bug
Components: clients, producer , protocol
Affects Versions: 3.8.0
Reporter: Kyle Kingsbury
Attachments: g1a-trace.svg
In short: I believe that both an internal retry mechanism and guidance from the
example code and client API docs are independently capable of causing committed
transactions to actually abort, aborted transactions to actually commit, and
transactions to be split into multiple parts with different fates. A delayed
EndTxn message could arrive seconds later and decide the fate of an unrelated
transaction.
Consider the attached Lamport diagram, reconstructed from node logs and packet
captures in a recent Jepsen test. In it, a single process, using a single
producer and consumer, executes a series of transactions which all commit or
abort cleanly.Process 76 selected the unique transactional ID `jt1234` on
initialization.
>From packet captures and debug logs, we see `jt1234` used producer ID `233`,
>submitted all four operations, then sent an EndTxn message with `committed =
>false`, which denotes a transaction abort. However, fifteen separate calls to
>`poll` observed this transaction's write of `424` to key `5`---an obvious case
>of aborted read (G1a). Even stranger, *no* poller observed the other writes
>from this transaction: key `17` apparently never
received values `926` or `927`. Why?
Close inspection of the packet capture, combined with Bufstream's logs, allowed
us to reconstruct what happened. Process 76 began a transaction which sent
`1018` to key `15`. It sent an `EndTxn` message to commit that transaction to
node `n3`. However, it did not receive a prompt response. The client then
quietly sent a *second* commit message to `n4`, which returned successfully;
the test harness's call to `commitTransaction` completed successfully. The
process then performed and intentionally aborted a second transaction; this
completed OK. So far, so good.
Then process 76 began our problematic transaction. It sent `424` to key `5`,
and added new partitions to the transaction. Just after accepting record `424`,
node `n3` received the delayed commit message from two transactions previously.
It committed the current transaction, effectively chopping it in half. The
first half (record `424`) was committed and visible to pollers. The second
half, sending `926` and `927` to key `17`, implicitly began a second
transaction, which was aborted by the client.
This suggests a fundamental problem in the Kafka transaction protocol. The
protocol is [intentionally designed](https://kafka.apache.org/protocol) to
allow clients to submit requests over multiple TCP connections and to
distribute them across multiple nodes. There is no sequence number to order
requests from the same client. There is no concept of a transaction number.
When a server receives a commit (or abort) message, it has no way to know what
transaction the client intended to commit. It simply commits or aborts whatever
transaction happens to be in progress.
This means transactions which appeared to commit could actually abort, and vice
versa: we observed both aborted reads and lost writes. It also means
transactions could get chopped in to smaller pieces: one could lose some, but
not all, of a transaction's effects.
What does it take to get this behavior? First, an `EndTxn` message must be
delayed---for instance due to network latency, packet loss, a slow computer,
garbage collection, etc. Second, while that `EndTxn` arrow is hovering in the
air, the client needs to move on to perform a second transaction using the same
producer ID and epoch. There are several ways this could happen.
First, users could explicitly retry committing or aborting a transaction. The
docs say they can, and the client won't stop them.
Second, the official Kafka Java client docs instruct users repeatedly instruct
users to call `abortTransaction` if an error occurs during
`commitTransaction`.[^abort-exceptions] The provided example code leads
directly to this behavior: if `commitTransaction` times out, it calls
`abortTransaction`, and violĂ : the client can move on to later operations. The
only exceptions in the docs are `ProducerFencedException`,
`OutOfOrderSequenceException`, and `AuthorizationException`, none of which
apply here.
I've tried to avoid this problem by ensuring that transactions either commit
once, or abort once, never both. Sadly, this doesn't work. Indeed, process 76
in this test run *never* tried to abort a transaction after calling
commit---and even though it only calls `commitTransaction` once, it sent *two*
commit messages to two different nodes. I suspect this is because the Java
client treats timeouts as retriable
(https://github.com/apache/kafka/blob/8125c3da5bb6ebb35a0cb3494624d33fad4e3187/clients/src/main/java/org/apache/kafka/common/errors/TimeoutException.java#L22),
and the transaction manager appears to perform its own internal retries.
I'm not entirely sure how to fix this--the protocol feels like it's missing
critical ordering information, and you can't rely on e.g. TCP ordering, because
it's multi-stream. One option might be to force the producer to acquire a new
epoch if it ever encounters an indefinite result from an EndTxn message--then
the producer fencing mechanism would prevent any delayed EndTxn messages from
being processed, right?
--
This message was sent by Atlassian Jira
(v8.20.10#820010)