Thanks for providing more details.
It makes sense that you need to ensure that an "insert" comes before an
"update".
If I understand you correctly, that's what idempotence guarantees. If the
PUT is not appended to the log, it will be retried, but then thread 2
can't
have observed it.
Yes.
If the PUT is appended to the log, and thread 1 just
didn't see the ack, thread 2 may have observed the PUT, but then it will
not be inserted into the log again on retry
Yes.
so the UPDATE thread 2 does in response won't be clobbered.
Correct, the UPDATE would be after the PUT in the Kafka topic.
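To make the duplicate-detection concrete, here's a minimal toy sketch of how a broker-side idempotence check behaves, not Kafka's actual broker code; all class and variable names are invented for illustration. The point is that a retried PUT carries its original sequence number, so the broker drops it instead of appending it after the UPDATE.

```python
# Hypothetical sketch: the broker tracks the highest acked sequence number
# per producer and drops any append whose sequence it has already acked.
class Broker:
    def __init__(self):
        self.log = []         # appended records, in order
        self.last_acked = {}  # producer_id -> highest acked seq

    def append(self, producer_id, seq, record):
        last = self.last_acked.get(producer_id, -1)
        if seq <= last:
            return "duplicate"      # retry of an already-appended write
        if seq != last + 1:
            return "out_of_order"   # gap: reject
        self.log.append(record)
        self.last_acked[producer_id] = seq
        return "ack"

broker = Broker()
broker.append("p1", 0, "PUT {id=someKey x=1}")       # thread 1's PUT; ack lost
broker.append("p2", 0, "UPDATE someKey WITH {x=2}")  # thread 2 reacts
# thread 1's producer retries the PUT with the same sequence number:
result = broker.append("p1", 0, "PUT {id=someKey x=1}")
# the retry is detected as a duplicate, so the UPDATE is not clobbered
```

So the retried PUT never lands after the UPDATE in the log, which is exactly the causal-order property you're after.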
For `OutOfOrderSequenceException`: I'm getting out of my depth on the
details here, so take my replies with a grain of salt...
If the producer has x=1, x=2 and x=3 in flight, and x=2 gets a sequence
number error, it's probably because the x=1 write hasn't arrived at the
broker yet, so the producer can just retry it internally without risking
reordering
Exactly (and there is no risk of re-ordering as long as the producer does
not reset its epoch/seq-no -- the broker would just keep rejecting x=2).
For this case, the OutOfOrderSequence error tells the producer that x=1
did not make it, and the producer needs to retry sending x=1 first.
and it's only when sequence errors occur for the first in
flight message where the producer needs to reset the epoch, right?
This only happens if there was an ack for x=1. If there was no ack for
x=1, the producer can retry sending x=1 internally. However, if there
was an ack for x=1, the producer raises `OutOfOrderSequenceException` to
the application, telling the app not to continue "blindly".
If the producer has x=1, x=2 and x=3 in flight, and x=1 gets a delivery
timeout, does that provoke an (escaping from the producer)
OutOfOrderSequenceException for x=2 if the broker never saw x=1?
When you get a delivery timeout you don't know if the data reached the
broker or not: the timeout could also trigger if the data was sent
successfully, but the ack was never received. If the data was never
received, sending x=2 should not trigger an `OutOfOrderSequenceException`,
as the producer still has x=1 in the buffer and can keep retrying x=1.
If the producer has x=1, x=2 and x=3 in flight, and x=1 gets a sequence
number error, is that always indicating broker-side data loss? My
understanding is that sequence number gaps should only occur because
there
was a previous message (x=0) which was acked by the broker, and now the
broker doesn't have that message anymore. This provokes an epoch bump in
the producer and also lets the exception escape to the caller, so they
can
decide what to do about it, and are alerted to the data loss.
Yes, that's my understanding.
If the caller decides to keep using the producer instance, why can't the
producer reset the epoch and use the old sequence numbers when assigning
new sequence numbers to the buffered messages?
If the epoch is bumped, the broker expects the first message with the
new epoch to have seq=0.
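As a toy illustration of that rule (invented names, not Kafka's actual bookkeeping): the broker tracks (epoch, next expected seq) per producer, and a write under a bumped epoch is accepted only if its sequence starts over at zero, so old sequence numbers can't be carried across the bump.

```python
# Hypothetical broker state: producer_id -> (current epoch, next expected seq)
state = {}

def broker_append(producer_id, epoch, seq):
    cur_epoch, next_seq = state.get(producer_id, (-1, 0))
    if epoch > cur_epoch:
        # new epoch: accept only if the sequence starts over at zero
        if seq != 0:
            return "out_of_order"
        state[producer_id] = (epoch, 1)
        return "ack"
    if epoch == cur_epoch and seq == next_seq:
        state[producer_id] = (epoch, seq + 1)
        return "ack"
    return "out_of_order"

assert broker_append("p1", epoch=0, seq=0) == "ack"
# after an epoch bump, carrying over an old sequence number is rejected:
assert broker_append("p1", epoch=1, seq=1) == "out_of_order"
# the producer must restart the new epoch at seq=0:
assert broker_append("p1", epoch=1, seq=0) == "ack"
```

This is why the producer re-sequences its buffered batches from zero after a bump rather than reusing the old numbering.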
If the producer knows that
in the old epoch, the order was x=1, x=2, x=3, then couldn't it use that
knowledge after the epoch bump to keep the ordering of those messages the
same?
Well, the producer does not have x=1 in its buffer any longer --
otherwise, the out-of-sequence error would not have been given to the
application, and the producer could just retry internally.
Is it fair to only expect to see OutOfOrderSequenceExceptions escape from
the producer if the brokers lose data, or do I need to guard against it
happening even if there is no data loss on the brokers?
Yes, this should be the only root cause.
-Matthias
On 4/17/25 3:54 AM, Stig Rohde Døssing wrote:
Thanks. What I'm looking for isn't that messages can't be interleaved,
it's
something like causal order.
We're updating a database where every node has their own copy of the
database state (similar to RocksDB), by sending changes to that database
via Kafka.
In our application thread 1 is creating records in the database, and
thread
2 is updating fields in existing database records. So thread 1 is doing
something like PUT {id=someKey x=1} and thread 2 is doing UPDATE someKey
WITH {x=2}. The reason I care about ordering is that PUTs will clobber
the
existing record if it's there, so having the PUT replayed after the
UPDATE
will clobber that update.
The guarantee I'm after is that if thread 2 observes a PUT coming through
Kafka, and issues an UPDATE for that record id, the observed PUT won't be
appended to Kafka again later because the producer replayed it.
If I understand you correctly, that's what idempotence guarantees. If the
PUT is not appended to the log, it will be retried, but then thread 2
can't
have observed it. If the PUT is appended to the log, and thread 1 just
didn't see the ack, thread 2 may have observed the PUT, but then it will
not be inserted into the log again on retry, so the UPDATE thread 2 does
in
response won't be clobbered.
Regarding OutOfOrderSequenceException, thanks for the explanation.
I'm not entirely sure I get it. Let me just describe a couple of cases
and
you can correct my understanding:
If the producer has x=1, x=2 and x=3 in flight, and x=2 gets a sequence
number error, it's probably because the x=1 write hasn't arrived at the
broker yet, so the producer can just retry it internally without risking
reordering, and it's only when sequence errors occur for the first in
flight message where the producer needs to reset the epoch, right?
If the producer has x=1, x=2 and x=3 in flight, and x=1 gets a delivery
timeout, does that provoke an (escaping from the producer)
OutOfOrderSequenceException for x=2 if the broker never saw x=1?
If the producer has x=1, x=2 and x=3 in flight, and x=1 gets a sequence
number error, is that always indicating broker-side data loss? My
understanding is that sequence number gaps should only occur because
there
was a previous message (x=0) which was acked by the broker, and now the
broker doesn't have that message anymore. This provokes an epoch bump in
the producer and also lets the exception escape to the caller, so they
can
decide what to do about it, and are alerted to the data loss.
If the caller decides to keep using the producer instance, why can't the
producer reset the epoch and use the old sequence numbers when assigning
new sequence numbers to the buffered messages? If the producer knows that
in the old epoch, the order was x=1, x=2, x=3, then couldn't it use that
knowledge after the epoch bump to keep the ordering of those messages the
same?
Is it fair to only expect to see OutOfOrderSequenceExceptions escape from
the producer if the brokers lose data, or do I need to guard against it
happening even if there is no data loss on the brokers?
On Thu, Apr 17, 2025 at 04:02, Matthias J. Sax <
mj...@apache.org> wrote:
The behavior I'm looking for is that if thread 1 sends message x=1,
and
thread 2 sees message x=1 come through Kafka, thread 2 can't then
write
message x=2 and have that be clobbered because thread 1's producer is
retrying the send since it didn't receive the ack in time.
This is not how it works. Idempotency guarantees are for a single
producer only -- if you have two producers, both write into the topic
interleaved, and there is no guarantee about the order of messages
between the two producers.
For the first producer, two error cases can happen: either x=1 was not
appended to the log at all, and no offset was "assigned" to the message.
For this case, the producer will retry, and if producer-2 writes in
between, the write of x=1 goes to a later offset.
Or, the write was successful, but the ack was lost -- for this case,
there is no reason to block producer-2 either. Of course, producer-1
might retry sending x=1 but the broker would detect this as an
idempotent write, and "drop the write on the floor", just re-sending the
ack to producer-1.
I don't understand why this is needed?
Assume you write x=1, x=2, x=3, and x=1 does not make it and returns
`OutOfOrderSequenceException`. For this case x=2 and x=3 might still be
in the send buffer of the producer. If you don't close the producer, and
just retry sending x=1, it would be written after x=3, so not in the
original order.
This happens because, when the `OutOfOrderSequenceException` error occurs,
the producer bumps its epoch and resets the sequence number to zero. This
allows the producer to keep sending data, as the JavaDocs point out. The
already buffered data in the producer send buffer would use this new
epoch and reset sequence numbers.
By closing the producer though, you also throw away pending writes for
x=2 and x=3, and would call producer.send() for all three messages
again, and thus can again send in the intended order x=1, x=2, x=3.
Does this answer your questions?
-Matthias
On 4/15/25 3:19 AM, Stig Rohde Døssing wrote:
Hi,
If I understand correctly, the idempotent producer should be able to
guarantee that messages are not duplicated when the producer retries
sends,
and as of https://issues.apache.org/jira/browse/KAFKA-5494, it should
allow
for the producer to have multiple in flight requests at a time without
risking that the messages are reordered because batches are retried out
of
order.
The behavior I'm looking for is that if thread 1 sends message x=1, and
thread 2 sees message x=1 come through Kafka, thread 2 can't then write
message x=2 and have that be clobbered because thread 1's producer is
retrying the send since it didn't receive the ack in time.
The KafkaProducer Javadoc explains that when using the idempotent
producer,
"it is possible to continue sending after receiving an
OutOfOrderSequenceException, but doing so can result in out of order
delivery of pending messages. To ensure proper ordering, you should
close
the producer and create a new instance."
I don't understand why this is needed?
If I understand the feature correctly (please correct me), messages get
sequence numbers, and requests fail if the messages are not received in
order by the broker. Most out of order errors are handled internally by
the
producer, either by completing the request or by retrying. The case it
can't handle is if the first pending message has a gap to the last
acked
sequence number, e.g. if the last acked number is 10 and the first
message
the producer has pending is 12. In that case there has been message
loss,
and the exception escapes from the producer to the caller.
Is this the case the Javadoc note is referring to?
Why is it necessary/helpful to terminate and replace the producer in
order
to guarantee ordering when this kind of gap occurs?