Hi Kyle,

Thank you for sharing your thoughts. As you've correctly pointed out, there are some shortcomings in the current protocol that are being addressed in KIP-890 (planned to be released in 4.0).
I also agree about the lack of detailed, up-to-date documentation on the Kafka transaction model. We should update the docs in 4.0 or shortly after.

Let me address some of the questions you've raised in the analysis.

The terminology used in Kafka transactions may be confusing. Terms such as "read committed" and "read uncommitted" have specific semantics in the DB world; in Kafka they simply mean that a consumer sees either only committed messages or all messages. They don't correspond to DB isolation levels.

Exactly-once semantics is defined with the following configuration (a minimal sketch of this setup follows below):

1. The producer uses a transactional id. The name is somewhat confusing, as it may imply that this is the id of each transaction, but it is in fact more like an "application id" that identifies an application working with some set of data.
2. Consumers use read_committed isolation. read_uncommitted is more of an optimization; there is no special semantics assigned to it.
3. Auto-commit is disabled.
4. Various time-based configurations (e.g. retention) are set to allow all operations to complete. For example, if topic data retention is too short, messages may expire before they are consumed and we get "lost writes", but this is by design.

From the data perspective, we have 2 types of data:

1. Messages (data that's fetched from partitions and produced to partitions).
2. Consumer offset values.

Messages are immutable: a message that has been read never changes, and messages are always written to a new position. Messages in a transaction committed by a producer with a given transactional id appear in the order they were produced and exactly once. Transactions from producers with the same transactional id never interleave, but could be interleaved with messages produced by producers with other transactional ids.

Let me try to describe Kafka's guarantees using the model from https://pmg.csail.mit.edu/papers/icde00.pdf (mentioned in the report). Message writes are isolated by transactional id; there is no isolation based on data. If, say, an application runs 2 instances with different transactional ids that write conflicting messages (conflicting from the application's perspective), it's a bug in the application. Kafka just guarantees that when two or more producers with the same transactional id try to produce data, one will succeed and the others will get fenced and their transactions aborted. In other words, in the Kafka isolation model, messages produced concurrently by producers with different transactional ids are not considered write dependencies.

When Kafka fetches messages, it gets messages from a partition for a range (specified by a start offset and max bytes). We can think of it as a predicate query: SELECT * FROM topic-0 WHERE offset >= startOffset LIMIT N bytes. Messages below the last received offset are immutable and cannot change; messages above the last received offset may get created while the transaction is running (or could already be created but not yet visible to the transaction). So from that perspective, a predicate anti-dependency cycle is present for messages that are not yet visible to the consumer (either because they haven't been produced or because they are not yet visible to a read_committed consumer). Logic that depends only on the fetched messages and doesn't make assumptions about messages above the last visible offset doesn't have an anti-dependency cycle, because we know that messages cannot get updated or created below the last offset. E.g. if we have logic that reads messages from one topic, removes sensitive data and produces messages to another topic, it doesn't have an anti-dependency cycle.
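To make the configuration and the read-transform-write example above concrete, here is a minimal sketch in Java (topic names, group id, transactional id and the sanitize() helper are hypothetical; error handling is discussed further down):

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class SanitizingCopier {
        public static void main(String[] args) {
            Properties cp = new Properties();
            cp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            cp.put(ConsumerConfig.GROUP_ID_CONFIG, "sanitizer-group");
            cp.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");   // 2. read_committed
            cp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");         // 3. no auto-commit
            cp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            cp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            Properties pp = new Properties();
            pp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            pp.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "sanitizer-app-0"); // 1. more like an "application id"
            pp.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            pp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cp);
                 KafkaProducer<String, String> producer = new KafkaProducer<>(pp)) {
                consumer.subscribe(List.of("raw-events"));
                producer.initTransactions(); // fences older producers with the same transactional id

                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    if (records.isEmpty()) continue;

                    producer.beginTransaction();
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (ConsumerRecord<String, String> r : records) {
                        // Depends only on the fetched record, nothing above the last
                        // visible offset, so there is no anti-dependency cycle.
                        producer.send(new ProducerRecord<>("sanitized-events", r.key(), sanitize(r.value())));
                        offsets.put(new TopicPartition(r.topic(), r.partition()),
                                    new OffsetAndMetadata(r.offset() + 1));
                    }
                    // Produced messages and the consumer offset update commit (or abort) atomically.
                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                    producer.commitTransaction();
                }
            }
        }

        private static String sanitize(String value) {
            // Hypothetical helper: strip sensitive fields from the payload.
            return value.replaceAll("ssn=\\d+", "ssn=<redacted>");
        }
    }

This is roughly the pattern that higher-level frameworks like KStreams implement for you, so in practice applications rarely need to write this loop by hand.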
Note that committed messages won't be visible to a read_committed consumer as long as there is an open transaction started at a lower offset. So if we have a partition like this:

[w1][w1][w2][w2][w2][w1][w1][c1][w3][w3][c3]

a read_committed consumer would only be able to read the first two messages [w1][w1] until transaction 2 (the one that produced the [w2] messages) is committed or aborted. From that perspective, the reader shouldn't have any expectation as to when a committed message becomes available for reading.

When an application commits a transaction that contains messages the application produced, it can also update a consumer offset value. The consumer offset is keyed by <consumer_group_id, topic, partition> and the value is the offset of the message that the application wants to track. The value should reflect the messages that the application processed as part of the transaction. Consumer offset values are read-write; Kafka returns an UNSTABLE_OFFSET_COMMIT error if a client tries to read a value while a transaction is pending, so every transaction effectively holds a "write lock" on the <consumer_group_id, topic, partition> key.

If a consumer uses subscription, Kafka makes sure that offsets can only be updated by the client that knows about the latest assignment (as defined by the member epoch or generation id, which is passed along with the offset update). Assuming the client commits offsets only for partitions it is assigned, only one client can commit offsets during the lifetime of an assignment. If the assignment changes (as a result of a rebalance), the newly assigned client will wait for the transaction that involved <consumer_group_id, topic, partition> to complete (by retrying reads after getting the UNSTABLE_OFFSET_COMMIT error), and then it will get the latest value.

If a consumer doesn't use subscription and uses direct assignment instead, it is assumed that the application uses the same transactional id to update a given <consumer_group_id, topic, partition> value. If applications with different transactional ids try to update the same <consumer_group_id, topic, partition> value, it's a bug in the application and the result is going to be based on timing (the later update wins).

If a transaction is aborted on the client, the in-memory value of the latest consumed offset on the client is not automatically reset; the application needs to do it explicitly, see https://github.com/apache/kafka/blob/1854d4b8a11461b53b59fa109b95f2a4f5003997/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java#L284. This re-reads the latest committed offset from Kafka (a sketch of this reset follows below).

Assuming the clients follow these rules, the consistency model for consumer offset values is serializable. Applications are supposed to use a higher-level framework, like KStreams, which enforces the correct behavior. The model allows building applications that follow this pattern:

1. Read the latest consumer offset value (generally done only at the beginning or after failures).
2. Consume messages from the partitions that correspond to the consumer offset values.
3. Do an operation that doesn't require knowledge of messages beyond the last visible offset (e.g. doesn't assume that we've seen the latest message).
4. Produce the results to other partitions and update the new consumer offset value in a transaction.

If we read a stale value on step 1 (e.g. because we don't have the latest assignment), the transaction at step 4 will fail because we'll get fenced trying to update the consumer offset value.
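Here is a sketch of that reset logic, along the lines of the TransactionalMessageCopier code linked above (the method name and structure here are illustrative, not the exact tool code):

    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    // Call after producer.abortTransaction(): rewind the consumer to the offsets
    // last committed in Kafka so the aborted batch is consumed again.
    static void resetToLastCommittedPositions(KafkaConsumer<?, ?> consumer) {
        Map<TopicPartition, OffsetAndMetadata> committed =
            consumer.committed(consumer.assignment());
        for (TopicPartition tp : consumer.assignment()) {
            OffsetAndMetadata offset = committed.get(tp);
            if (offset != null) {
                consumer.seek(tp, offset.offset());                  // last committed position
            } else {
                consumer.seekToBeginning(Collections.singleton(tp)); // nothing committed yet
            }
        }
    }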
Hopefully this provided some overview of the logic; let me answer some specific questions.

> When will a consumer observe monotonically increasing offsets?

The consumer starts with a certain offset and keeps consuming forward. Under the covers it's just a loop of "fetch at current offset", "make the last fetched offset the current offset", "fetch at current offset", etc. On certain errors (barring some bugs, generally the error is that the consumer is trying to read below the log start offset after a portion of the log has been truncated), the consumer may reset its offset; if the auto reset policy is set, the offset could jump backward.

> When will a consumer skip over acknowledged records?

I guess this is similar to the previous question. If the auto reset policy is configured, the offset could jump forward.

> Is this behavior different within a single call to poll, versus between two subsequent calls?

Assuming auto-commit is turned off (which is assumed for EoS), there should be no difference.

> Can a rebalance take effect in the middle of a transaction? How do rebalances affect transaction semantics?

Yes. There are 2 cases (see the listener sketch after these answers):

1. The client that loses the assignment for the moved partition has updated the consumer offset but hasn't committed the transaction yet. In that case it could either commit or abort the transaction. If it starts another transaction, it follows case 2.
2. The client that loses the assignment for the moved partition hasn't updated the consumer offset yet. In that case it won't be able to update the offset and will have to abort the transaction.

In either case, the client that gets the assignment for the moved partition won't be able to read the consumer offset until the transaction that involves the offsets completes.

> When will records sent by a producer to a single topic-partition have monotonically increasing offsets?

Transactional producers will produce the records in the order the application passed them, and exactly once. Each record is assigned a sequence number by the producer, and that sequence is stored in the actual partition, so even if the partition moves to another broker, the client retries, etc., the broker will know whether it's the next record, a record that arrived out of sequence, or a duplicate.

> When will those offsets be interleaved with writes from other producers?

Writes from producers with the same transactional id won't interleave. Writes from producers with different transactional ids can interleave. It's worth noting that KStreams supports an isolation model that is based on consumer assignment rather than fully relying on transactional ids (known as EoSv2). Given that a consumer cannot update consumer offsets for partitions it's not assigned, transactions that update the consumer offset value for a given <consumer_group_id, topic, partition> key won't interleave.

> For both consumers and producers, how do these behaviors differ within a transaction vs. between two subsequent transactions on the same client?

Assuming there are no events between the two transactions (aborts, rebalances), there shouldn't be semantic differences.
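As an illustration of case 2 above, one way a client can react to a rebalance in the middle of a transaction is to abort the in-flight transaction when its partitions are revoked. This is only a sketch under the assumptions of the earlier example (the class name and the inTransaction flag are hypothetical; frameworks like KStreams handle this for you):

    import java.util.Collection;

    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.TopicPartition;

    class AbortOnRevokeListener implements ConsumerRebalanceListener {
        private final KafkaProducer<?, ?> producer;
        private boolean inTransaction; // maintained by the main processing loop

        AbortOnRevokeListener(KafkaProducer<?, ?> producer) {
            this.producer = producer;
        }

        void setInTransaction(boolean inTransaction) {
            this.inTransaction = inTransaction;
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            if (inTransaction) {
                // Case 2: offsets for the revoked partitions can no longer be
                // committed by this client, so give up on the open transaction.
                producer.abortTransaction();
                inTransaction = false;
            }
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Reads of the consumer offset for a newly assigned partition may
            // return UNSTABLE_OFFSET_COMMIT until the previous owner's
            // transaction completes.
        }
    }

    // Usage: consumer.subscribe(List.of("raw-events"), new AbortOnRevokeListener(producer));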
> When is G0 (write cycle) legal?

Assuming the transactional ids are chosen to provide proper isolation (e.g. if we want an exact order of messages for the whole partition, then at most one application with a given transactional id should produce to the partition; if we want an exact order of messages with a given key, then at most one application with a given transactional id should produce messages with that key; etc.), we should have no write cycles. If we look at it from a pure data contention perspective, producing to a partition contends on the end-of-log offset of the partition, so applications with different transactional ids that produce to the same partition would technically have a write cycle, because updates of the end-of-log offset will interleave.

> When is G1a (aborted read) legal?

For EoS it is assumed that we consume with the read_committed isolation level, so we don't have aborted reads. Consumer offset values expose only committed values.

> When is G1b (intermediate read) legal?

With read_committed we read only up to the first in-doubt message (a message that belongs to a transaction that's still ongoing), so we don't have intermediate reads. Consumer offset values expose only committed values.

> When is G1c (circular information flow) legal?

Taking dependencies on messages above the last visible offset can create circular information flow. For example, the messages could already be produced in a pending transaction but not yet visible to a read_committed consumer. It's also possible that the messages have been committed but are not visible because they are interleaved with in-doubt messages. For example, if an application wanted to look at the last message and, based on that, decide to produce the next message to the partition, it wouldn't be guaranteed to accomplish that (i.e. there is no guarantee that the application would produce the very next message; some messages could interleave).

> What particular cycles are prohibited, if any? For example, are cycles composed entirely of write-read edges proscribed?

I think the answers above cover this.

> When is fractured read legal? That is, when can a transaction observe some, but not all, of another transaction's effects?

It's guaranteed that a transaction will eventually observe all messages produced by another committed transaction, but the time of consumption is not guaranteed. I.e. a consumer could get to the visible end of the log and see only some of the messages of a committed transaction (e.g. because the later committed messages of this transaction are interleaved with in-doubt messages of another transaction).

> Is it legal to read values written inside the current transaction?

Messages of the current transaction won't be visible to read_committed consumers, so the current transaction won't see its own messages (even with read_uncommitted there are some cases when the latest messages are not visible to consumers). A small sketch after these answers illustrates this.

> How are explicitly aborted transactions different from those which (e.g.) crash before committing?

With KIP-890 part 2 there should be no difference. Prior to that (i.e. currently), a crashed transaction would go through InitProducerId, which bumps the producer epoch on the transaction and prevents some anomalies that are possible for transactions that don't go through InitProducerId.

> Are the values and offsets returned by poll() correct even if the transaction aborts?

With a read_committed consumer we only get messages that are committed.
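To illustrate the answer about reading your own writes, here is a small sketch (the topic name is hypothetical, and the producer and consumer are assumed to be set up as in the earlier example, with the consumer positioned at the end of the topic):

    import java.time.Duration;

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // `producer` is transactional (initTransactions() already called); `consumer`
    // uses isolation.level=read_committed and is subscribed to "demo-topic".
    static void ownWritesAreNotVisibleUntilCommit(KafkaProducer<String, String> producer,
                                                  KafkaConsumer<String, String> consumer) {
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("demo-topic", "k", "v"));
        producer.flush();

        // The record is in the log but above the last stable offset,
        // so a read_committed consumer does not see it yet.
        ConsumerRecords<String, String> before = consumer.poll(Duration.ofSeconds(1));
        System.out.println("visible before commit: " + before.count()); // expected: 0

        producer.commitTransaction();

        // Once the commit marker is written, the last stable offset moves past
        // the record and it becomes visible.
        ConsumerRecords<String, String> after = consumer.poll(Duration.ofSeconds(5));
        System.out.println("visible after commit: " + after.count());   // expected: 1
    }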
> What offsets should a consumer poll after a transaction crashes?

If a transaction aborts, the consumer needs to re-read the latest committed consumer offset values from Kafka; here is an example: https://github.com/apache/kafka/blob/1854d4b8a11461b53b59fa109b95f2a4f5003997/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java#L284 (the reset sketch earlier in this email shows the same idea).

> How should users navigate Kafka's maze of transaction errors?

KIP-1050 aims to simplify errors so that applications have a better chance of implementing correct logic (a rough handling sketch follows these answers):

1. Retriable errors can be retried by the application, if the application thinks it's just a temporary problem that will fix itself.
2. Abortable errors require the transaction to be aborted and indicate that the application can just rewind its in-memory state to undo any effects of the transaction.
3. Application-recoverable errors indicate that the in-memory state may be invalid, so the application should fully re-initialize its state from Kafka. An example is the producer-fenced error, which means that another instance of the application with the same transactional id made changes to Kafka, so the current instance's in-memory state may not match the state of Kafka any more.
4. Invalid-configuration errors give applications a chance to either retry or treat them as fatal. An example is an authorization error: it can be treated as fatal if the user is simply trying to access something they don't have permission to access, or as transient if we assume that the permission has been granted but there is a propagation delay and this specific Kafka cluster hasn't received the change yet.

> How should users handle errors that occur during the transaction abort process, or during rewind?

Assuming KIP-1050 is in place: if the application gets a retriable error, it can just retry the operation. If it gets an abortable error, it should rewind its in-memory state and abort the transaction; abort() will never return an abortable error. If it gets an application-recoverable error, the application should discard all in-memory state, restart the producer (which will go through InitProducerId to fence other producers and "zombies" from previous retries) and re-initialize from Kafka (maybe it's simpler to think about it as "the application should just restart").

> Is the "committed position" the offset of the highest committed record, or the uncommitted offset one higher?

Kafka has a notion of the "last stable offset" -- it's the highest offset visible to read_committed consumers, and it's calculated as min(first_in-doubt_offset, high-water-mark). (The high water mark is the end of the log that's been replicated to all in-sync replicas.) There is an example above in this email.

> Is it safe to let multiple transactional IDs process records from the same topic-partitions?

If they belong to different consumer groups -- yes. There could be multiple applications that process the same messages. E.g. the messages could represent a stream of order changes, and there could be one application that sends SMS messages to customers to inform them of the order progress and another application that collects analytics about orders. Those applications would use different consumer groups. If the consumers are in the same consumer group, they could consume from the same topic-partition (e.g. because there is a split brain and one of them has a stale assignment), but only one will be able to update the consumer offset; the others (with the stale assignment) will fail and will have to abort the transaction.
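For a rough sketch of how this maps onto today's client API (KIP-1050's categories are not explicit exception types yet), the usual shape is: rebuild the producer and all in-memory state on fencing-style errors, and abort plus rewind on other KafkaExceptions. This assumes the producer, consumer and resetToLastCommittedPositions() helper from the earlier sketches:

    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.errors.AuthorizationException;
    import org.apache.kafka.common.errors.OutOfOrderSequenceException;
    import org.apache.kafka.common.errors.ProducerFencedException;

    try {
        producer.beginTransaction();
        // ... send records and call sendOffsetsToTransaction(...) here ...
        producer.commitTransaction();
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        // "Application-recoverable" style: in-memory state may no longer match Kafka.
        // Close the producer, recreate it (initTransactions() fences zombies), and
        // re-initialize state from Kafka -- or simply restart the application.
        producer.close();
        throw e;
    } catch (KafkaException e) {
        // "Abortable" style: undo the in-flight effects, rewind the consumer to the
        // last committed offsets (see the reset sketch above), and try again.
        producer.abortTransaction();
        resetToLastCommittedPositions(consumer);
    }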
> Which offsets can auto-commit commit? When is data loss possible?

It's assumed that applications that want EoS don't use auto-commit.

> How does auto-commit interact with transactions?

See above.

-Artem

On 2024/11/12 16:04:05 Kyle Kingsbury wrote:
> Hello all,
>
> I've spent the last few months testing Bufstream, a Kafka-compatible
> system. In the course of that research, we discovered that the Kafka
> transaction protocol allows aborted reads, lost writes, and torn
> transactions:
>
> https://jepsen.io/analyses/bufstream-0.1.0
>
> In short, the protocol assumes that message delivery is ordered, but
> sends messages over different TCP connections, to different nodes, with
> automatic retries. When network or node hiccups (e.g. garbage
> collection) delay delivery of a commit or abort message, that message
> can commit or abort a different, later transaction. Committed
> transactions can actually be lost. Aborted transactions can actually
> succeed. Transactions can be torn into parts: some of their effects
> committed, others lost.
>
> We've reproduced these problems in both Bufstream and Kafka itself, and
> we believe every Kafka-compatible system is most likely susceptible.
> KIP-890 may help. Client maintainers may also be able to defend against
> this problem by re-initializing producers on indefinite errors, like RPC
> timeouts.
>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=235834631#KIP890:TransactionsServerSideDefense-BumpEpochonEachTransactionforNewClients(1)
>
> Yours truly,
>
> --Kyle