Thanks. For us at least, there is no need to change the behavior, we just wanted to understand the reasoning behind the Javadoc.
On Wed, Apr 23, 2025 at 20:36 Matthias J. Sax <mj...@apache.org> wrote:

> I guess that makes sense. Would need to dig into the code (not super familiar with this part of the code base) to fully confirm.
>
> Not sure if it might not be possible to change though, if there is a demand for it. -- Might also be a question about how complex such a change would be.
>
> My read is that historically (i.e., prior to the idempotency feature), when a send failed, the producer would remove the batch right away from the buffer (to unblock sending consecutive buffers) and raise the error to the application. This does make sense I guess, as there is no ordering guarantee for retries anyway (not even for producer-internal retries).
>
> When idempotency was added, the behavior was just kept as-is.
>
>
> -Matthias
>
>
> On 4/21/25 1:27 AM, Stig Rohde Døssing wrote:
> > Thanks, this was very useful.
> >
> >> Well, the producer does not have x=1 in its buffer any longer -- otherwise, the out-of-sequence error would not have been given to the application, and the producer could just retry internally.
> >
> > Okay, so if I understand you, the reason the application has to replace the producer is that x=1 is not reinserted into the buffer when the exception is propagated out of the producer. So the need to replace the producer is more of an implementation choice than an inherent need:
> >
> > If the client had x=1, x=2, x=3 in flight, and x=1 received an out of order error from the broker, indicating that some message x=0 was lost, the implementation could also have chosen to raise the exception to the client, while also inserting x=1 back into the buffer, resetting the epoch and resetting the sequence numbers on the messages based on what their ordering was (i.e. if their current sequence numbers are x=1 s=5, x=2 s=6, x=3 s=7, it would reset them to x=1 s=0, x=2 s=1, x=3 s=2).
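[Editor's note: the hypothetical renumbering scheme proposed above can be sketched as a small toy model. This is purely illustrative -- the class and method names are invented, and this is not how the Apache Kafka producer is actually implemented.]

```java
import java.util.ArrayList;
import java.util.List;

class Renumber {
    // A pending record in the (hypothetical) producer buffer.
    record Pending(String key, int epoch, int seq) {}

    // Bump the epoch and reassign sequence numbers 0..n-1, preserving the
    // relative order of the buffered records -- the behavior proposed above.
    static List<Pending> bumpEpoch(List<Pending> buffered, int newEpoch) {
        List<Pending> out = new ArrayList<>();
        for (int i = 0; i < buffered.size(); i++) {
            out.add(new Pending(buffered.get(i).key(), newEpoch, i));
        }
        return out;
    }
}
```

Under this sketch, a buffer of x=1 s=5, x=2 s=6, x=3 s=7 at epoch 3 would become x=1 s=0, x=2 s=1, x=3 s=2 at epoch 4, exactly as described in the message above.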
> > That way, if the application chooses to terminate the producer, it works the same way it does now, but if the application chooses to keep the producer running, the remaining messages (x=1 and up) would be sent in the right order.
> >
> > But the way it works currently is instead that x=1 is "cancelled" when the exception escapes to the application, and reinserting x=1 from outside the producer at that point would obviously cause reordering, so an application that just wants to ignore the loss of x=0 but keep the ordering of the remaining messages the same has to replace the producer, and resubmit the x=1, x=2 and x=3 send calls in that order.
> >
> > Changing the behavior to do that now would be a breaking change, and might be awkward at the API level, since getting a callback from the producer saying "x=1 was not sent, OutOfOrderSequenceException", and then having the producer automatically reinsert it, would be unlike how that API usually works. Restarting a producer isn't too much of a hassle anyway, but it was helpful to understand why the producer needs to be replaced when this happens, so thanks for this.
> >
> >
> > On Sun, Apr 20, 2025 at 21:39 Matthias J. Sax <mj...@apache.org> wrote:
> >
> >> Thanks for providing more details.
> >>
> >> It makes sense that you need to ensure that an "insert" comes before an "update".
> >>
> >>
> >>> If I understand you correctly, that's what idempotence guarantees. If the PUT is not appended to the log, it will be retried, but then thread 2 can't have observed it.
> >>
> >> Yes.
> >>
> >>
> >>> If the PUT is appended to the log, and thread 1 just didn't see the ack, thread 2 may have observed the PUT, but then it will not be inserted into the log again on retry
> >>
> >> Yes.
> >>
> >>
> >>> so the UPDATE thread 2 does in response won't be clobbered.
> >>
> >> Correct, the UPDATE would be after the PUT in the Kafka topic.
> >>
> >>
> >> For `OutOfOrderSequenceException`: I'm getting out of my depth on all details, so take my replies with a grain of salt...
> >>
> >>
> >>> If the producer has x=1, x=2 and x=3 in flight, and x=2 gets a sequence number error, it's probably because the x=1 write hasn't arrived at the broker yet, so the producer can just retry it internally without risking reordering
> >>
> >> Exactly (even if it does not risk re-ordering as long as it does not reset epoch/seq-no -- the broker would just keep rejecting x=2). For this case, the OutOfOrderSequence error tells the producer that x=1 did not make it, and the producer needs to retry sending x=1 first.
> >>
> >>
> >>> and it's only when sequence errors occur for the first in flight message where the producer needs to reset the epoch, right?
> >>
> >> This only happens if there was an ack for x=1. If there was no ack for x=1, the producer can retry sending x=1 internally. However, if there was an ack for x=1, the producer raises `OutOfOrderSequenceException` to the application, telling the app not to continue "blindly".
> >>
> >>
> >>> If the producer has x=1, x=2 and x=3 in flight, and x=1 gets a delivery timeout, does that provoke an (escaping from the producer) OutOfOrderSequenceException for x=2 if the broker never saw x=1?
> >>
> >> When you get a delivery timeout you don't know if the data reached the broker or not: the timeout could also trigger if the data was sent successfully, but the ack was never received. If the data was never received, sending x=2 should not trigger an `OutOfOrderSequenceException`, as the producer still has x=1 in the buffer and can keep retrying x=1.
> >>
> >>
> >>> If the producer has x=1, x=2 and x=3 in flight, and x=1 gets a sequence number error, is that always indicating broker-side data loss?
> >>> My understanding is that sequence number gaps should only occur because there was a previous message (x=0) which was acked by the broker, and now the broker doesn't have that message anymore. This provokes an epoch bump in the producer and also lets the exception escape to the caller, so they can decide what to do about it, and are alerted to the data loss.
> >>
> >> Yes, that's my understanding.
> >>
> >>
> >>> If the caller decides to keep using the producer instance, why can't the producer reset the epoch and use the old sequence numbers when assigning new sequence numbers to the buffered messages?
> >>
> >> If the epoch is bumped, the broker expects the first message with the new epoch to have seq=0.
> >>
> >>
> >>> If the producer knows that in the old epoch, the order was x=1, x=2, x=3, then couldn't it use that knowledge after the epoch bump to keep the ordering of those messages the same?
> >>
> >> Well, the producer does not have x=1 in its buffer any longer -- otherwise, the out-of-sequence error would not have been given to the application, and the producer could just retry internally.
> >>
> >>
> >>> Is it fair to only expect to see OutOfOrderSequenceExceptions escape from the producer if the brokers lose data, or do I need to guard against it happening even if there is no data loss on the brokers?
> >>
> >> Yes, this should be the only root cause.
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 4/17/25 3:54 AM, Stig Rohde Døssing wrote:
> >>> Thanks. What I'm looking for isn't that messages can't be interleaved, it's something like causal order.
> >>>
> >>> We're updating a database where every node has its own copy of the database state (similar to RocksDB), by sending changes to that database via Kafka.
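[Editor's note: the broker-side sequence/epoch rules discussed above -- accept only the next sequence number, treat lower numbers as idempotent duplicates, require seq=0 after an epoch bump, and flag gaps as out of order -- can be modeled with a small sketch. Names are hypothetical; this is not Kafka's actual broker code.]

```java
// Toy model of per-producer sequence checking on the broker side.
class SequenceCheck {
    enum Result { ACCEPT, DUPLICATE, OUT_OF_ORDER }

    int epoch = 0;     // last known producer epoch
    int lastSeq = -1;  // last accepted sequence number in that epoch

    Result append(int msgEpoch, int seq) {
        if (msgEpoch > epoch) {
            // New producer epoch: the first message must carry seq = 0.
            if (seq != 0) return Result.OUT_OF_ORDER;
            epoch = msgEpoch;
            lastSeq = 0;
            return Result.ACCEPT;
        }
        if (seq <= lastSeq) return Result.DUPLICATE;   // idempotent re-send, dropped
        if (seq == lastSeq + 1) {                      // exactly the next record
            lastSeq = seq;
            return Result.ACCEPT;
        }
        return Result.OUT_OF_ORDER;                    // gap: an earlier record is missing
    }
}
```

In this model, receiving seq=3 when only seq=0..1 were accepted yields OUT_OF_ORDER (the gap case discussed above), while re-sending an already-accepted sequence number is silently deduplicated.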
> >>>
> >>> In our application thread 1 is creating records in the database, and thread 2 is updating fields in existing database records. So thread 1 is doing something like PUT {id=someKey x=1} and thread 2 is doing UPDATE someKey WITH {x=2}. The reason I care about ordering is that PUTs will clobber the existing record if it's there, so having the PUT replayed after the UPDATE will clobber that update.
> >>>
> >>> The guarantee I'm after is that if thread 2 observes a PUT coming through Kafka, and issues an UPDATE for that record id, the observed PUT won't be appended to Kafka again later because the producer replayed it.
> >>>
> >>> If I understand you correctly, that's what idempotence guarantees. If the PUT is not appended to the log, it will be retried, but then thread 2 can't have observed it. If the PUT is appended to the log, and thread 1 just didn't see the ack, thread 2 may have observed the PUT, but then it will not be inserted into the log again on retry, so the UPDATE thread 2 does in response won't be clobbered.
> >>>
> >>> Regarding OutOfOrderSequenceException, thanks for the explanation.
> >>>
> >>> I'm not entirely sure I get it. Let me just describe a couple of cases and you can correct my understanding:
> >>>
> >>> If the producer has x=1, x=2 and x=3 in flight, and x=2 gets a sequence number error, it's probably because the x=1 write hasn't arrived at the broker yet, so the producer can just retry it internally without risking reordering, and it's only when sequence errors occur for the first in flight message where the producer needs to reset the epoch, right?
> >>>
> >>> If the producer has x=1, x=2 and x=3 in flight, and x=1 gets a delivery timeout, does that provoke an (escaping from the producer) OutOfOrderSequenceException for x=2 if the broker never saw x=1?
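[Editor's note: for readers following along, the idempotence guarantee relied on above is enabled through producer configuration. A minimal sketch follows; the broker address is a placeholder and the values are illustrative, not tuning advice for any particular cluster.]

```java
import java.util.Properties;

class IdempotentConfig {
    static Properties props() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
        p.put("enable.idempotence", "true");           // dedupe retries via producer id/epoch/seq
        p.put("acks", "all");                          // required when idempotence is enabled
        // Up to 5 in-flight requests still preserve ordering with idempotence on.
        p.put("max.in.flight.requests.per.connection", "5");
        p.put("key.serializer",
              "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer",
              "org.apache.kafka.common.serialization.StringSerializer");
        return p;
    }
}
```

With this configuration, a retried PUT that already reached the log is dropped by the broker rather than appended again, which is the property the thread-2 UPDATE depends on.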
> >>>
> >>> If the producer has x=1, x=2 and x=3 in flight, and x=1 gets a sequence number error, is that always indicating broker-side data loss? My understanding is that sequence number gaps should only occur because there was a previous message (x=0) which was acked by the broker, and now the broker doesn't have that message anymore. This provokes an epoch bump in the producer and also lets the exception escape to the caller, so they can decide what to do about it, and are alerted to the data loss.
> >>>
> >>> If the caller decides to keep using the producer instance, why can't the producer reset the epoch and use the old sequence numbers when assigning new sequence numbers to the buffered messages? If the producer knows that in the old epoch, the order was x=1, x=2, x=3, then couldn't it use that knowledge after the epoch bump to keep the ordering of those messages the same?
> >>>
> >>> Is it fair to only expect to see OutOfOrderSequenceExceptions escape from the producer if the brokers lose data, or do I need to guard against it happening even if there is no data loss on the brokers?
> >>>
> >>> On Thu, Apr 17, 2025 at 04:02 Matthias J. Sax <mj...@apache.org> wrote:
> >>>
> >>>>>> The behavior I'm looking for is that if thread 1 sends message x=1, and thread 2 sees message x=1 come through Kafka, thread 2 can't then write message x=2 and have that be clobbered because thread 1's producer is retrying the send since it didn't receive the ack in time.
> >>>>
> >>>> This is not how it works. Idempotency guarantees are for a single producer only -- if you have two producers, both write into the topic interleaved, and there is no guarantee about the order of messages between both producers.
> >>>>
> >>>> For the first producer, two error cases can happen. Either x=1 was not appended to the log at all, and no offset was "assigned" to the message. For this case, the producer will retry, and if producer-2 writes in between, the write of x=1 goes to a later offset.
> >>>>
> >>>> Or, the write was successful, but the ack was lost -- for this case, there is no reason to block producer-2 either. Of course, producer-1 might retry sending x=1, but the broker would detect this as an idempotent write, and "drop the write on the floor", just re-sending the ack to producer-1.
> >>>>
> >>>>
> >>>>> I don't understand why this is needed?
> >>>>
> >>>> Assume you write x=1, x=2, x=3, and x=1 does not make it and returns `OutOfOrderSequenceException`. For this case x=2 and x=3 might still be in the send buffer of the producer. If you don't close the producer, and just retry sending x=1, it would be written after x=3, so not in the original order.
> >>>>
> >>>> This happens because, when the `OutOfOrderSequenceException` error happens, the producer bumps its epoch, and resets the sequence number to zero. This allows the producer to keep sending data, as the JavaDocs point out. The already buffered data in the producer send buffer would use this new epoch and reset sequence numbers.
> >>>>
> >>>> By closing the producer though, you also throw away the pending writes for x=2 and x=3, and would call producer.send() for all three messages again, and thus can again send in the intended order x=1, x=2, x=3.
> >>>>
> >>>>
> >>>> Does this answer your questions?
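[Editor's note: the reordering described above can be demonstrated with a toy producer model -- x=1 is dropped from the buffer, x=2 and x=3 flush first, and a naive re-send of x=1 lands last, while closing and re-sending all three preserves the intended order. Purely illustrative; this is not real Kafka client code.]

```java
import java.util.ArrayList;
import java.util.List;

class ToyProducer {
    final List<String> buffer = new ArrayList<>(); // pending, unacked records
    final List<String> log;                        // the shared "topic"

    ToyProducer(List<String> log) { this.log = log; }

    void send(String msg) { buffer.add(msg); }

    void flush() { log.addAll(buffer); buffer.clear(); }

    // Simulate the failure path described above: the first buffered record is
    // dropped (like x=1 after OutOfOrderSequenceException) and the remaining
    // records are flushed under the bumped epoch.
    void failFirstThenFlush() {
        buffer.remove(0);
        flush();
    }
}
```

Continuing on the same toy producer and re-sending x=1 yields the topic order x=2, x=3, x=1; creating a fresh instance and re-submitting all three sends restores x=1, x=2, x=3 -- which is what the Javadoc's "close the producer and create a new instance" advice achieves.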
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>> On 4/15/25 3:19 AM, Stig Rohde Døssing wrote:
> >>>>> Hi,
> >>>>>
> >>>>> If I understand correctly, the idempotent producer should be able to guarantee that messages are not duplicated when the producer retries sends, and as of https://issues.apache.org/jira/browse/KAFKA-5494, it should allow for the producer to have multiple in flight requests at a time without risking that the messages are reordered because batches are retried out of order.
> >>>>>
> >>>>> The behavior I'm looking for is that if thread 1 sends message x=1, and thread 2 sees message x=1 come through Kafka, thread 2 can't then write message x=2 and have that be clobbered because thread 1's producer is retrying the send since it didn't receive the ack in time.
> >>>>>
> >>>>> The KafkaProducer Javadoc explains that when using the idempotent producer, "it is possible to continue sending after receiving an OutOfOrderSequenceException, but doing so can result in out of order delivery of pending messages. To ensure proper ordering, you should close the producer and create a new instance."
> >>>>>
> >>>>> I don't understand why this is needed?
> >>>>>
> >>>>> If I understand the feature correctly (please correct me), messages get sequence numbers, and requests fail if the messages are not received in order by the broker. Most out of order errors are handled internally by the producer, either by completing the request or by retrying. The case it can't handle is if the first pending message has a gap to the last acked sequence number, e.g. if the last acked number is 10 and the first message the producer has pending is 12. In that case there has been message loss, and the exception escapes from the producer to the caller.
> >>>>>
> >>>>> Is this the case the Javadoc note is referring to?
> >>>>>
> >>>>> Why is it necessary/helpful to terminate and replace the producer in order to guarantee ordering when this kind of gap occurs?