[ https://issues.apache.org/jira/browse/KAFKA-17754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17888451#comment-17888451 ]

Artem Livshits commented on KAFKA-17754:
----------------------------------------

Hi Kyle,

Thanks for doing the tests and finding the issues!  What you've found is one of 
many anomalies in the existing protocol that stem from, as you've correctly put 
it, "There is no sequence number to order requests from the same client. 
There is no concept of a transaction number."  This is exactly what we're 
fixing in 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense
 -- in essence, we'll be incrementing the epoch for every transaction, which 
will uniquely identify every transaction and fence stale requests.

With KIP-890 in place, the sequence would look like the following:

1. Client starts txn1 at epoch 42, sends endTxn (commit, epoch 42), gets a 
timeout or similar indefinite result.
2. Client retries endTxn (commit, epoch 42), succeeds; the epoch is now 43.
3. Client starts txn2 at epoch 43, adds partitions, produces data.
4. The "zombie" endTxn (commit, epoch 42) from step 1 arrives, but is fenced 
because the epoch is now 43.
5. Client continues with txn2, ultimately aborts it.

Without KIP-890, step 4 could commit a torn transaction (committing data from 
step 3 that should have been aborted at step 5).
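
To make that concrete, here's a minimal sketch of the fencing idea; the class 
and method names are made up for illustration, not the actual coordinator code 
(the real protocol also recognizes a retried endTxn for the just-completed 
transaction, so step 2 can still succeed; that detail is elided here):

```java
// Illustrative per-transactional-ID state under KIP-890; hypothetical names.
final class TxnCoordinatorState {
    private short producerEpoch;

    TxnCoordinatorState(short initialEpoch) {
        this.producerEpoch = initialEpoch;
    }

    // Called when an EndTxn request arrives for this transactional ID.
    synchronized void handleEndTxn(short requestEpoch) {
        if (requestEpoch < producerEpoch) {
            // The zombie from step 1 carries epoch 42, but the retry in step 2
            // already bumped the epoch to 43, so the zombie is rejected.
            throw new IllegalStateException("fenced: stale epoch " + requestEpoch);
        }
        // Completing a transaction bumps the epoch, so each transaction is
        // uniquely identified and even an exact duplicate request is fenced.
        producerEpoch++;
    }
}
```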

There are also other anomalies that can happen without KIP-890.

1. Transaction coordinator sends a writeTxnMarkers RPC with commit markers to 
partition leaders, times out.
2. Transaction coordinator retries, succeeds.
3. Data for a new transaction arrives at the partition leader.
4. The transaction commit marker from step 1 arrives and commits the new 
transaction on the partition leader.
5. The new transaction is ultimately aborted, resulting in a torn transaction.

Here again, with KIP-890 we'd have a new epoch at step 3, so a "zombie" marker 
would be fenced.
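
The fix has the same shape on the partition leader side; roughly (again with 
made-up names, not the actual replica code):

```java
// Illustrative partition-leader-side marker fencing under KIP-890;
// hypothetical names. A WriteTxnMarkers request carries the epoch of the
// transaction it completes.
final class PartitionTxnState {
    private short ongoingTxnEpoch = -1; // epoch of the open transaction, if any

    synchronized void onTxnData(short epoch) {
        // Step 3: data for the new transaction arrives at a bumped epoch.
        ongoingTxnEpoch = epoch;
    }

    // Returns true if the marker applies to the open transaction, false if
    // it is a stale "zombie" from step 1 that must be dropped.
    synchronized boolean applyMarker(short markerEpoch) {
        return markerEpoch >= ongoingTxnEpoch;
    }
}
```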

One more example (I like this stuff!):

1. Client sends an offset commit as part of the transaction, times out.
2. Client retries, succeeds.
3. Client starts a new transaction, sends new offset commits.
4. The offset commit from step 1 arrives.
5. Client commits the transaction with the stale offset.
6. Client restarts, reads the stale offsets, and produces duplicates.

Here again, with KIP-890 we'd have a new epoch at step 3, so the "zombie" offset 
commit would be fenced.
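
For context, the offset commit in step 1 comes from the standard transactional 
consume-process-produce loop, sketched below; the output topic and the 
pass-through record mapping are illustrative:

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

final class CopyLoop {
    static void run(KafkaConsumer<String, String> consumer,
                    KafkaProducer<String, String> producer) {
        producer.initTransactions();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            if (records.isEmpty()) continue;
            producer.beginTransaction();
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (ConsumerRecord<String, String> r : records) {
                producer.send(new ProducerRecord<>("output-topic", r.key(), r.value()));
                offsets.put(new TopicPartition(r.topic(), r.partition()),
                            new OffsetAndMetadata(r.offset() + 1));
            }
            // This is the offset commit from step 1: it's part of the
            // transaction, and a timed-out-then-retried copy of this request
            // is exactly the "zombie" that arrives late in step 4.
            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
            producer.commitTransaction();
        }
    }
}
```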

There are probably more, but looking at just a few of these made it clear that 
the protocol is missing fundamental "zombie" protection, so we're fixing it 
with KIP-890.  Justine recently gave a talk where she goes into detail on some 
of the anomalies fixed by KIP-890 - 
https://current.confluent.io/2024-sessions/fortifying-your-transactions-with-kip-890.

We're one PR (https://github.com/apache/kafka/pull/17402) away from getting the 
KIP-890 functionality into the Kafka codebase (there is a second PR for a 
performance improvement, but the correctness part is completed by the first 
PR); hopefully we'll get it in within a week or so.  I think it would be more 
productive to run the tests once the new logic is in place.

> A delayed EndTxn message can cause aborted read, lost writes, atomicity 
> violation
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-17754
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17754
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, producer, protocol
>    Affects Versions: 3.8.0
>            Reporter: Kyle Kingsbury
>            Assignee: Justine Olshan
>            Priority: Major
>              Labels: atomic, transaction
>         Attachments: lamport-1.png
>
>
> In short: I believe that both an internal retry mechanism and guidance from 
> the example code and client API docs are independently capable of causing 
> committed transactions to actually abort, aborted transactions to actually 
> commit, and transactions to be split into multiple parts with different 
> fates. A delayed EndTxn message could arrive seconds later and decide the 
> fate of an unrelated transaction.
> Consider the attached Lamport diagram, reconstructed from node logs and 
> packet captures in a recent Jepsen test. In it, a single process, using a 
> single producer and consumer, executes a series of transactions which all 
> commit or abort cleanly. Process 76 selected the unique transactional ID 
> `jt1234` on initialization.
> From packet captures and debug logs, we see `jt1234` used producer ID `233`, 
> submitted all four operations, then sent an EndTxn message with `committed = 
> false`, which denotes a transaction abort. However, fifteen separate calls to 
> `poll` observed this transaction's write of `424` to key `5`---an obvious 
> case of aborted read (G1a). Even stranger, *no* poller observed the other 
> writes from this transaction: key `17` apparently never
> received values `926` or `927`. Why?
> From the packet capture and logs: process 76 began a transaction which sent 
> `1018` to key `15`. It sent an `EndTxn` message to commit that transaction to 
> node `n3`. However, it did not receive a prompt response. The client then 
> quietly sent a *second* commit message to `n4`, which returned successfully; 
> the test harness's call to `commitTransaction` completed successfully. The 
> process then performed and intentionally aborted a second transaction; this 
> completed OK. So far, so good.
> Then process 76 began our problematic transaction. It sent `424` to key `5`, 
> and added new partitions to the transaction. Just after accepting record 
> `424`, node `n3` received the delayed commit message from two transactions 
> previously. It committed the current transaction, effectively chopping it in 
> half. The first half (record `424`) was committed and visible to pollers. The 
> second half, sending `926` and `927` to key `17`, implicitly began a second 
> transaction, which was aborted by the client.
> This suggests a fundamental problem in the Kafka transaction protocol. The 
> protocol is intentionally designed to allow clients to submit requests over 
> multiple TCP connections and to distribute them across multiple nodes. There 
> is no sequence number to order requests from the same client. There is no 
> concept of a transaction number. When a server receives a commit (or abort) 
> message, it has no way to know what transaction the client intended to 
> commit. It simply commits or aborts whatever transaction happens to be in 
> progress.
> This means transactions which appeared to commit could actually abort, and 
> vice versa: we observed both aborted reads and lost writes. It also means 
> transactions could get chopped into smaller pieces: one could lose some, but 
> not all, of a transaction's effects.
> What does it take to get this behavior? First, an `EndTxn` message must be 
> delayed---for instance due to network latency, packet loss, a slow computer, 
> garbage collection, etc. Second, while that `EndTxn` arrow is hovering in the 
> air, the client needs to move on to perform a second transaction using the 
> same producer ID and epoch. There are several ways this could happen.
> First, users could explicitly retry committing or aborting a transaction. The 
> docs say they can, and the client won't stop them.
> Second, the official Kafka Java client docs repeatedly instruct users to 
> call `abortTransaction` if an error occurs during `commitTransaction`. The 
> provided example code leads directly to this behavior: if 
> `commitTransaction` times out, it calls `abortTransaction`, and voilà: the 
> client can move on to later operations. The only exceptions in the docs are 
> `ProducerFencedException`, `OutOfOrderSequenceException`, and 
> `AuthorizationException`, none of which apply here.
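> Condensed, the documented pattern looks like this (record contents elided; 
> the exception handling mirrors the javadoc example):
> 
> ```java
> producer.initTransactions();
> try {
>     producer.beginTransaction();
>     producer.send(record);
>     producer.commitTransaction(); // may time out while EndTxn is in flight
> } catch (ProducerFencedException | OutOfOrderSequenceException
>          | AuthorizationException e) {
>     producer.close(); // fatal errors: cannot recover
> } catch (KafkaException e) {
>     // Per the docs: abort on any other error, including timeouts. The
>     // original EndTxn(commit) may still be hovering in the network.
>     producer.abortTransaction();
> }
> ```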
> I've tried to avoid this problem by ensuring that transactions either commit 
> once, or abort once, never both. Sadly, this doesn't work. Indeed, process 76 
> in this test run *never* tried to abort a transaction after calling 
> commit---and even though it only calls `commitTransaction` once, it sent 
> *two* commit messages to two different nodes. I suspect this is because the 
> Java client treats timeouts as retriable 
> ([https://github.com/apache/kafka/blob/8125c3da5bb6ebb35a0cb3494624d33fad4e3187/clients/src/main/java/org/apache/kafka/common/errors/TimeoutException.java#L22]),
>  and the transaction manager appears to perform its own internal retries.
> I'm not entirely sure how to fix this--the protocol feels like it's missing 
> critical ordering information, and you can't rely on e.g. TCP ordering, 
> because it's multi-stream. One option might be to force the producer to 
> acquire a new epoch if it ever encounters an indefinite result from an EndTxn 
> message; then the producer fencing mechanism would prevent any delayed EndTxn 
> messages from being processed, right?
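> A hypothetical sketch of that recovery (`newTransactionalProducer` is a 
> made-up helper that builds a producer with the same `transactional.id`):
> 
> ```java
> try {
>     producer.commitTransaction();
> } catch (TimeoutException e) {
>     // Outcome unknown: the EndTxn(commit) may still be in flight.
>     producer.close(Duration.ZERO);
>     producer = newTransactionalProducer("jt1234"); // same transactional ID
>     // initTransactions() acquires a new epoch, so a delayed EndTxn at the
>     // old epoch would be fenced.
>     producer.initTransactions();
> }
> ```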


