Hi Kyle,

Thank you for sharing your thoughts.  As you've pointed out correctly,
there are some shortcomings in the current protocol that are being
addressed in KIP-890 (planned to be released in 4.0).

I also agree that there is a lack of detailed, up-to-date documentation on
the Kafka transaction model.  We should improve the docs in 4.0 or shortly
after.  Let me address some of the questions you've raised in the analysis.

The terminology used in Kafka transactions may be confusing: terms such as
"read committed" and "read uncommitted" have specific semantics in the DB
world, but in Kafka they simply mean that a consumer sees either only
committed messages or all messages; they don't correspond to DB isolation
levels.

The exactly-once semantics is defined with the following configuration:
1. Producer uses a transactional id.  The name is somewhat confusing as it
may imply that this is the id of each transaction, but it is in fact more
like an "application id" that identifies an application working with some
set of data.
2. Consumers use read_committed isolation.  read_uncommitted is more like
an optimization, there is no special semantics assigned to it.
3. Auto-commit is disabled.
4. Various time-based configurations (e.g. retention) are set to allow all
operations to complete.  For example, if topic data retention is too short,
messages may expire before consumers read them and we get “lost writes”,
but this is by design.
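
For concreteness, here is a minimal sketch of such a configuration with the
Java clients (the broker address and the group / transactional id names are
made up for illustration; the later code sketches in this email assume a
producer and consumer built from these properties):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

// Producer side: a stable transactional id that identifies the application,
// not a single transaction.
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "copier-1");

// Consumer side: read only committed messages and never auto-commit.
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "copier-group");
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");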

From the data perspective, we have 2 types of data:
1. Messages (data that's fetched from partitions and produced to
partitions).
2. Consumer offset values.

Messages are immutable: a message that has been read never changes, and
messages are always written to a new position.  Messages in a transaction
committed by a producer with a given transactional id appear in the order
they were produced and exactly once.  Transactions from producers with the
same transactional id never interleave, but they could be interleaved with
messages produced by producers with other transactional ids.

Let me try to describe Kafka guarantees using the model from
https://pmg.csail.mit.edu/papers/icde00.pdf (mentioned in the report).

Message writes are isolated by transactional id; there is no isolation
based on data.  If, say, an application runs 2 instances with different
transactional ids that write conflicting messages (conflicting from the
application's perspective), that's a bug in the application.  Kafka just
guarantees that when two or more producers with the same transactional id
try to produce data, one will succeed and the others will get fenced and
their transactions aborted.

In other words, in the Kafka isolation model, messages produced
concurrently by producers with different transactional ids are not
considered write dependencies.
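
To illustrate the fencing guarantee, here is a hedged sketch where two
instances share a transactional id (newProducer(...) is a hypothetical
helper that builds a KafkaProducer from the properties shown earlier):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;

KafkaProducer<String, String> older = newProducer("copier-1");
older.initTransactions();
older.beginTransaction();
older.send(new ProducerRecord<>("out-topic", "key", "value"));

// A second instance with the same transactional id starts up (e.g. the
// first one is presumed dead); initTransactions() bumps the producer epoch
// and aborts the older instance's open transaction.
KafkaProducer<String, String> newer = newProducer("copier-1");
newer.initTransactions();

try {
    older.commitTransaction();   // the older instance is fenced at this point
} catch (ProducerFencedException e) {
    older.close();               // the only safe reaction: close and re-initialize
}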

When Kafka fetches messages, it gets messages from a partition for a range
(specified by a start offset and max bytes).  We can think of it as a
predicate query: SELECT * FROM topic-0 WHERE offset >= startOffset LIMIT N
bytes.  Messages below the last received offset are immutable and cannot
change; messages above the last received offset may get created while the
transaction is running (or could already be created but not yet visible to
the transaction).  So from that perspective, a predicate anti-dependency
cycle is present for messages that are not yet visible to the consumer
(either because they haven't been produced or because they are not yet
visible to a read_committed consumer).

Logic that depends only on the fetched messages and doesn't make
assumptions about messages above the last visible offset doesn't have an
anti-dependency cycle, because we know that messages cannot get updated or
created below the last offset.  E.g. logic that reads messages from one
topic, removes sensitive data, and produces messages to another topic
doesn't have an anti-dependency cycle.

Note that committed messages won't be visible to a read_committed consumer
as long as there is an open transaction started at a lower offset, so if we
have a partition like this: [w1][w1][w2][w2][w2][w1][w1][c1][w3][w3][c3] a
read_committed consumer would only be able to read the first two messages
[w1][w1] until transaction2 (that produced messages [w2]) is committed or
aborted.  From that perspective, the reader shouldn't have any expectation
as to when a committed message becomes available for reading.

When an application commits a transaction that includes messages the
application produced, it can also update the consumer offset value.  The
consumer offset is keyed by <consumer_group_id, topic, partition> and the
value is the offset of the message that the application wants to track.
The value should reflect the messages that the application processed as
part of the transaction.
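
For illustration, assuming the application has just processed a
ConsumerRecord `last` from a hypothetical "in-topic" (and `producer` and
`consumer` are configured as sketched earlier), the value committed in the
transaction is, by the Java client convention, the position of the next
record to consume:

import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// The consumer_group_id part of the key comes from the consumer's group.id;
// the map key below carries the <topic, partition> part.
Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(
    new TopicPartition(last.topic(), last.partition()),
    new OffsetAndMetadata(last.offset() + 1));   // next offset to read

// Committed atomically with the messages produced in the same transaction:
producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());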

Consumer offset values are read-write; Kafka returns an
UNSTABLE_OFFSET_COMMIT error if a client tries to read a value while a
transaction is pending, so every transaction effectively holds a "write
lock" on the <consumer_group_id, topic, partition> key.

If a consumer uses subscription, Kafka makes sure that offsets can only be
updated by the client that knows about the latest assignment (as defined by
the member epoch or generation id, which is passed along with the offset
update).  Assuming the client commits offsets only for partitions it's
assigned, only one client can commit offsets during the lifetime of an
assignment.  If the assignment changes (as a result of a rebalance), the
newly assigned client will wait for the transaction that involved
<consumer_group_id, topic, partition> to complete (by retrying reads after
getting the UNSTABLE_OFFSET_COMMIT error), and then it will get the latest
value.

If a consumer doesn't use subscription and uses direct assignment instead,
it is assumed that the application uses the same transactional id to update
a given <consumer_group_id, topic, partition> value.  If applications with
different transactional ids try to update the same <consumer_group_id,
topic, partition> value, it's a bug in the application and the result will
depend on timing (the later update wins).

If a transaction is aborted on the client, the in-memory value of the
latest consumed offset on the client is not automatically reset; the
application needs to do it explicitly, see for example
https://github.com/apache/kafka/blob/1854d4b8a11461b53b59fa109b95f2a4f5003997/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java#L284.
This will re-read the latest offset from Kafka.
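
A hedged sketch of that reset, modeled on the TransactionalMessageCopier
code linked above (the consumer is assumed to be a KafkaConsumer configured
as shown earlier):

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// After aborting a transaction, rewind the consumer's in-memory position to
// the last offsets committed to Kafka for its current assignment.
static void resetToLastCommittedPositions(KafkaConsumer<String, String> consumer) {
    Map<TopicPartition, OffsetAndMetadata> committed =
        consumer.committed(consumer.assignment());
    for (TopicPartition tp : consumer.assignment()) {
        OffsetAndMetadata offset = committed.get(tp);
        if (offset != null)
            consumer.seek(tp, offset.offset());                   // re-read from last committed
        else
            consumer.seekToBeginning(Collections.singleton(tp));  // nothing committed yet
    }
}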

Assuming the clients follow the rules, the consistency model for consumer
offset values is serializable.  Applications are expected to use a higher
level framework, like Kafka Streams, which enforces the correct behavior.

The model allows building applications that follow this pattern:

1. Read latest consumer offset value (generally done only at the beginning
or after failures).
2. Consume messages from partitions that correspond to the consumer offset
values.
3. Do an operation that doesn't require knowledge of messages beyond the
last visible offset (e.g. doesn't assume that we've seen the latest
message).
4. Produce results to other partitions and update the new consumer offset
value in a transaction.

If we read a stale value at step 1 (e.g. because we don't have the latest
assignment), the transaction at step 4 will fail because we'll get fenced
trying to update the consumer offset value.
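
Putting it together, here is a minimal, hedged sketch of this
read-process-write pattern (topic names and the scrub(...) transformation
are made up; the producer and consumer are assumed to be configured as
sketched earlier):

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

producer.initTransactions();
consumer.subscribe(Collections.singleton("in-topic"));

while (true) {
    // Steps 1-2: for a subscribed read_committed consumer, the position is
    // initialized from the latest committed offsets on assignment, then we
    // just consume forward.
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    if (records.isEmpty())
        continue;

    producer.beginTransaction();
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (ConsumerRecord<String, String> record : records) {
        // Step 3: the transformation depends only on the fetched messages.
        producer.send(new ProducerRecord<>("out-topic", record.key(), scrub(record.value())));
        offsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
    }

    // Step 4: produced messages and new consumer offsets commit atomically;
    // with a stale assignment this fails instead of clobbering another
    // client's progress.
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    producer.commitTransaction();
}

Error handling is omitted above; a sketch of the abort/rewind structure is
further down in this email.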

Hopefully, this provided some overview of the logic.  Now let me answer
some specific questions.

> When will a consumer observe monotonically increasing offsets?

The consumer starts with a certain offset and keeps consuming forward.
Under the covers it's just a loop of "fetch at current offset", "make the
last fetched offset the current offset", "fetch at current offset", etc.
On certain errors (barring some bugs, generally the error is that the
consumer is trying to read below the log start offset after a portion of
the log has been truncated), the consumer may reset its offset; if the auto
reset policy is set, the offset could jump backward.

> When will a consumer skip over acknowledged records?

I guess this is similar to the previous question.  If the auto reset policy
is configured, the offset could jump forward.

> Is this behavior different within a single call to poll, versus between
two subsequent calls?

Assuming auto-commit is turned off (which is assumed for EoS), there should
be no difference.

> Can a rebalance take effect in the middle of a transaction? How do
rebalances affect transaction semantics?

Yes. There are 2 cases:

1. The client that loses the assignment for the moved partition has updated
the consumer offset but hasn't committed the transaction yet.  In that case
it could either commit or abort the transaction.  If it starts another
transaction, it follows case 2.
2. The client that loses the assignment for the moved partition hasn't
updated the consumer offset yet.  In that case it won't be able to update
the offset and will have to abort the transaction.

In either case, the client that gets the assignment for the moved partition
won't be able to read the consumer offset until the transaction that
involves the offsets completes.

> When will records sent by a producer to a single topic-partition have
monotonically increasing offsets?

Transactional producers will produce records in the order the application
passed them, and exactly once.  Each record is assigned a sequence number
by the producer, and that sequence is stored in the actual partition, so
even if the partition moves to another broker, the client retries, etc.,
the broker will know whether a record is the next record, a record that
arrived out of sequence, or a duplicate.

> When will those offsets be interleaved with writes from other producers?

Writes from the producers with the same transactional id won't interleave.
Writes from producers with different transactional ids can interleave.

It's worth noting that Kafka Streams supports an isolation model that is
based on consumer assignment rather than fully relying on transactional ids
(it's
known as EoSv2).  Given that a consumer cannot update consumer offsets for
partitions it's not assigned, transactions that update the consumer offset
value for a given <consumer_group_id, topic, partition> key won't
interleave.

> For both consumers and producers, how do these behaviors differ within a
transaction vs. between two subsequent transactions on the same client?

Assuming there are no events between the two transactions (aborts,
rebalances) there shouldn't be semantic differences.

> When is G0 (write cycle) legal?

Assuming the transactional ids are chosen to provide proper isolation (e.g.
if we want an exact order of messages for the whole partition, then at most
one transactional id should produce to the partition; if we want an exact
order of messages with a given key, then at most one transactional id
should produce messages with that key, etc.), we should have no write
cycles.

If we look at it from a pure data contention perspective, producing to a
partition contends on the end-of-log offset of that partition, so
applications with different transactional ids that produce to the same
partition would technically have a write cycle, because their updates of
the end-of-log offset will interleave.

> When is G1a (aborted read) legal?

For EoS it is assumed that we consume with read_committed isolation level,
so we don't have aborted reads.  Consumer offset values expose only
committed values.

> When is G1b (intermediate read) legal?

For read_committed we read only up to the first in-doubt message (a message
that belongs to a transaction that's still ongoing), so we don't have
intermediate reads.  Consumer offset values expose only committed values.

> When is G1c (circular information flow) legal?

Taking dependencies on messages above the last visible offset can create
circular information flow.  For example, the messages could already be
produced in a pending transaction but not yet visible to a read_committed
consumer.  It's also possible that the messages have been committed but are
not visible because they are interleaved with in-doubt messages.  For
example, if an application wanted to look at the last message and, based on
that, decide to produce the next message to the partition, it wouldn't be
guaranteed to accomplish that (i.e. there is no guarantee that the
application's message would end up as the next last message in the
partition; other messages could interleave).

> What particular cycles are prohibited, if any? For example, are cycles
composed entirely of write-read edges proscribed?

I think the answers above cover this.

> When is fractured read legal? That is, when can a transaction observe
some, but not all, of another transaction’s effects?

It's guaranteed that a transaction will eventually observe all messages
produced by another committed transaction, but the time of consumption is
not guaranteed.  I.e. a consumer could get to the visible end of the log
and see only some of the messages of a committed transaction (e.g. because
the later committed messages of that transaction are interleaved with
in-doubt messages of another transaction).

> Is it legal to read values written inside the current transaction?

Messages of the current transaction won't be visible to read_committed
consumers, so the current transaction won't see its own messages.  (Even
with read_uncommitted there are some cases where the latest messages are
not visible to consumers.)

> How are explicitly aborted transactions different from those which (e.g.)
crash before committing?

With KIP-890 part 2 there should be no difference.  Prior to that
(currently), a crashed transaction goes through InitProducerId, which bumps
the producer epoch on the transaction and prevents some anomalies that are
possible for transactions that don't go through InitProducerId.

> Are the values and offsets returned by poll() correct even if the
transaction aborts?

With read_committed consumer we get only messages that are committed.

> What offsets should a consumer poll after a transaction crashes?

If a transaction aborts, the consumer needs to re-read the latest consumer
offset values from Kafka; here is an example:
https://github.com/apache/kafka/blob/1854d4b8a11461b53b59fa109b95f2a4f5003997/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java#L284

> How should users navigate Kafka’s maze of transaction errors?

KIP-1050 is aiming to simplify errors so that applications can have a
better chance of implementing correct logic:
1. Retriable errors can be retried by the application, if the application
thinks it's just a temporary problem that will fix itself.
2. Abortable errors require a transaction to be aborted and indicate that
the application can just rewind its in-memory state to undo any effects of
the transaction.
3. Application-recoverable errors indicate that in-memory state may be
invalid, so the application should fully re-initialize its state from
Kafka.  An example of this is the producer-fenced error, which means that
another instance of the application with the same transactional id made
changes to Kafka, so the current instance's in-memory state may no longer
match the state in Kafka.
4. Invalid-configuration errors give applications a chance to either retry
or treat them as fatal.  An example of this is an authorization error: it
can be treated as fatal if the user is simply trying to access something
they don't have permission to access, or it can be treated as transient if
we assume that the permission has been granted but there is a propagation
delay and this specific Kafka cluster hasn't gotten the change yet.

> How should users handle errors that occur during the transaction abort
process, or during rewind?

Assuming KIP-1050 is in place: if the application gets a retriable error,
it can just retry the operation.  If it gets an abortable error, it should
rewind its in-memory state and abort the transaction; .abort() will never
return an abortable error.  If it gets an application-recoverable error,
the application should discard all in-memory state, restart the producer
(which will go through InitProducerId to fence other producers and
"zombies" from previous retries) and reinitialize from Kafka (maybe it's
simpler to think of it as “the application should just restart”).
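
Until KIP-1050 lands, the rough equivalent with the current Java client
exceptions looks like this (a hedged sketch; runInTransaction(...) stands
for the consume/process/produce body shown earlier, and
resetToLastCommittedPositions(...) is the rewind sketch from earlier in
this email):

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

try {
    producer.beginTransaction();
    runInTransaction(producer, consumer);
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal for this producer instance in the current client: close it,
    // discard in-memory state and re-initialize from Kafka.
    producer.close();
} catch (KafkaException e) {
    // Other KafkaExceptions are abortable: rewind in-memory state and abort;
    // abortTransaction() itself won't return an abortable error.
    resetToLastCommittedPositions(consumer);
    producer.abortTransaction();
}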

> Is the “committed position” the offset of the highest committed record,
or the uncommitted offset one higher?

Kafka has a notion of "last stable offset" -- it's the highest offset
visible to read_committed consumers, and it's calculated as
min(first_in-doubt_offset, high-water-mark).  (High water mark is the end
of the log that's been replicated to all in-sync replicas).  There is an
example above in this email.

> Is it safe to let multiple transactional IDs process records from the
same topic-partitions?

If they belong to different consumer groups -- yes.  There could be
multiple applications that process the same messages.  E.g. the messages
could represent a stream of order changes, with one application sending SMS
messages to customers to inform them of the order progress and another
application collecting some analytics about orders.  Those applications
will use different consumer groups.

If consumers are in the same consumer group, they could consume from the
same topic-partition (e.g. because there is a split brain and one of them
has a stale assignment), but only one will be able to update the consumer
offset; the others (with the stale assignment) will fail and will have to
abort their transactions.

> Which offsets can auto-commit commit? When is data loss possible?

It's assumed that applications that want EoS don't use auto-commit.

> How does auto-commit interact with transactions?

See above.

-Artem

On 2024/11/12 16:04:05 Kyle Kingsbury wrote:
> Hello all,
>
> I've spent the last few months testing Bufstream, a Kafka-compatible
> system. In the course of that research, we discovered that the Kafka
> transaction protocol allows aborted reads, lost writes, and torn
> transactions:
>
> https://jepsen.io/analyses/bufstream-0.1.0
>
> In short, the protocol assumes that message delivery is ordered, but
> sends messages over different TCP connections, to different nodes, with
> automatic retries. When network or node hiccups (e.g. garbage
> collection) delay delivery of a commit or abort message, that message
> can commit or abort a different, later transaction. Committed
> transactions can actually be lost. Aborted transactions can actually
> succeed. Transactions can be torn into parts: some of their effects
> committed, others lost.
>
> We've reproduced these problems in both Bufstream and Kafka itself, and
> we believe every Kafka-compatible system is most likely susceptible.
> KIP-890 may help. Client maintainers may also be able to defend against
> this problem by re-initializing producers on indefinite errors, like RPC
> timeouts.
>
>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=235834631#KIP890:TransactionsServerSideDefense-BumpEpochonEachTransactionforNewClients(1)
>
> Yours truly,
>
> --Kyle
>
>
