Hi there -- another update!
When looking into the implementation for the safe epoch bumps, I realized
that we are already populating previousProducerID in memory as part of
KIP-360.
If we are to start using flexible fields, it is better to always use this
information and have an explicit (tagged)
Hey there -- small update to the KIP,
The KIP mentioned introducing ABORTABLE_ERROR and bumping TxnOffsetCommit
and Produce requests. I've changed the name in the KIP to
ABORTABLE_TRANSACTION and the corresponding exception
AbortableTransactionException to match the pattern we had for other
I don't think AddPartitions is a good example since we currently don't gate
the version on TV or MV. (We only set a different flag depending on the TV.)
Even if we did want to gate it on TV, I think the idea is to move away from
MV gating inter broker protocols. Ideally we can get to a state where
Hi, Justine,
Thanks for the reply.
Since AddPartitions is an inter-broker request, will its version be gated
only by TV, or by other features like MV too? For example, if we need to
change the protocol for AddPartitions for reasons other than txn
verification in the future, will the new version be
One TV version gates the flexible feature version (no RPCs involved, only
the transactional records, which should only be gated by TV).
Another TV version gates the ability to turn on KIP-890 part 2. This would
gate the version of Produce and EndTxn (likely only used by transactions),
and specifies a flag in
Hi, Justine,
Which RPC/record protocols will TV guard? Going forward, will those
RPC/record protocols only be guarded by TV and not by other features like
MV?
Thanks,
Jun
On Mon, Feb 5, 2024 at 2:41 PM Justine Olshan
wrote:
> Hi Jun,
>
> Sorry I think I misunderstood your question or
Hi Jun,
Sorry I think I misunderstood your question or answered incorrectly. The TV
version should ideally be fully independent from MV.
At least for the changes I proposed, TV should not affect MV and MV should
not affect TV.
I think if we downgrade TV, only that feature should downgrade.
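
To make that independence concrete, here is a minimal Java sketch of what
gating on a standalone TV feature could look like; the class name and level
values are illustrative assumptions, not the KIP's actual API.

// Hypothetical sketch: gate behavior on an independent transaction
// version (TV) feature, decoupled from metadata version (MV).
public final class TransactionVersion {
    // Assumed levels: 1 = flexible transactional state records,
    // 2 = KIP-890 part 2 (new Produce/EndTxn versions).
    private final short level;

    public TransactionVersion(short level) {
        this.level = level;
    }

    public boolean supportsFlexibleRecords() {
        return level >= 1;
    }

    public boolean supportsKip890Part2() {
        return level >= 2;
    }
}

Downgrading TV would then only flip these two checks back to false, leaving
MV and the protocols it gates untouched.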
Hi, Justine,
Thanks for the reply.
So, if we downgrade TV, we could implicitly downgrade another feature (say
MV) that has a dependency (e.g. an RPC). What would we return for
FinalizedFeatures for MV in ApiVersionsResponse in that case?
Thanks,
Jun
On Fri, Feb 2, 2024 at 1:06 PM Justine Olshan
Hey Jun,
Yes, the idea is that if we downgrade TV (transaction version), we will stop
using the add partitions to txn optimization and stop writing the flexible
record format to the transaction log.
In the compatibility section I included some explanations on how this is
done.
Thanks,
Justine
On Fri, Feb
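
A hedged sketch of how that downgrade-safe choice might be made when
writing transaction state records; the version numbers and names here are
assumptions for illustration, not the KIP's actual schema versions.

// Hypothetical sketch: pick the TransactionLogValue schema version from
// the finalized transaction feature (TV), so that after a TV downgrade
// the coordinator returns to the older, non-flexible format that a
// downgraded broker can still read.
public final class TxnStateRecordVersion {
    private static final short NON_FLEXIBLE_VERSION = 0; // assumed pre-KIP-890 format
    private static final short FLEXIBLE_VERSION = 1;     // assumed tagged-field format

    public static short forTransactionVersion(short finalizedTv) {
        return finalizedTv >= 1 ? FLEXIBLE_VERSION : NON_FLEXIBLE_VERSION;
    }
}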
Hi, Justine,
Thanks for the update.
If we ever downgrade the transaction feature, will any feature that depends
on the RPC/record changes
(AddPartitionsToTxnRequest/TransactionLogValue) made in KIP-890
be automatically downgraded too?
Jun
On Tue, Jan 30, 2024 at 3:32 PM
Hey Jun,
I wanted to get back to you about your questions about MV/IBP.
Looking at the options, I think it makes the most sense to create a
separate feature for transactions and use that to version gate the features
we need to version gate (flexible transactional state records and using the
new
Thanks Jun,
I will update the KIP with the prev field for prepare as well.
PREPARE
  producerId: x
  previous/lastProducerId (tagged field): x
  nextProducerId (tagged field): empty, or z if y will overflow
  producerEpoch: y + 1
COMPLETE
  producerId: x, or z if y overflowed
  previous/lastProducerId
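
As a reading aid, here is a hedged Java model of the marker fields listed
above; the type and field names are illustrative, and the tagged (optional)
fields are modeled as nullable boxed values.

// Illustrative model of the prepare/complete marker state; not the KIP's
// actual schema.
public record TxnMarkerState(
        long producerId,      // PREPARE: x; COMPLETE: x, or z if epoch y overflowed
        Long prevProducerId,  // tagged: previous/last producer ID (x)
        Long nextProducerId,  // tagged: null, or z if epoch y will overflow
        short producerEpoch   // PREPARE: y + 1
) {}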
Hi, Justine,
101.3 Thanks for the explanation.
(1) My point was that the coordinator could fail right after writing the
prepare marker. When the new txn coordinator generates the complete marker
after the failover, it needs some field from the prepare marker to
determine whether it's written by
> Hmm -- we would fence the producer if the epoch is bumped and we get a
lower epoch. Yes -- we are intentionally adding this to prevent fencing.
I think Jun's point is that we can defer the fencing decision until the
transition into the complete state (which I believe is what the current
logic is
101.3 I realized that I actually have two questions.
> (1) In the non-overflow case, we need to write the previous producer ID
tagged field in the end marker so that we know if the marker is from the new
client. Since the end marker is derived from the prepare marker, should we
write the previous
Hi, Justine,
Thanks for the reply.
101.3 I realized that I actually have two questions.
(1) In the non-overflow case, we need to write the previous producer ID
tagged field in the end marker so that we know if the marker is from the new
client. Since the end marker is derived from the prepare
Hi Jun,
101.3 I can change "last seen" to "current producer id and epoch" if that
was the part that was confusing
110 I can mention this
111 I can do that
112 We still need it. But I am still finalizing the design. I will update
the KIP once I get the information finalized. Sorry for the delays.
Hi, Justine,
Thanks for the reply.
101.3 In the non-overflow case, the previous ID is the same as the producer
ID for the complete marker too, but we set the previous ID in the complete
marker. Earlier you mentioned that this is to know that the marker is
written by the new client so that we
Hey Jun,
101.3 We don't set the previous ID in the prepare marker since we don't need
it. It is the same producer ID as the main producer ID field.
110 Hmm -- maybe I need to reread your message about delayed markers. If we
receive a delayed endTxn marker after the transaction is already
Hi, Justine,
I don't see this creating any issue. It just makes it a bit hard to explain
what this non-tagged producer ID field means. We are essentially trying to
combine two actions (completing a txn and initializing a new producer ID) in
a single record. But, this may be fine too.
A few other follow up
Hey Jun,
I'm glad we are getting to convergence on the design. :)
While I understand it seems a little "weird", I'm not sure what the benefit
of writing an extra record to the log would be.
Is the concern that a tool to describe transactions won't work (i.e., the
complete state is needed to calculate the time
Hi, Justine,
Thanks for the explanation. I understand the intention now. In the overflow
case, we set the non-tagged field to the old pid (and the max epoch) in the
prepare marker so that we could correctly write the marker to the data
partition if the broker downgrades. When writing the complete
(1) the prepare marker is written, but the endTxn response is not received
by the client when the server downgrades
(2) the prepare marker is written, the endTxn response is received by the
client when the server downgrades.
I think I am still a little confused. In both of these cases, the
Hi, Justine,
Thanks for the reply.
I agree that we don't need to optimize for fencing during downgrades.
Regarding consistency, there are two possible cases: (1) the prepare marker
is written, but the endTxn response is not received by the client when the
server downgrades; (2) the prepare
Hi Jun,
In the case you describe, we would need to have a delayed request, send a
successful EndTxn, and a successful AddPartitionsToTxn and then have the
delayed EndTxn request go through for a given producer.
I'm trying to figure out if it is possible for the client to transition if
a previous
Hi, Justine,
Thanks for the reply.
101.4 "If the marker is written by the new client, we can as I mentioned in
the last email guarantee that any EndTxn requests with the same epoch are
from the same producer and the same transaction. Then we don't have to
return a fenced error but can handle
Hi Jun,
I can update the description.
I believe your second point is mentioned in the KIP. I can add more text on
this if it is helpful.
> The delayed message case can also violate EOS if the delayed message
comes in after the next addPartitionsToTxn request comes in. Effectively we
may see a
Hi, Justine,
Thanks for the reply. Sorry for the delay. I have a few more comments.
110. I think the motivation section could be improved. One of the
motivations listed by the KIP is "This can happen when a message gets stuck
or delayed due to networking issues or a network partition, the
would it build an offset map with just the latest timestamp for a key?
Cannot remember the details without reading the KIP, but yes, something
like this (I believe it actually needs to track both offset and timestamp
per key).
I wonder if ordering assumptions are baked in there, why not
Hey Matthias,
I have actually never heard of KIP-280 so thanks for bringing it up. That
seems interesting. I wonder how it would work though -- would it build an
offset map with just the latest timestamp for a key? I wonder if ordering
assumptions are baked in there, why not use offset-based
Just a side note about Guozhang's comments about timestamps.
If the producer sets the timestamp, putting the record into purgatory
seems not to be an issue (as already said: for this case we don't
guarantee timestamp order between writes of different producers anyway).
However, if the broker
> looks like we already have code to handle bumping the epoch and
when the epoch is Short.MAX_VALUE, we get a new producer ID.
My understanding is that this logic is currently encapsulated in the broker
and the client doesn't really know at which epoch value the new producer id
is generated.
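
For reference, a minimal sketch of the bump-and-overflow behavior being
described; the class and method names are illustrative, not the broker's
actual code.

import java.util.function.LongSupplier;

// Hedged sketch: bump the epoch, and when the epoch space is exhausted,
// retire the producer ID and start a new one at epoch 0, as the broker
// already does today.
public final class EpochBump {
    public record ProducerIdAndEpoch(long producerId, short epoch) {}

    public static ProducerIdAndEpoch bump(long producerId, short epoch,
                                          LongSupplier nextProducerId) {
        if (epoch == Short.MAX_VALUE) {
            return new ProducerIdAndEpoch(nextProducerId.getAsLong(), (short) 0);
        }
        return new ProducerIdAndEpoch(producerId, (short) (epoch + 1));
    }
}

As the thread notes, the client never observes at which epoch value the new
producer id was generated; the transition is internal to the broker.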
That's a fair point about other clients.
I think the abortable error case is interesting because I'm curious how
other clients would handle this. I assume they would need to implement
handling for the error code unless they did something like "any unknown
error codes/any codes that aren't x,y,z
Thanks Justine for the replies! I agree with most of your thoughts.
Just for 3/7), though I agree for our own AK producer, since we do
"nextRequest(boolean hasIncompleteBatches)", we guarantee the end-txn
would not be sent until we've effectively flushed, but I was referring
to any future bugs or
Hey Guozhang. Thanks for taking a look and for the detailed comments! I'll
do my best to address below.
1. I see what you are saying here, but I think I need to look through the
sequence of events you mention. Typically we've seen this issue in a few
cases.
One is when we have a producer
Hello Justine,
Thanks for the great write-up! I made a quick pass through it and here
are some thoughts (I have not been able to read through this thread, so
pardon me if they overlap with or are subsumed by previous comments):
First are some meta ones:
1. I think we need to also improve the
Yeah -- looks like we already have code to handle bumping the epoch and
when the epoch is Short.MAX_VALUE, we get a new producer ID. Since this is
already the behavior, do we want to change it further?
Justine
On Wed, Jan 18, 2023 at 1:12 PM Justine Olshan wrote:
> Hey all, just wanted to
Hey all, just wanted to quickly update and say I've modified the KIP to
explicitly mention that AddOffsetCommitsToTxnRequest will be replaced by a
coordinator-side (inter-broker) AddPartitionsToTxn implicit request. This
mirrors the user partitions and will implicitly add offset partitions to
It's good to know that KIP-588 addressed some of the issues. Looking at
the code, it still looks like there are some cases that would result in a
fatal error, e.g. PRODUCER_FENCED is issued by the transaction coordinator
if the epoch doesn't match, and the client treats it as a fatal error (code in
Thanks for the discussion Artem.
With respect to the handling of fenced producers, we have some behavior
already in place. As of KIP-588:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-588%3A+Allow+producers+to+recover+gracefully+from+transaction+timeouts,
we handle timeouts more
There are some workflows in the client that are implied by protocol
changes, e.g.:
- for new clients, the epoch changes with every transaction and can overflow;
in old clients this condition was handled transparently, because the epoch
was bumped in InitProducerId and it would return a new producer id if
Hey all, I've updated the KIP to incorporate Jason's suggestions.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense
1. Use AddPartitionsToTxn + verify flag to check on old clients
2. Updated AddPartitionsToTxn API to support transaction batching
3.
Thanks Jason. Those changes make sense to me. I will update the KIP.
On Fri, Jan 6, 2023 at 3:31 PM Jason Gustafson
wrote:
> Hey Justine,
>
> > I was wondering about compatibility here. When we send requests
> between brokers, we want to ensure that the receiving broker understands
> the
Hey Justine,
> I was wondering about compatibility here. When we send requests
between brokers, we want to ensure that the receiving broker understands
the request (specifically the new fields). Typically this is done via
IBP/metadata version.
I'm trying to think if there is a way around it but
As a follow up, I was just thinking about the batching a bit more.
I suppose if we have one request in flight and we queue up the other
produce requests in some sort of purgatory, we could send information out
for all of them rather than one by one. So that would be a benefit of
batching
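
A hedged sketch of that batching idea; the types and method names are
illustrative, not Kafka's actual purgatory or RPC code.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// While one verification request is in flight, queue the partitions of
// held produce requests; when it completes, drain the queue and verify
// everything in a single follow-up request.
public final class VerificationBatcher {
    // transactional id -> partitions awaiting verification
    private final Map<String, List<String>> pending = new HashMap<>();

    public synchronized void enqueue(String transactionalId, String topicPartition) {
        pending.computeIfAbsent(transactionalId, id -> new ArrayList<>())
               .add(topicPartition);
    }

    // One batched request then covers all produce requests held in the meantime.
    public synchronized Map<String, List<String>> drainForNextRequest() {
        Map<String, List<String>> batch = new HashMap<>(pending);
        pending.clear();
        return batch;
    }
}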
Hey Jason -- thanks for the input -- I was just digging a bit deeper into
the design + implementation of the validation calls here and what you say
makes sense.
I was wondering about compatibility here. When we send requests
between brokers, we want to ensure that the receiving broker understands
Hi Justine,
Thanks for the proposal.
I was thinking about the implementation a little bit. In the current
proposal, the behavior depends on whether we have an old or new client. For
old clients, we send `DescribeTransactions` and verify the result and for
new clients, we send
Hi, Justine,
Thanks for the explanation. It makes sense to me now.
Jun
On Mon, Dec 19, 2022 at 1:42 PM Justine Olshan
wrote:
> Hi Jun,
>
> My understanding of the mechanism is that when we get to the last epoch, we
> increment to the fencing/last epoch and if any further requests come in for
Hi Jun,
My understanding of the mechanism is that when we get to the last epoch, we
increment to the fencing/last epoch and if any further requests come in for
this producer ID they are fenced. Then the producer gets a new ID and
restarts with epoch/sequence 0. The fenced epoch sticks around for
Hi, Justine,
Thanks for the explanation.
70. The proposed fencing logic doesn't apply when the pid changes, is that
right? If so, I am not sure how completely we are addressing this issue if
the pid changes more frequently.
Thanks,
Jun
On Fri, Dec 16, 2022 at 9:16 AM Justine Olshan
wrote:
> Hi
Hi Jun,
Thanks for replying!
70. We already do the overflow mechanism, so my change would just make it
happen more often.
I was also not suggesting a new field in the log, but in the response,
which would be gated by the client version. Sorry if something there is
unclear. I think we are starting
Hi, Justine,
Thanks for the reply.
70. Assigning a new pid on epoch overflow seems a bit hacky. If we need a txn
level id, it will be better to model this explicitly. Adding a new field
would require a bit more work since it requires a new txn marker format in
the log. So, we probably need to
Thanks Matthias, I think we are on the same page.
The concern I had about your solution is that with old clients we can't
distinguish between a late message and a message intended for the new
transaction -- basically any late message can turn into case 1.
I chose to rely on
What I mean is the following:
For both scenarios, late message or missing addPartitionToTxnRequest, a
record r is written to partition X, but X is not registered at the
TX-coordinator. Now there are two cases:
(1) A follow up transaction writes more data to the same partition X,
and r
Matthias -- thanks again for taking the time to look at this. You said:
> My proposal was only focused on avoiding dangling
transactions when records are added without a registered partition. -- Maybe
you can add a few more details to the KIP about this scenario for better
documentation purposes?
I'm not
Hi, Justine,
Thanks for the KIP. A couple of comments.
70. Currently, the producer epoch is a short. I am not sure if it's enough
to accommodate all transactions in the lifetime of a producer. Should we
change that to a long or add a new long field like txnId?
71. "it will write the prepare
Thanks for the background.
20/30: SGTM. My proposal was only focused on avoiding dangling
transactions when records are added without a registered partition. -- Maybe
you can add a few more details to the KIP about this scenario for better
documentation purposes?
40: I think you hit a fair point
Hi all,
After Artem's questions about error behavior, I've re-evaluated the
unknown producer ID exception and had some discussions offline.
I think generally it makes sense to simplify error handling in cases like
this and the UNKNOWN_PRODUCER_ID error has a pretty long and complicated
history.
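
As an illustration of the simplification being discussed, a sketch of
client-side classification that treats UNKNOWN_PRODUCER_ID as abortable
rather than fatal; the enum and classifier are hypothetical, not the actual
producer code.

// Hypothetical sketch: classify errors so UNKNOWN_PRODUCER_ID aborts the
// transaction instead of requiring a client restart.
enum ErrorDisposition { RETRIABLE, ABORTABLE, FATAL }

final class TxnErrorClassifier {
    static ErrorDisposition classify(String errorCode) {
        switch (errorCode) {
            case "UNKNOWN_PRODUCER_ID":
                // Under the proposal, the application aborts the
                // transaction and continues, rather than restarting.
                return ErrorDisposition.ABORTABLE;
            case "NOT_ENOUGH_REPLICAS":
                return ErrorDisposition.RETRIABLE;
            default:
                return ErrorDisposition.FATAL;
        }
    }
}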
Hey Matthias,
20/30 — Maybe I also didn't express myself clearly. For older clients we
don't have a way to distinguish between a previous and the current
transaction since we don't have the epoch bump. This means that a late
message from the previous transaction may be added to the new one. With
Thanks for the details Justine!
20)
The client side change for 2 is removing the addPartitions to transaction
call. We don't need to make this from the producer to the txn coordinator,
only server side.
I think I did not express myself clearly. I understand that we can (and
should) change
Hi Justine,
I think the interesting part is not in this logic (because it tries to
figure out when UNKNOWN_PRODUCER_ID is retriable and if it's retriable,
it's definitely not fatal), but what happens when this logic doesn't return
'true' and falls through. In the old clients it seems to be
Hi Artem and Jeff,
Thanks for taking a look and sorry for the slow response.
You both mentioned the change to handle UNKNOWN_PRODUCER_ID errors. To be
clear — this error code will only be sent again when the client's request
version is high enough to ensure we handle it correctly.
The current
Hi Justine,
Thanks for the KIP! I have two questions:
1) For new clients, we can once again return an error UNKNOWN_PRODUCER_ID
for sequences
that are non-zero when there is no producer state present on the server.
This will indicate we missed the 0 sequence and we don't yet want to write
to the
Hi Justine,
Thank you for the KIP. I have one question.
5) For new clients, we can once again return an error UNKNOWN_PRODUCER_ID
I believe we had problems in the past with returning UNKNOWN_PRODUCER_ID
because it was considered fatal and required client restart. It would be
good to spell out
Thanks for taking a look Matthias. I've tried to answer your questions
below:
10)
Right — so the hanging transaction only occurs when we have a late message
come in and the partition is never added to a transaction again. If we
never add the partition to a transaction, we will never write a
Thanks for the KIP.
Couple of clarification questions (I am not a broker expert, so maybe
some questions are obvious for others, but not for me with my lack of
broker knowledge).
(10)
The delayed message case can also violate EOS if the delayed message comes in
after the next
Hey all!
I'd like to start a discussion on my proposal to add some server-side
checks on transactions to avoid hanging transactions. I know this has been
an issue for some time, so I really hope this KIP will be helpful for many
users of EOS.
The KIP includes changes that will be compatible with