Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-03-01 Thread Artem Livshits
Hi Jun,

> 32. ... metric name ...

I've updated the metric name to be
*kafka.coordinator.transaction:type=TransactionStateManager,name=ActiveTransactionOpenTimeMax.*

Let me know if it works.
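For anyone who wants to sanity-check the name, a minimal JMX lookup against
that MBean could look like the sketch below (the JMX address and the Yammer
gauge's "Value" attribute are assumptions about a typical broker setup, not
part of the KIP):

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class ActiveTxnOpenTimeProbe {
        public static void main(String[] args) throws Exception {
            // Assumes the broker was started with JMX enabled on localhost:9999.
            JMXServiceURL url = new JMXServiceURL(
                    "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
                MBeanServerConnection server = connector.getMBeanServerConnection();
                ObjectName mbean = new ObjectName(
                        "kafka.coordinator.transaction:type=TransactionStateManager,"
                                + "name=ActiveTransactionOpenTimeMax");
                // Yammer gauges exported through JmxReporter expose a "Value" attribute.
                Object value = server.getAttribute(mbean, "Value");
                System.out.println("ActiveTransactionOpenTimeMax = " + value);
            }
        }
    }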

-Artem



On Thu, Feb 29, 2024 at 12:03 PM Artem Livshits 
wrote:

> Hi Jun,
>
> >  So, it doesn't provide the same guarantees as 2PC either.
>
> I think the key point is that we don't claim 2PC guarantees in that case.
> Maybe it's splitting hairs from the technical perspective (at the end of
> the day, if the operator doesn't let the user use 2PC, it's going to be a
> "works until timeout" solution), but from a user-model perspective it
> provides a clear structure:
>
> - if 2PC is possible then all guarantees are in place and there is no gray
> area where we sort of provide guarantees but not fully
> - if 2PC is not possible, then it's a well-informed constraint / decision
> with well-known characteristics and the user can choose whether this is
> acceptable or not for them
>
> Maybe we can look at it from a slightly different perspective: we are not
> making a choice between allowing or not allowing keepPreparedTxn=true
> when 2PC=false (even though that's exactly how it looks from the KIP).  In
> fact, the choice we're making is whether Flink will be able to use an
> official API when 2PC is not possible (and I think we've converged to agree
> that sometimes it won't be) or keep using a reflection hack.  In other
> words, we already have a hacky implementation for the case of
> keepPreparedTxn=true + 2PC=false; our choice is only whether we provide an
> official API for that or not.
>
> In general, if someone goes and implements a reflection-based solution
> that's an indication that there is a gap in public APIs.  And we can debate
> whether keepPreparedTxn=true + 2PC=false is the right API or not; and if we
> think it's not, then we should provide an alternative.  Right now the
> alternative is to just keep using the reflection and I think it's always
> worse than using a public API.
>
> -Artem
>
> On Wed, Feb 28, 2024 at 2:23 PM Jun Rao  wrote:
>
>> Hi, Artem,
>>
>> Thanks for the reply.
>>
>> I understand your concern on having a timeout breaking the 2PC guarantees.
>> However, the fallback plan to disable 2PC with an independent
>> keepPreparedTxn is subject to the timeout too. So, it doesn't provide the
>> same guarantees as 2PC either.
>>
>> To me, if we provide a new functionality, we should make it easy such that
>> the application developer only needs to implement it in one way, which is
>> always correct. Then, we can consider what additional things are needed to
>> make the operator comfortable enabling it.
>>
>> Jun
>>
>> On Tue, Feb 27, 2024 at 4:45 PM Artem Livshits
>>  wrote:
>>
>> > Hi Jun,
>> >
>> > Thank you for the discussion.
>> >
>> > > For 3b, it would be useful to understand the reason why an admin
>> doesn't
>> > authorize 2PC for self-hosted Flink
>> >
>> > I think the nuance here is that for cloud, there is a cloud admin
>> > (operator) and there is a cluster admin (who, for example, could manage
>> > ACLs on topics, etc.).  The 2PC functionality can affect cloud operations,
>> > because a long running transaction can block the last stable offset and
>> > prevent compaction or data tiering.  In a multi-tenant environment, a
>> long
>> > running transaction that involves consumer offset may affect data that
>> is
>> > shared by multiple tenants (Flink transactions don't use consumer
>> offsets,
>> > so this is not an issue for Flink, but we'd need a separate ACL or some
>> > other way to express this permission if we wanted to go in that
>> direction).
>> >
>> > For that reason, I expect 2PC to be controlled by the cloud operator
>> and it
>> > just may not be scalable for the cloud operator to manage all potential
>> > interactions required to resolve in-doubt transactions (communicate to
>> the
>> > end users, etc.).  In general, we make no assumptions about Kafka
>> > applications -- they may come and go, they may abandon transactional ids
>> > and generate new ones.  For 2PC we need to make sure that the
>> application
>> > is highly available and wouldn't easily abandon an open transaction in
>> > Kafka.
>> >
>> > > If so, another way to address that is to allow the admin to set a
>> timeout
>> > even for the 2PC case.
>> >
>> > This effectively abandons the 2PC guarantee because it creates a case
>> for
>> > Kafka to unilaterally make an automatic decision on a prepared
>> > transaction.  I think it's fundamental for 2PC to abandon this ability
>> and
>> > wait for the external coordinator for the decision, after all the
>> > coordinator may legitimately be unavailable for an arbitrary amount of
>> > time.  Also, we already have a timeout on regular Kafka transactions,
>> > having another "special" timeout could be confusing, and a large enough
>> > timeout could still produce the undesirable effects for the cloud
>> > operations (so we kind of get the worst of both options -- we don't provide
>> > guarantees and still have impact on operations).

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-02-29 Thread Artem Livshits
Hi Jun,

>  So, it doesn't provide the same guarantees as 2PC either.

I think the key point is that we don't claim 2PC guarantees in that case.
Maybe it's splitting hairs from the technical perspective (at the end of
the day, if the operator doesn't let the user use 2PC, it's going to be a
"works until timeout" solution), but from a user-model perspective it
provides a clear structure:

- if 2PC is possible then all guarantees are in place and there is no gray
area where we sort of provide guarantees but not fully
- if 2PC is not possible, then it's a well-informed constraint / decision
with well-known characteristics and the user can choose whether this is
acceptable or not for them

Maybe we can look at it from a slightly different perspective: we are not
making a choice between allowing or not allowing keepPreparedTxn=true
when 2PC=false (even though that's exactly how it looks from the KIP).  In
fact, the choice we're making is whether Flink will be able to use an
official API when 2PC is not possible (and I think we've converged to agree
that sometimes it won't be) or keep using a reflection hack.  In other
words, we already have a hacky implementation for the case of
keepPreparedTxn=true + 2PC=false; our choice is only whether we provide an
official API for that or not.

In general, if someone goes and implements a reflection-based solution
that's an indication that there is a gap in public APIs.  And we can debate
whether keepPreparedTxn=true + 2PC=false is the right API or not; and if we
think it's not, then we should provide an alternative.  Right now the
alternative is to just keep using the reflection and I think it's always
worse than using a public API.
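For concreteness, the official-API path I have in mind for the Flink-style
case would look roughly like this (a sketch only: the
initTransactions(boolean keepPreparedTxn) overload is the one proposed in the
KIP, and readDecisionFromCoordinator() is a hypothetical stand-in for the
external coordinator's commit/abort decision):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ExternalCoordinatorRecovery {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "flink-sink-0");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // keepPreparedTxn=true: don't abort the in-flight transaction on init;
                // leave the decision to the external coordinator (e.g. Flink's job manager).
                producer.initTransactions(true);

                if (readDecisionFromCoordinator()) {
                    producer.commitTransaction();
                } else {
                    producer.abortTransaction();
                }
            }
        }

        // Hypothetical stand-in for whatever channel the external coordinator uses.
        private static boolean readDecisionFromCoordinator() {
            return true;
        }
    }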

-Artem

On Wed, Feb 28, 2024 at 2:23 PM Jun Rao  wrote:

> Hi, Artem,
>
> Thanks for the reply.
>
> I understand your concern on having a timeout breaking the 2PC guarantees.
> However, the fallback plan to disable 2PC with an independent
> keepPreparedTxn is subject to the timeout too. So, it doesn't provide the
> same guarantees as 2PC either.
>
> To me, if we provide a new functionality, we should make it easy such that
> the application developer only needs to implement it in one way, which is
> always correct. Then, we can consider what additional things are needed to
> make the operator comfortable enabling it.
>
> Jun
>
> On Tue, Feb 27, 2024 at 4:45 PM Artem Livshits
>  wrote:
>
> > Hi Jun,
> >
> > Thank you for the discussion.
> >
> > > For 3b, it would be useful to understand the reason why an admin
> doesn't
> > authorize 2PC for self-hosted Flink
> >
> > I think the nuance here is that for cloud, there is a cloud admin
> > (operator) and there is a cluster admin (who, for example, could manage ACLs
> > on topics, etc.).  The 2PC functionality can affect cloud operations,
> > because a long running transaction can block the last stable offset and
> > prevent compaction or data tiering.  In a multi-tenant environment, a
> long
> > running transaction that involves consumer offset may affect data that is
> > shared by multiple tenants (Flink transactions don't use consumer
> offsets,
> > so this is not an issue for Flink, but we'd need a separate ACL or some
> > other way to express this permission if we wanted to go in that
> direction).
> >
> > For that reason, I expect 2PC to be controlled by the cloud operator and
> it
> > just may not be scalable for the cloud operator to manage all potential
> > interactions required to resolve in-doubt transactions (communicate to
> the
> > end users, etc.).  In general, we make no assumptions about Kafka
> > applications -- they may come and go, they may abandon transactional ids
> > and generate new ones.  For 2PC we need to make sure that the application
> > is highly available and wouldn't easily abandon an open transaction in
> > Kafka.
> >
> > > If so, another way to address that is to allow the admin to set a
> timeout
> > even for the 2PC case.
> >
> > This effectively abandons the 2PC guarantee because it creates a case for
> > Kafka to unilaterally make an automatic decision on a prepared
> > transaction.  I think it's fundamental for 2PC to abandon this ability
> and
> > wait for the external coordinator for the decision, after all the
> > coordinator may legitimately be unavailable for an arbitrary amount of
> > time.  Also, we already have a timeout on regular Kafka transactions,
> > having another "special" timeout could be confusing, and a large enough
> > timeout could still produce the undesirable effects for the cloud
> > operations (so we kind of get the worst of both options -- we don't provide
> > guarantees and still have impact on operations).
> >
> > -Artem
> >
> > On Fri, Feb 23, 2024 at 8:55 AM Jun Rao 
> wrote:
> >
> > > Hi, Artem,
> > >
> > > Thanks for the reply.
> > >
> > > For 3b, it would be useful to understand the reason why an admin
> doesn't
> > > authorize 2PC for self-hosted Flink. Is the main reason that 2PC has
> > > unbounded timeout that could lead to unbounded outstanding transactions?

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-02-28 Thread Jun Rao
Hi, Artem,

Thanks for the reply.

I understand your concern on having a timeout breaking the 2PC guarantees.
However, the fallback plan to disable 2PC with an independent
keepPreparedTxn is subject to the timeout too. So, it doesn't provide the
same guarantees as 2PC either.

To me, if we provide a new functionality, we should make it easy such that
the application developer only needs to implement it in one way, which is
always correct. Then, we can consider what additional things are needed to
make the operator comfortable enabling it.

Jun

On Tue, Feb 27, 2024 at 4:45 PM Artem Livshits
 wrote:

> Hi Jun,
>
> Thank you for the discussion.
>
> > For 3b, it would be useful to understand the reason why an admin doesn't
> authorize 2PC for self-hosted Flink
>
> I think the nuance here is that for cloud, there is a cloud admin
> (operator) and there is a cluster admin (who, for example, could manage ACLs
> on topics, etc.).  The 2PC functionality can affect cloud operations,
> because a long running transaction can block the last stable offset and
> prevent compaction or data tiering.  In a multi-tenant environment, a long
> running transaction that involves consumer offset may affect data that is
> shared by multiple tenants (Flink transactions don't use consumer offsets,
> so this is not an issue for Flink, but we'd need a separate ACL or some
> other way to express this permission if we wanted to go in that direction).
>
> For that reason, I expect 2PC to be controlled by the cloud operator and it
> just may not be scalable for the cloud operator to manage all potential
> interactions required to resolve in-doubt transactions (communicate to the
> end users, etc.).  In general, we make no assumptions about Kafka
> applications -- they may come and go, they may abandon transactional ids
> and generate new ones.  For 2PC we need to make sure that the application
> is highly available and wouldn't easily abandon an open transaction in
> Kafka.
>
> > If so, another way to address that is to allow the admin to set a timeout
> even for the 2PC case.
>
> This effectively abandons the 2PC guarantee because it creates a case for
> Kafka to unilaterally make an automatic decision on a prepared
> transaction.  I think it's fundamental for 2PC to abandon this ability and
> wait for the external coordinator for the decision, after all the
> coordinator may legitimately be unavailable for an arbitrary amount of
> time.  Also, we already have a timeout on regular Kafka transactions,
> having another "special" timeout could be confusing, and a large enough
> timeout could still produce the undesirable effects for the cloud
> operations (so we kind of get the worst of both options -- we don't provide
> guarantees and still have impact on operations).
>
> -Artem
>
> On Fri, Feb 23, 2024 at 8:55 AM Jun Rao  wrote:
>
> > Hi, Artem,
> >
> > Thanks for the reply.
> >
> > For 3b, it would be useful to understand the reason why an admin doesn't
> > authorize 2PC for self-hosted Flink. Is the main reason that 2PC has
> > unbounded timeout that could lead to unbounded outstanding transactions?
> If
> > so, another way to address that is to allow the admin to set a timeout
> even
> > for the 2PC case. The timeout would be long enough for well-behaved
> > applications to complete 2PC operations, but not so long that misbehaving
> > applications' transactions hang indefinitely.
> >
> > Jun
> >
> > On Wed, Feb 21, 2024 at 4:34 PM Artem Livshits
> >  wrote:
> >
> > > Hi Jun,
> > >
> > > > 20A. One option is to make the API initTransactions(boolean
> enable2PC).
> > >
> > > We could do that.  I think there is a little bit of symmetry between
> the
> > > client and server that would get lost with this approach (server has
> > > enable2PC as config), but I don't really see a strong reason for
> > enable2PC
> > > to be a config vs. an argument for initTransactions.  But let's see if
> we
> > > find 20B to be a strong consideration for keeping a separate flag for
> > > keepPreparedTxn.
> > >
> > > > 20B. But realistically, we want Flink (and other apps) to have a
> single
> > > implementation
> > >
> > > That's correct and here's what I think can happen if we don't allow
> > > independent keepPreparedTxn:
> > >
> > > 1. Pre-KIP-939 self-hosted Flink vs. any Kafka cluster -- reflection is
> > > used, which effectively implements keepPreparedTxn=true without our
> > > explicit support.
> > > 2. KIP-939 self-hosted Flink vs. pre-KIP-939 Kafka cluster -- we can
> > > either fall back to reflection or we just say we don't support this,
> have
> > > to upgrade Kafka cluster first.
> > > 3. KIP-939 self-hosted Flink vs. KIP-939 Kafka cluster, this becomes
> > > interesting depending on whether the Kafka cluster authorizes 2PC or
> not:
>  3a. Kafka cluster authorizes 2PC for self-hosted Flink -- everything
> uses
> > > KIP-939 and there is no problem
> > >  3b. Kafka cluster doesn't authorize 2PC for self-hosted Flink -- we
> can
> > either fall back to reflection or use keepPreparedTxn=true even if 2PC is
> > not enabled.

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-02-28 Thread Andrew Schofield
Hi Artem,
I totally agree that a timeout for the 2PC case is a bad idea. It does abandon
the 2PC guarantee.

Thanks,
Andrew

> On 28 Feb 2024, at 00:44, Artem Livshits  
> wrote:
>
> Hi Jun,
>
> Thank you for the discussion.
>
>> For 3b, it would be useful to understand the reason why an admin doesn't
> authorize 2PC for self-hosted Flink
>
> I think the nuance here is that for cloud, there is a cloud admin
> (operator) and there is a cluster admin (who, for example, could manage ACLs
> on topics, etc.).  The 2PC functionality can affect cloud operations,
> because a long running transaction can block the last stable offset and
> prevent compaction or data tiering.  In a multi-tenant environment, a long
> running transaction that involves consumer offset may affect data that is
> shared by multiple tenants (Flink transactions don't use consumer offsets,
> so this is not an issue for Flink, but we'd need a separate ACL or some
> other way to express this permission if we wanted to go in that direction).
>
> For that reason, I expect 2PC to be controlled by the cloud operator and it
> just may not be scalable for the cloud operator to manage all potential
> interactions required to resolve in-doubt transactions (communicate to the
> end users, etc.).  In general, we make no assumptions about Kafka
> applications -- they may come and go, they may abandon transactional ids
> and generate new ones.  For 2PC we need to make sure that the application
> is highly available and wouldn't easily abandon an open transaction in
> Kafka.
>
>> If so, another way to address that is to allow the admin to set a timeout
> even for the 2PC case.
>
> This effectively abandons the 2PC guarantee because it creates a case for
> Kafka to unilaterally make an automatic decision on a prepared
> transaction.  I think it's fundamental for 2PC to abandon this ability and
> wait for the external coordinator for the decision, after all the
> coordinator may legitimately be unavailable for an arbitrary amount of
> time.  Also, we already have a timeout on regular Kafka transactions,
> having another "special" timeout could be confusing, and a large enough
> timeout could still produce the undesirable effects for the cloud
> operations (so we kind of get the worst of both options -- we don't provide
> guarantees and still have impact on operations).
>
> -Artem
>
> On Fri, Feb 23, 2024 at 8:55 AM Jun Rao  wrote:
>
>> Hi, Artem,
>>
>> Thanks for the reply.
>>
>> For 3b, it would be useful to understand the reason why an admin doesn't
>> authorize 2PC for self-hosted Flink. Is the main reason that 2PC has
>> unbounded timeout that could lead to unbounded outstanding transactions? If
>> so, another way to address that is to allow the admin to set a timeout even
>> for the 2PC case. The timeout would be long enough for well-behaved
>> applications to complete 2PC operations, but not so long that misbehaving
>> applications' transactions hang indefinitely.
>>
>> Jun
>>
>> On Wed, Feb 21, 2024 at 4:34 PM Artem Livshits
>>  wrote:
>>
>>> Hi Jun,
>>>
 20A. One option is to make the API initTransactions(boolean enable2PC).
>>>
>>> We could do that.  I think there is a little bit of symmetry between the
>>> client and server that would get lost with this approach (server has
>>> enable2PC as config), but I don't really see a strong reason for
>> enable2PC
>>> to be a config vs. an argument for initTransactions.  But let's see if we
>>> find 20B to be a strong consideration for keeping a separate flag for
>>> keepPreparedTxn.
>>>
 20B. But realistically, we want Flink (and other apps) to have a single
>>> implementation
>>>
>>> That's correct and here's what I think can happen if we don't allow
>>> independent keepPreparedTxn:
>>>
>>> 1. Pre-KIP-939 self-hosted Flink vs. any Kafka cluster -- reflection is
>>> used, which effectively implements keepPreparedTxn=true without our
>>> explicit support.
>>> 2. KIP-939 self-hosted Flink vs. pre-KIP-939 Kafka cluster -- we can
>>> either fall back to reflection or we just say we don't support this, have
>>> to upgrade Kafka cluster first.
>>> 3. KIP-939 self-hosted Flink vs. KIP-939 Kafka cluster, this becomes
>>> interesting depending on whether the Kafka cluster authorizes 2PC or not:
>>> 3a. Kafka cluster authorizes 2PC for self-hosted Flink -- everything uses
>>> KIP-939 and there is no problem
>>> 3b. Kafka cluster doesn't authorize 2PC for self-hosted Flink -- we can
>>> either fallback to reflection or use keepPreparedTxn=true even if 2PC is
>>> not enabled.
>>>
>>> It seems to be ok to not support case 2 (i.e. require Kafka upgrade
>> first),
>>> it shouldn't be an issue for cloud offerings as cloud providers are
>> likely
>>> to upgrade their Kafka to the latest versions.
>>>
>>> The case 3b seems to be important to support, though -- the latest
>> version
>>> of everything should work at least as well (and preferably better) than
>>> previous ones.  It's possible to downgrade to case 1, but it's probably not
>>> sustainable as newer versions of Flink would also add other features that
>>> the customers may want to take advantage of.

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-02-27 Thread Artem Livshits
Hi Jun,

Thank you for the discussion.

> For 3b, it would be useful to understand the reason why an admin doesn't
authorize 2PC for self-hosted Flink

I think the nuance here is that for cloud, there is a cloud admin
(operator) and there is a cluster admin (who, for example, could manage ACLs
on topics, etc.).  The 2PC functionality can affect cloud operations,
because a long running transaction can block the last stable offset and
prevent compaction or data tiering.  In a multi-tenant environment, a long
running transaction that involves consumer offset may affect data that is
shared by multiple tenants (Flink transactions don't use consumer offsets,
so this is not an issue for Flink, but we'd need a separate ACL or some
other way to express this permission if we wanted to go in that direction).

For that reason, I expect 2PC to be controlled by the cloud operator and it
just may not be scalable for the cloud operator to manage all potential
interactions required to resolve in-doubt transactions (communicate to the
end users, etc.).  In general, we make no assumptions about Kafka
applications -- they may come and go, they may abandon transactional ids
and generate new ones.  For 2PC we need to make sure that the application
is highly available and wouldn't easily abandon an open transaction in
Kafka.

> If so, another way to address that is to allow the admin to set a timeout
even for the 2PC case.

This effectively abandons the 2PC guarantee because it creates a case for
Kafka to unilaterally make an automatic decision on a prepared
transaction.  I think it's fundamental for 2PC to abandon this ability and
wait for the external coordinator for the decision, after all the
coordinator may legitimately be unavailable for an arbitrary amount of
time.  Also, we already have a timeout on regular Kafka transactions,
having another "special" timeout could be confusing, and a large enough
timeout could still produce the undesirable effects for the cloud
operations (so we kind of get the worst of both options -- we don't provide
guarantees and still have impact on operations).

-Artem

On Fri, Feb 23, 2024 at 8:55 AM Jun Rao  wrote:

> Hi, Artem,
>
> Thanks for the reply.
>
> For 3b, it would be useful to understand the reason why an admin doesn't
> authorize 2PC for self-hosted Flink. Is the main reason that 2PC has
> unbounded timeout that could lead to unbounded outstanding transactions? If
> so, another way to address that is to allow the admin to set a timeout even
> for the 2PC case. The timeout would be long enough for well-behaved
> applications to complete 2PC operations, but not so long that misbehaving
> applications' transactions hang indefinitely.
>
> Jun
>
> On Wed, Feb 21, 2024 at 4:34 PM Artem Livshits
>  wrote:
>
> > Hi Jun,
> >
> > > 20A. One option is to make the API initTransactions(boolean enable2PC).
> >
> > We could do that.  I think there is a little bit of symmetry between the
> > client and server that would get lost with this approach (server has
> > enable2PC as config), but I don't really see a strong reason for
> enable2PC
> > to be a config vs. an argument for initTransactions.  But let's see if we
> > find 20B to be a strong consideration for keeping a separate flag for
> > keepPreparedTxn.
> >
> > > 20B. But realistically, we want Flink (and other apps) to have a single
> > implementation
> >
> > That's correct and here's what I think can happen if we don't allow
> > independent keepPreparedTxn:
> >
> > 1. Pre-KIP-939 self-hosted Flink vs. any Kafka cluster -- reflection is
> > used, which effectively implements keepPreparedTxn=true without our
> > explicit support.
> > 2. KIP-939 self-hosted Flink vs. pre-KIP-939 Kafka cluster -- we can
> > either fall back to reflection or we just say we don't support this, have
> > to upgrade Kafka cluster first.
> > 3. KIP-939 self-hosted Flink vs. KIP-939 Kafka cluster, this becomes
> > interesting depending on whether the Kafka cluster authorizes 2PC or not:
> >  3a. Kafka cluster authorizes 2PC for self-hosted Flink -- everything uses
> > KIP-939 and there is no problem
> >  3b. Kafka cluster doesn't authorize 2PC for self-hosted Flink -- we can
> > either fallback to reflection or use keepPreparedTxn=true even if 2PC is
> > not enabled.
> >
> > It seems to be ok to not support case 2 (i.e. require Kafka upgrade
> first),
> > it shouldn't be an issue for cloud offerings as cloud providers are
> likely
> > to upgrade their Kafka to the latest versions.
> >
> > The case 3b seems to be important to support, though -- the latest
> version
> > of everything should work at least as well (and preferably better) than
> > previous ones.  It's possible to downgrade to case 1, but it's probably
> not
> > sustainable as newer versions of Flink would also add other features that
> > the customers may want to take advantage of.
> >
> > If we enabled keepPreparedTxn=true even without 2PC, then we could enable
> > case 3b without the need to fall back to reflection, so we could get rid of
> > reflection-based logic and just have a single implementation based on
> > KIP-939.

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-02-23 Thread Jun Rao
Hi, Artem,

Thanks for the reply.

For 3b, it would be useful to understand the reason why an admin doesn't
authorize 2PC for self-hosted Flink. Is the main reason that 2PC has
unbounded timeout that could lead to unbounded outstanding transactions? If
so, another way to address that is to allow the admin to set a timeout even
for the 2PC case. The timeout would be long enough for well-behaved
applications to complete 2PC operations, but not so long that misbehaving
applications' transactions hang indefinitely.

Jun

On Wed, Feb 21, 2024 at 4:34 PM Artem Livshits
 wrote:

> Hi Jun,
>
> > 20A. One option is to make the API initTransactions(boolean enable2PC).
>
> We could do that.  I think there is a little bit of symmetry between the
> client and server that would get lost with this approach (server has
> enable2PC as config), but I don't really see a strong reason for enable2PC
> to be a config vs. an argument for initTransactions.  But let's see if we
> find 20B to be a strong consideration for keeping a separate flag for
> keepPreparedTxn.
>
> > 20B. But realistically, we want Flink (and other apps) to have a single
> implementation
>
> That's correct and here's what I think can happen if we don't allow
> independent keepPreparedTxn:
>
> 1. Pre-KIP-939 self-hosted Flink vs. any Kafka cluster -- reflection is
> used, which effectively implements keepPreparedTxn=true without our
> explicit support.
> 2. KIP-939 self-hosted Flink vs. pre-KIP-939 Kafka cluster -- we can
> either fall back to reflection or we just say we don't support this, have
> to upgrade Kafka cluster first.
> 3. KIP-939 self-hosted Flink vs. KIP-939 Kafka cluster, this becomes
> interesting depending on whether the Kafka cluster authorizes 2PC or not:
>  3a. Kafka cluster authorizes 2PC for self-hosted Flink -- everything uses
> KIP-939 and there is no problem
>  3b. Kafka cluster doesn't authorize 2PC for self-hosted Flink -- we can
> either fallback to reflection or use keepPreparedTxn=true even if 2PC is
> not enabled.
>
> It seems to be ok to not support case 2 (i.e. require Kafka upgrade first),
> it shouldn't be an issue for cloud offerings as cloud providers are likely
> to upgrade their Kafka to the latest versions.
>
> The case 3b seems to be important to support, though -- the latest version
> of everything should work at least as well (and preferably better) than
> previous ones.  It's possible to downgrade to case 1, but it's probably not
> sustainable as newer versions of Flink would also add other features that
> the customers may want to take advantage of.
>
> If we enabled keepPreparedTxn=true even without 2PC, then we could enable
> case 3b without the need to fall back to reflection, so we could get rid of
> reflection-based logic and just have a single implementation based on
> KIP-939.
>
> > 32. My suggestion is to change
>
> Let me think about it and I'll come back to this.
>
> -Artem
>
> On Wed, Feb 21, 2024 at 3:40 PM Jun Rao  wrote:
>
> > Hi, Artem,
> >
> > Thanks for the reply.
> >
> > 20A. One option is to make the API initTransactions(boolean enable2PC).
> > Then, it's clear from the code whether 2PC related logic should be added.
> >
> > 20B. But realistically, we want Flink (and other apps) to have a single
> > implementation of the 2PC logic, not two different implementations,
> right?
> >
> > 32. My suggestion is to
> > change
> >
> kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max
> > to sth like
> > Metric Name: active-transaction-open-time-max
> > Type: Max
> > Group: transaction-coordinator-metrics
> > Tags: none
> > Description: The max time a currently-open transaction has been open
> >
> > Jun
> >
> > On Wed, Feb 21, 2024 at 11:25 AM Artem Livshits
> >  wrote:
> >
> > > Hi Jun,
> > >
> > > > 20A.  This only takes care of the abort case. The application still
> > needs
> > > to be changed to handle the commit case properly
> > >
> > > My point here is that looking at the initTransactions() call it's not
> > clear
> > > what the semantics is.  Say I'm doing code review, I cannot say if the
> > code
> > > is correct or not -- if the config (that's something that's
> > > theoretically not known at the time of code review) is going to enable
> > 2PC,
> > > then the correct code should look one way, otherwise it would need to
> > look
> > > differently.  Also, say if code is written with initTransactions()
> without
> > > explicit abort and then for whatever reason the code would get used
> with
> > > 2PC enabled (could be a library in a bigger product) it'll start
> breaking
> > > in a non-intuitive way.
> > >
> > > > 20B. Hmm, if the admin disables 2PC, there is likely a reason behind
> > that
> > >
> > > That's true, but reality may be more complicated.  Say a user wants to
> > run
> > > a self-managed Flink with Confluent cloud.  Confluent cloud admin may not
> > > be comfortable enabling 2PC for general user accounts that use services not
> > > managed by Confluent (the same way Confluent doesn't allow increasing max
> > > transaction timeout for general user accounts).

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-02-21 Thread Artem Livshits
Hi Jun,

> 20A. One option is to make the API initTransactions(boolean enable2PC).

We could do that.  I think there is a little bit of symmetry between the
client and server that would get lost with this approach (server has
enable2PC as config), but I don't really see a strong reason for enable2PC
to be a config vs. an argument for initTransactions.  But let's see if we
find 20B to be a strong consideration for keeping a separate flag for
keepPreparedTxn.

> 20B. But realistically, we want Flink (and other apps) to have a single
implementation

That's correct and here's what I think can happen if we don't allow
independent keepPreparedTxn:

1. Pre-KIP-939 self-hosted Flink vs. any Kafka cluster -- reflection is
used, which effectively implements keepPreparedTxn=true without our
explicit support.
2. KIP-939 self-hosted Flink vs. pre-KIP-939 Kafka cluster -- we can
either fall back to reflection or we just say we don't support this, have
to upgrade Kafka cluster first.
3. KIP-939 self-hosted Flink vs. KIP-939 Kafka cluster, this becomes
interesting depending on whether the Kafka cluster authorizes 2PC or not:
 3a. Kafka cluster authorizes 2PC for self-hosted Flink -- everything uses
KIP-939 and there is no problem
 3b. Kafka cluster doesn't authorize 2PC for self-hosted Flink -- we can
either fallback to reflection or use keepPreparedTxn=true even if 2PC is
not enabled.

It seems to be ok to not support case 2 (i.e. require Kafka upgrade first),
it shouldn't be an issue for cloud offerings as cloud providers are likely
to upgrade their Kafka to the latest versions.

The case 3b seems to be important to support, though -- the latest version
of everything should work at least as well (and preferably better) than
previous ones.  It's possible to downgrade to case 1, but it's probably not
sustainable as newer versions of Flink would also add other features that
the customers may want to take advantage of.

If we enabled keepPreparedTxn=true even without 2PC, then we could enable
case 3b without the need to fall back to reflection, so we could get rid of
reflection-based logic and just have a single implementation based on
KIP-939.

> 32. My suggestion is to change

Let me think about it and I'll come back to this.

-Artem

On Wed, Feb 21, 2024 at 3:40 PM Jun Rao  wrote:

> Hi, Artem,
>
> Thanks for the reply.
>
> 20A. One option is to make the API initTransactions(boolean enable2PC).
> Then, it's clear from the code whether 2PC related logic should be added.
>
> 20B. But realistically, we want Flink (and other apps) to have a single
> implementation of the 2PC logic, not two different implementations, right?
>
> 32. My suggestion is to
> change
> kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max
> to sth like
> Metric Name: active-transaction-open-time-max
> Type: Max
> Group: transaction-coordinator-metrics
> Tags: none
> Description: The max time a currently-open transaction has been open
>
> Jun
>
> On Wed, Feb 21, 2024 at 11:25 AM Artem Livshits
>  wrote:
>
> > Hi Jun,
> >
> > > 20A.  This only takes care of the abort case. The application still
> needs
> > to be changed to handle the commit case properly
> >
> > My point here is that looking at the initTransactions() call it's not
> clear
> > what the semantics is.  Say I'm doing code review, I cannot say if the
> code
> > is correct or not -- if the config (that's something that's
> > theoretically not known at the time of code review) is going to enable
> 2PC,
> > then the correct code should look one way, otherwise it would need to
> look
> > differently.  Also, say if code is written with initTransactions() without
> > explicit abort and then for whatever reason the code would get used with
> > 2PC enabled (could be a library in a bigger product) it'll start breaking
> > in a non-intuitive way.
> >
> > > 20B. Hmm, if the admin disables 2PC, there is likely a reason behind
> that
> >
> > That's true, but reality may be more complicated.  Say a user wants to
> run
> > a self-managed Flink with Confluent cloud.  Confluent cloud admin may not
> > be comfortable enabling 2PC for general user accounts that use services
> not
> > managed by Confluent (the same way Confluent doesn't allow increasing max
> > transaction timeout for general user accounts).  Right now, self-managed
> > Flink works because it uses reflection, if it moves to use public APIs
> > provided by KIP-939 it'll break.
> >
> > > 32. Ok. That's the kafka metric. In that case, the metric name has a
> > group and a name. There is no type and no package name.
> >
> > Is this a suggestion to change or confirmation that the current logic is
> > ok?  I just copied an existing metric but can change if needed.
> >
> > -Artem
> >
> > On Tue, Feb 20, 2024 at 11:25 AM Jun Rao 
> wrote:
> >
> > > Hi, Artem,
> > >
> > > Thanks for the reply.
> > >
> > > 20. "Say if an application
> > > currently uses initTransactions() to achieve the current semantics, it
> > > would need to be rewritten to use initTransactions() + abort to achieve the
> > > same semantics if the config is changed."

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-02-21 Thread Artem Livshits
Hi Rowland,

> The Open Group DTP model and the XA interface requires that resource
managers be able to report prepared transactions only, so a prepare RPC
will be required.

It's required in the XA protocol, but I'm not sure we have to build it into
Kafka.

It looks like we just need a catalog of prepared transactions, and I wonder if
the XA protocol could implement it outside of Kafka's transactional state.  As
an example, you can take a look at Flink, which keeps track of prepared
transactions in its own storage.  I think it would be desirable if all
protocols kept their details outside of Kafka, so that Kafka remains the most
open and protocol-agnostic (and most efficient and simple) system.
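To make the "catalog outside of Kafka" idea a bit more concrete, a protocol
layer could keep something like the following in its own durable store (a
rough sketch; all names are illustrative and nothing here is defined by the
KIP):

    import java.util.List;

    // A protocol layer (XA, Flink, ...) tracks its own prepared transactions,
    // keyed by the Kafka transactional.id, instead of asking the Kafka
    // transaction coordinator to track a "prepared" state.
    public interface PreparedTxnCatalog {

        // Record that the transaction with this transactional.id has been prepared.
        void markPrepared(String transactionalId);

        // List transactions that were prepared but not yet committed or aborted,
        // i.e. what XA recovery would need to report.
        List<String> listPrepared();

        // Forget the transaction once the final commit/abort decision is applied.
        void remove(String transactionalId);
    }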

-Artem

On Mon, Feb 19, 2024 at 12:13 PM Rowland Smith  wrote:

> Hi Artem,
>
> I think that we both have the same understanding. An explicit prepare RPC
> does not eliminate any conditions, it just reduces the window for possible
> undesirable conditions like pending in-doubt transactions. So there is no
> right or wrong answer, a prepare RPC will reduce the number of
> occurrences of in-doubt transactions, but with a performance cost of an
> extra RPC call on every transaction.
>
> The Open Group DTP model and the XA interface requires that resource
> managers be able to report prepared transactions only, so a prepare RPC
> will be required. I will include it in my KIP for XA interface support, and
> will propose an implementation where clients can choose whether they want a
> prepare RPC when not using the XA interface. How does that sound?
>
> - Rowland
>
> On Fri, Feb 16, 2024 at 7:15 PM Artem Livshits
>  wrote:
>
> > Hi Rowland,
> >
> > > I am not sure what you mean by guarantee,
> >
> > A guarantee would be an elimination of complexity or a condition.  E.g.
> if
> > adding an explicit prepare RPC eliminated in-doubt transactions, or
> > eliminated a significant complexity in implementation.
> >
> > > 1. Transactions that haven’t reached “prepared” state can be aborted
> via
> > timeout.
> >
> > The argument is that it doesn't eliminate any conditions, it merely
> reduces
> > a subset of circumstances for the conditions to happen, but the
> conditions
> > still happen and must be handled.  The operator still needs to set up
> > monitoring for run-away transactions, there still needs to be an
> > "out-of-band" channel to resolve run-away transactions (i.e. the
> operator
> > would need a way that's not a part of the 2PC protocol to reconcile with
> > the application owner), there still needs to be tooling for resolving
> > run-away transactions.
> >
> > On the downside, an explicit prepare RPC would have a performance hit on
> > the happy path in every single transaction.
> >
> > -Artem
> >
> > On Tue, Feb 6, 2024 at 7:35 PM Rowland Smith  wrote:
> >
> > > Hi Artem,
> > >
> > > I am not sure what you mean by guarantee, but I am referring to a
> better
> > > operational experience. You mentioned this as the first benefit of an
> > > explicit "prepare" RPC in the KIP.
> > >
> > >
> > > 1. Transactions that haven’t reached “prepared” state can be aborted
> via
> > > timeout.
> > >
> > > However, in explaining why an explicit "prepare" RPC was not included
> in
> > > the design, you make no further mention of this benefit. So what I am
> > > saying is this benefit is quite significant operationally. Many client
> > > application failures may occur before the transaction reaches the
> > prepared
> > > state, and the ability to automatically abort those transactions and
> > > unblock affected partitions without administrative intervention or fast
> > > restart of the client would be a worthwhile benefit. An explicit
> > "prepare"
> > > RPC will also be needed by the XA implementation, so I would like to
> see
> > it
> > > implemented for that reason. Otherwise, I will need to add this work to
> > my
> > > KIP.
> > >
> > > - Rowland
> > >
> > > On Mon, Feb 5, 2024 at 9:35 PM Artem Livshits
> > >  wrote:
> > >
> > > > Hi Rowland,
> > > >
> > > > Thank you for your reply.  I think I understand what you're saying
> and
> > > just
> > > > tried to provide a quick summary.  The
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC#KIP939:SupportParticipationin2PC-Explicit%E2%80%9Cprepare%E2%80%9DRPC
> > > > actually goes into the details on what would be the benefits of
> adding
> > an
> > > > explicit prepare RPC and why those won't really add any advantages
> such
> > > as
> > > > eliminating the need for monitoring, tooling, or providing additional
> > > > guarantees.  Let me know if you think of a guarantee that prepare RPC
> > > would
> > > > provide.
> > > >
> > > > -Artem
> > > >
> > > > On Mon, Feb 5, 2024 at 6:22 PM Rowland Smith 
> > wrote:
> > > >
> > > > > Hi Artem,
> > > > >
> > > > > I don't think that you understand what I am saying. In any
> > transaction,
> > > > > there is work done before the call to prepareTransaction() and work
> > > done
> 

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-02-21 Thread Jun Rao
Hi, Artem,

Thanks for the reply.

20A. One option is to make the API initTransactions(boolean enable2PC).
Then, it's clear from the code whether 2PC related logic should be added.

20B. But realistically, we want Flink (and other apps) to have a single
implementation of the 2PC logic, not two different implementations, right?

32. My suggestion is to
change 
kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max
to sth like
Metric Name: active-transaction-open-time-max
Type: Max
Group: transaction-coordinator-metrics
Tags: none
Description: The max time a currently-open transaction has been open
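For reference, a metric registered through the Kafka metrics library in that
shape would look roughly like the sketch below (the sensor name and the
recorded value are made up for illustration):

    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Max;

    public class OpenTimeMetricSketch {
        public static void main(String[] args) {
            Metrics metrics = new Metrics();
            Sensor openTime = metrics.sensor("active-transaction-open-time");
            openTime.add(
                    metrics.metricName(
                            "active-transaction-open-time-max",
                            "transaction-coordinator-metrics",
                            "The max time a currently-open transaction has been open"),
                    new Max());

            // The coordinator would record the open time of each transaction it
            // scans; Max keeps the largest value observed in the sample window.
            openTime.record(12345.0);
            metrics.close();
        }
    }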

Jun

On Wed, Feb 21, 2024 at 11:25 AM Artem Livshits
 wrote:

> Hi Jun,
>
> > 20A.  This only takes care of the abort case. The application still needs
> to be changed to handle the commit case properly
>
> My point here is that looking at the initTransactions() call it's not clear
> what the semantics is.  Say I'm doing code review, I cannot say if the code
> is correct or not -- if the config (that's something that's
> theoretically not known at the time of code review) is going to enable 2PC,
> then the correct code should look one way, otherwise it would need to look
> differently.  Also, say if code is written with initTransactions() without
> explicit abort and then for whatever reason the code would get used with
> 2PC enabled (could be a library in a bigger product) it'll start breaking
> in a non-intuitive way.
>
> > 20B. Hmm, if the admin disables 2PC, there is likely a reason behind that
>
> That's true, but reality may be more complicated.  Say a user wants to run
> a self-managed Flink with Confluent cloud.  Confluent cloud admin may not
> be comfortable enabling 2PC for general user accounts that use services not
> managed by Confluent (the same way Confluent doesn't allow increasing max
> transaction timeout for general user accounts).  Right now, self-managed
> Flink works because it uses reflection, if it moves to use public APIs
> provided by KIP-939 it'll break.
>
> > 32. Ok. That's the kafka metric. In that case, the metric name has a
> group and a name. There is no type and no package name.
>
> Is this a suggestion to change or confirmation that the current logic is
> ok?  I just copied an existing metric but can change if needed.
>
> -Artem
>
> On Tue, Feb 20, 2024 at 11:25 AM Jun Rao  wrote:
>
> > Hi, Artem,
> >
> > Thanks for the reply.
> >
> > 20. "Say if an application
> > currently uses initTransactions() to achieve the current semantics, it
> > would need to be rewritten to use initTransactions() + abort to achieve
> the
> > same semantics if the config is changed. "
> >
> > This only takes care of the abort case. The application still needs to be
> > changed to handle the commit case properly
> > if transaction.two.phase.commit.enable is set to true.
> >
> > "Even when KIP-939 is implemented,
> > there would be situations when 2PC is disabled by the admin (e.g. Kafka
> > service providers may be reluctant to enable 2PC for Flink services that
> > users host themselves), so we either have to perpetuate the
> > reflection-based implementation in Flink or enable keepPreparedTxn=true
> > without 2PC."
> >
> > Hmm, if the admin disables 2PC, there is likely a reason behind that. I
> am
> > not sure that we should provide an API to encourage the application to
> > circumvent that.
> >
> > 32. Ok. That's the kafka metric. In that case, the metric name has a
> group
> > and a name. There is no type and no package name.
> >
> > Jun
> >
> >
> > On Thu, Feb 15, 2024 at 8:23 PM Artem Livshits
> >  wrote:
> >
> > > Hi Jun,
> > >
> > > Thank you for your questions.
> > >
> > > > 20. So to abort a prepared transaction after the producer start, we
> > could
> > > use ...
> > >
> > > I agree, initTransaction(true) + abort would accomplish the behavior of
> > > initTransactions(false), so we could technically have fewer ways to
> > achieve
> > > the same thing, which is generally valuable.  I wonder, though, if that
> > > would be intuitive from the application perspective.  Say if an
> > application
> > > currently uses initTransactions() to achieve the current semantics, it
> > > would need to be rewritten to use initTransactions() + abort to achieve
> > the
> > > same semantics if the config is changed.  I think this could create
> > > subtle confusion, as the config change is generally decoupled from
> > changing
> > > application implementation.
> > >
> > > >  The use case mentioned for keepPreparedTxn=true without 2PC doesn't
> > seem
> > > very important
> > >
> > > I agree, it's not a strict requirement.  It is, however, a missing
> option
> > > in the public API, so currently Flink has to use reflection to emulate
> > this
> > > functionality without 2PC support.   Even when KIP-939 is implemented,
> > > there would be situations when 2PC is disabled by the admin (e.g. Kafka
> > > service providers may be reluctant to enable 2PC for 

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-02-21 Thread Artem Livshits
Hi Jun,

> 20A.  This only takes care of the abort case. The application still needs
to be changed to handle the commit case properly

My point here is that looking at the initTransactions() call it's not clear
what the semantics is.  Say I'm doing code review, I cannot say if the code
is correct or not -- if the config (that's something that's
theoretically not known at the time of code review) is going to enable 2PC,
then the correct code should look one way, otherwise it would need to look
differently.  Also, say if code is written with initTransactions() without
explicit abort and then for whatever reason the code would get used with
2PC enabled (could be a library in a bigger product) it'll start breaking
in a non-intuitive way.
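To illustrate the two shapes a reviewer would have to distinguish (a sketch;
the commit/abort decision flag is a hypothetical stand-in for whatever the
application actually uses):

    import org.apache.kafka.clients.producer.Producer;

    final class InitSemanticsExample {
        // Shape expected with transaction.two.phase.commit.enable=false (today's
        // behavior): initTransactions() itself aborts any in-flight transaction.
        static void initWithout2pc(Producer<String, String> producer) {
            producer.initTransactions();
        }

        // Shape expected with transaction.two.phase.commit.enable=true: the same
        // call keeps the prepared transaction, so the caller has to resolve it.
        static void initWith2pc(Producer<String, String> producer, boolean commitDecision) {
            producer.initTransactions();
            if (commitDecision) {
                producer.commitTransaction();
            } else {
                producer.abortTransaction();
            }
        }
    }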

> 20B. Hmm, if the admin disables 2PC, there is likely a reason behind that

That's true, but reality may be more complicated.  Say a user wants to run
a self-managed Flink with Confluent cloud.  Confluent cloud admin may not
be comfortable enabling 2PC for general user accounts that use services not
managed by Confluent (the same way Confluent doesn't allow increasing max
transaction timeout for general user accounts).  Right now, self-managed
Flink works because it uses reflection, if it moves to use public APIs
provided by KIP-939 it'll break.

> 32. Ok. That's the kafka metric. In that case, the metric name has a
group and a name. There is no type and no package name.

Is this a suggestion to change or confirmation that the current logic is
ok?  I just copied an existing metric but can change if needed.

-Artem

On Tue, Feb 20, 2024 at 11:25 AM Jun Rao  wrote:

> Hi, Artem,
>
> Thanks for the reply.
>
> 20. "Say if an application
> currently uses initTransactions() to achieve the current semantics, it
> would need to be rewritten to use initTransactions() + abort to achieve the
> same semantics if the config is changed. "
>
> This only takes care of the abort case. The application still needs to be
> changed to handle the commit case properly
> if transaction.two.phase.commit.enable is set to true.
>
> "Even when KIP-939 is implemented,
> there would be situations when 2PC is disabled by the admin (e.g. Kafka
> service providers may be reluctant to enable 2PC for Flink services that
> users host themselves), so we either have to perpetuate the
> reflection-based implementation in Flink or enable keepPreparedTxn=true
> without 2PC."
>
> Hmm, if the admin disables 2PC, there is likely a reason behind that. I am
> not sure that we should provide an API to encourage the application to
> circumvent that.
>
> 32. Ok. That's the kafka metric. In that case, the metric name has a group
> and a name. There is no type and no package name.
>
> Jun
>
>
> On Thu, Feb 15, 2024 at 8:23 PM Artem Livshits
>  wrote:
>
> > Hi Jun,
> >
> > Thank you for your questions.
> >
> > > 20. So to abort a prepared transaction after the producer start, we
> could
> > use ...
> >
> > I agree, initTransaction(true) + abort would accomplish the behavior of
> > initTransactions(false), so we could technically have fewer ways to
> achieve
> > the same thing, which is generally valuable.  I wonder, though, if that
> > would be intuitive from the application perspective.  Say if an
> application
> > currently uses initTransactions() to achieve the current semantics, it
> > would need to be rewritten to use initTransactions() + abort to achieve
> the
> > same semantics if the config is changed.  I think this could create
> > subtle confusion, as the config change is generally decoupled from
> changing
> > application implementation.
> >
> > >  The use case mentioned for keepPreparedTxn=true without 2PC doesn't
> seem
> > very important
> >
> > I agree, it's not a strict requirement.  It is, however, a missing option
> > in the public API, so currently Flink has to use reflection to emulate
> this
> > functionality without 2PC support.   Even when KIP-939 is implemented,
> > there would be situations when 2PC is disabled by the admin (e.g. Kafka
> > service providers may be reluctant to enable 2PC for Flink services that
> > users host themselves), so we either have to perpetuate the
> > reflection-based implementation in Flink or enable keepPreparedTxn=true
> > without 2PC.
> >
> > > 32.
> >
> >
> kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max
> >
> > I just followed the existing metric implementation example
> >
> >
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala#L95
> > ,
> > which maps to
> >
> >
> kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-max.
> >
> > > 33. "If the value is 'true' then the corresponding field is set
> >
> > That's correct.  Updated the KIP.
> >
> > -Artem
> >
> > On Wed, Feb 7, 2024 at 10:06 AM Jun Rao 
> wrote:
> >
> > > Hi, Artem,
> > >
> > > Thanks for the reply.
> > >
> > > 20. So to abort a prepared transaction after producer start, we 

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-02-20 Thread Jun Rao
Hi, Artem,

Thanks for the reply.

20. "Say if an application
currently uses initTransactions() to achieve the current semantics, it
would need to be rewritten to use initTransactions() + abort to achieve the
same semantics if the config is changed. "

This only takes care of the abort case. The application still needs to be
changed to handle the commit case properly
if transaction.two.phase.commit.enable is set to true.
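For reference, the client-side opt-in being discussed boils down to something
like the following producer configuration (a sketch; there is no
ProducerConfig constant for the new setting yet, so the raw name is used):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class TwoPcProducerConfig {
        public static Properties build() {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "orders-2pc-1");
            // Proposed by KIP-939; an application that sets this must also be
            // prepared to commit or abort the kept transaction after restart.
            props.put("transaction.two.phase.commit.enable", "true");
            return props;
        }
    }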

"Even when KIP-939 is implemented,
there would be situations when 2PC is disabled by the admin (e.g. Kafka
service providers may be reluctant to enable 2PC for Flink services that
users host themselves), so we either have to perpetuate the
reflection-based implementation in Flink or enable keepPreparedTxn=true
without 2PC."

Hmm, if the admin disables 2PC, there is likely a reason behind that. I am
not sure that we should provide an API to encourage the application to
circumvent that.

32. Ok. That's the kafka metric. In that case, the metric name has a group
and a name. There is no type and no package name.

Jun


On Thu, Feb 15, 2024 at 8:23 PM Artem Livshits
 wrote:

> Hi Jun,
>
> Thank you for your questions.
>
> > 20. So to abort a prepared transaction after the producer start, we could
> use ...
>
> I agree, initTransaction(true) + abort would accomplish the behavior of
> initTransactions(false), so we could technically have fewer ways to achieve
> the same thing, which is generally valuable.  I wonder, though, if that
> would be intuitive from the application perspective.  Say if an application
> currently uses initTransactions() to achieve the current semantics, it
> would need to be rewritten to use initTransactions() + abort to achieve the
> same semantics if the config is changed.  I think this could create
> subtle confusion, as the config change is generally decoupled from changing
> application implementation.
>
> >  The use case mentioned for keepPreparedTxn=true without 2PC doesn't seem
> very important
>
> I agree, it's not a strict requirement.  It is, however, a missing option
> in the public API, so currently Flink has to use reflection to emulate this
> functionality without 2PC support.   Even when KIP-939 is implemented,
> there would be situations when 2PC is disabled by the admin (e.g. Kafka
> service providers may be reluctant to enable 2PC for Flink services that
> users host themselves), so we either have to perpetuate the
> reflection-based implementation in Flink or enable keepPreparedTxn=true
> without 2PC.
>
> > 32.
>
> kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max
>
> I just followed the existing metric implementation example
>
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala#L95
> ,
> which maps to
>
> kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-max.
>
> > 33. "If the value is 'true' then the corresponding field is set
>
> That's correct.  Updated the KIP.
>
> -Artem
>
> On Wed, Feb 7, 2024 at 10:06 AM Jun Rao  wrote:
>
> > Hi, Artem,
> >
> > Thanks for the reply.
> >
> > 20. So to abort a prepared transaction after producer start, we could use
> > either
> >   producer.initTransactions(false)
> > or
> >   producer.initTransactions(true)
> >   producer.abortTransaction
> > Could we just always use the latter API? If we do this, we could
> > potentially eliminate the keepPreparedTxn flag in initTransactions().
> After
> > the initTransactions() call, the outstanding txn is always preserved if
> 2pc
> > is enabled and aborted if 2pc is disabled. The use case mentioned for
> > keepPreparedTxn=true without 2PC doesn't seem very important. If we could
> > do that, it seems that we have (1) less redundant and simpler APIs; (2)
> > more symmetric syntax for aborting/committing a prepared txn after
> producer
> > restart.
> >
> > 32.
> >
> >
> kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max
> > Is this a Yammer or kafka metric? The former uses the camel case for name
> > and type. The latter uses the hyphen notation, but doesn't have the type
> > attribute.
> >
> > 33. "If the value is 'true' then the corresponding field is set in the
> > InitProducerIdRequest and the KafkaProducer object is set into a state
> > which only allows calling .commitTransaction or .abortTransaction."
> > We should also allow .completeTransaction, right?
> >
> > Jun
> >
> >
> > On Tue, Feb 6, 2024 at 3:29 PM Artem Livshits
> >  wrote:
> >
> > > Hi Jun,
> > >
> > > > 20. For Flink usage, it seems that the APIs used to abort and commit
> a
> > > prepared txn are not symmetric.
> > >
> > > For Flink it is expected that Flink would call .commitTransaction or
> > > .abortTransaction directly, it wouldn't need to deal with
> > PreparedTxnState,
> > > the outcome is actually determined by the Flink's job manager, not by
> > > comparison of PreparedTxnState.  So for Flink, if the Kafka sync
> crashes
> > > 

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-02-19 Thread Rowland Smith
Hi Artem,

I think that we both have the same understanding. An explicit prepare RPC
does not eliminate any conditions, it just reduces the window for possible
undesirable conditions like pending in-doubt transactions. So there is no
right or wrong answer, a prepare RPC will reduce the number of
occurrences of in-doubt transactions, but with a performance cost of an
extra RPC call on every transaction.

The Open Group DTP model and the XA interface requires that resource
managers be able to report prepared transactions only, so a prepare RPC
will be required. I will include it in my KIP for XA interface support, and
will propose an implementation where clients can choose whether they want a
prepare RPC when not using the XA interface. How does that sound?

- Rowland

On Fri, Feb 16, 2024 at 7:15 PM Artem Livshits
 wrote:

> Hi Rowland,
>
> > I am not sure what you mean by guarantee,
>
> A guarantee would be an elimination of complexity or a condition.  E.g. if
> adding an explicit prepare RPC eliminated in-doubt transactions, or
> eliminated a significant complexity in implementation.
>
> > 1. Transactions that haven’t reached “prepared” state can be aborted via
> timeout.
>
> The argument is that it doesn't eliminate any conditions, it merely reduces
> a subset of circumstances for the conditions to happen, but the conditions
> still happen and must be handled.  The operator still needs to set up
> monitoring for run-away transactions, there still needs to be an
> "out-of-band" channel to resolve run-away transactions (i.e. the operation
> would need a way that's not a part of the 2PC protocol to reconcile with
> the application owner), there still needs to be tooling for resolving
> run-away transactions.
>
> On the downside, an explicit prepare RPC would have a performance hit on
> the happy path in every single transaction.
>
> -Artem
>
> On Tue, Feb 6, 2024 at 7:35 PM Rowland Smith  wrote:
>
> > Hi Artem,
> >
> > I am not sure what you mean by guarantee, but I am referring to a better
> > operational experience. You mentioned this as the first benefit of an
> > explicit "prepare" RPC in the KIP.
> >
> >
> > 1. Transactions that haven’t reached “prepared” state can be aborted via
> > timeout.
> >
> > However, in explaining why an explicit "prepare" RPC was not included in
> > the design, you make no further mention of this benefit. So what I am
> > saying is this benefit is quite significant operationally. Many client
> > application failures may occur before the transaction reaches the
> prepared
> > state, and the ability to automatically abort those transactions and
> > unblock affected partitions without administrative intervention or fast
> > restart of the client would be a worthwhile benefit. An explicit
> "prepare"
> > RPC will also be needed by the XA implementation, so I would like to see
> it
> > implemented for that reason. Otherwise, I will need to add this work to
> my
> > KIP.
> >
> > - Rowland
> >
> > On Mon, Feb 5, 2024 at 9:35 PM Artem Livshits
> >  wrote:
> >
> > > Hi Rowland,
> > >
> > > Thank you for your reply.  I think I understand what you're saying and
> > just
> > > tried to provide a quick summary.  The
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC#KIP939:SupportParticipationin2PC-Explicit%E2%80%9Cprepare%E2%80%9DRPC
> > > actually goes into the details on what would be the benefits of adding
> an
> > > explicit prepare RPC and why those won't really add any advantages such
> > as
> > > eliminating the need for monitoring, tooling or providing additional
> > > guarantees.  Let me know if you think of a guarantee that prepare RPC
> > would
> > > provide.
> > >
> > > -Artem
> > >
> > > On Mon, Feb 5, 2024 at 6:22 PM Rowland Smith 
> wrote:
> > >
> > > > Hi Artem,
> > > >
> > > > I don't think that you understand what I am saying. In any
> transaction,
> > > > there is work done before the call to prepareTransaction() and work
> > done
> > > > afterwards. Any work performed before the call to
> prepareTransaction()
> > > can
> > > > be aborted after a relatively short timeout if the client fails. It
> is
> > > only
> > > > after the prepareTransaction() call that a transaction becomes
> in-doubt
> > > and
> > > > must be remembered for a much longer period of time to allow the
> client
> > > to
> > > > recover and make the decision to either commit or abort. A
> considerable
> > > > amount of time might be spent before prepareTransaction() is called,
> > and
> > > if
> > > > the client fails in this period, relatively quick transaction abort
> > would
> > > > unblock any partitions and make the system fully available. So a
> > prepare
> > > > RPC would reduce the window where a client failure results in
> > potentially
> > > > long-lived blocking transactions.
> > > >
> > > > Here is the proposed sequence from the KIP with 2 added steps (4 and
> > 5):
> > > >
> > > >
> > > >1. Begin database transaction
> > > >  

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-02-16 Thread Artem Livshits
Hi Rowland,

> I am not sure what you mean by guarantee,

A guarantee would be an elimination of complexity or a condition.  E.g. if
adding an explicit prepare RPC eliminated in-doubt transactions, or
eliminated a significant complexity in implementation.

> 1. Transactions that haven’t reached “prepared” state can be aborted via
timeout.

The argument is that it doesn't eliminate any conditions, it merely reduces
a subset of circumstances for the conditions to happen, but the conditions
still happen and must be handled.  The operator still needs to set up
monitoring for run-away transactions, there still needs to be an
"out-of-band" channel to resolve run-away transactions (i.e. the operation
would need a way that's not a part of the 2PC protocol to reconcile with
the application owner), there still needs to be tooling for resolving
run-away transactions.

On the downside, an explicit prepare RPC would have a performance hit on
the happy path in every single transaction.

-Artem

On Tue, Feb 6, 2024 at 7:35 PM Rowland Smith  wrote:

> Hi Artem,
>
> I am not sure what you mean by guarantee, but I am referring to a better
> operational experience. You mentioned this as the first benefit of an
> explicit "prepare" RPC in the KIP.
>
>
> 1. Transactions that haven’t reached “prepared” state can be aborted via
> timeout.
>
> However, in explaining why an explicit "prepare" RPC was not included in
> the design, you make no further mention of this benefit. So what I am
> saying is this benefit is quite significant operationally. Many client
> application failures may occur before the transaction reaches the prepared
> state, and the ability to automatically abort those transactions and
> unblock affected partitions without administrative intervention or fast
> restart of the client would be a worthwhile benefit. An explicit "prepare"
> RPC will also be needed by the XA implementation, so I would like to see it
> implemented for that reason. Otherwise, I will need to add this work to my
> KIP.
>
> - Rowland
>
> On Mon, Feb 5, 2024 at 9:35 PM Artem Livshits
>  wrote:
>
> > Hi Rowland,
> >
> > Thank you for your reply.  I think I understand what you're saying and
> just
> > tried to provide a quick summary.  The
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC#KIP939:SupportParticipationin2PC-Explicit%E2%80%9Cprepare%E2%80%9DRPC
> > actually goes into the details on what would be the benefits of adding an
> > explicit prepare RPC and why those won't really add any advantages such
> as
> > eliminating the need for monitoring, tooling or providing additional
> > guarantees.  Let me know if you think of a guarantee that prepare RPC
> would
> > provide.
> >
> > -Artem
> >
> > On Mon, Feb 5, 2024 at 6:22 PM Rowland Smith  wrote:
> >
> > > Hi Artem,
> > >
> > > I don't think that you understand what I am saying. In any transaction,
> > > there is work done before the call to prepareTransaction() and work
> done
> > > afterwards. Any work performed before the call to prepareTransaction()
> > can
> > > be aborted after a relatively short timeout if the client fails. It is
> > only
> > > after the prepareTransaction() call that a transaction becomes in-doubt
> > and
> > > must be remembered for a much longer period of time to allow the client
> > to
> > > recover and make the decision to either commit or abort. A considerable
> > > amount of time might be spent before prepareTransaction() is called,
> and
> > if
> > > the client fails in this period, relatively quick transaction abort
> would
> > > unblock any partitions and make the system fully available. So a
> prepare
> > > RPC would reduce the window where a client failure results in
> potentially
> > > long-lived blocking transactions.
> > >
> > > Here is the proposed sequence from the KIP with 2 added steps (4 and
> 5):
> > >
> > >
> > >1. Begin database transaction
> > >2. Begin Kafka transaction
> > >3. Produce data to Kafka
> > >4. Make updates to the database
> > >5. Repeat steps 3 and 4 as many times as necessary based on
> > application
> > >needs.
> > >6. Prepare Kafka transaction [currently implicit operation,
> expressed
> > as
> > >flush]
> > >7. Write produced data to the database
> > >8. Write offsets of produced data to the database
> > >9. Commit database transaction
> > >10. Commit Kafka transaction
> > >
> > >
> > > If the client application crashes before step 6, it is safe to abort
> the
> > > Kafka transaction after a relatively short timeout.
> > >
> > > I fully agree with a layered approach. However, the XA layer is going
> to
> > > require certain capabilities from the layer below it, and one of those
> > > capabilities is to be able to identify and report prepared transactions
> > > during recovery.
> > >
> > > - Rowland
> > >
> > > On Mon, Feb 5, 2024 at 12:46 AM Artem Livshits
> > >  wrote:
> > >
> > > > Hi Rowland,
> > > >
> > > > Thank you 

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-02-15 Thread Artem Livshits
Hi Jun,

Thank you for your questions.

> 20. So to abort a prepared transaction after the producer start, we could
use ...

I agree, initTransactions(true) + abort would accomplish the behavior of
initTransactions(false), so we could technically have fewer ways to achieve
the same thing, which is generally valuable.  I wonder, though, if that
would be intuitive from the application perspective.  Say if an application
currently uses initTransactions() to achieve the current semantics, it
would need to be rewritten to use initTransactions() + abort to achieve the
same semantics if the config is changed.  I think this could create
subtle confusion, as the config change is generally decoupled from changing
application implementation.
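
To make the concern concrete, here is a minimal sketch of the recovery path,
assuming the current no-argument initTransactions() and the proposed
transaction.two.phase.commit.enable config (illustrative only, not the final
API or behavior):

import org.apache.kafka.clients.producer.KafkaProducer;

public class RecoverySketch {
    // Sketch: how the meaning of the same recovery code would shift if "keep
    // the prepared transaction" were implied by the 2PC config rather than an
    // explicit keepPreparedTxn argument.
    static void recoverAndStart(KafkaProducer<String, String> producer) {
        // Today this call always aborts whatever transaction was left behind:
        producer.initTransactions();

        // If keepPreparedTxn were tied to transaction.two.phase.commit.enable,
        // flipping that config would silently change the call above to preserve
        // the old transaction, and the application would have to be rewritten as
        //   producer.initTransactions();
        //   producer.abortTransaction();
        // to get the old semantics back.
        producer.beginTransaction();
    }
}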

>  The use case mentioned for keepPreparedTxn=true without 2PC doesn't seem
very important

I agree, it's not a strict requirement.  It is, however, a missing option
in the public API, so currently Flink has to use reflection to emulate this
functionality without 2PC support.   Even when KIP-939 is implemented,
there would be situations when 2PC is disabled by the admin (e.g. Kafka
service providers may be reluctant to enable 2PC for Flink services that
users host themselves), so we either have to perpetuate the
reflection-based implementation in Flink or enable keepPreparedTxn=true
without 2PC.

> 32.
kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max

I just followed the existing metric implementation example
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala#L95,
which maps to
kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-max.
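
For reference, a rough sketch of how such a metric could be registered with
the Kafka metrics library, by analogy with the partition-load-time-max example
linked above (the class and sensor names here are made up for illustration and
are not the actual implementation):

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Max;

public class ActiveTransactionOpenTimeMetricSketch {
    // Registers a max-style metric in the "transaction-coordinator-metrics"
    // group.  On the broker the Metrics instance is reported to JMX with the
    // "kafka.server" prefix, which is how the name below would surface as
    // kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max
    static Sensor register(Metrics metrics) {
        Sensor sensor = metrics.sensor("ActiveTransactionOpenTime");
        sensor.add(
            metrics.metricName(
                "active-transaction-open-time-max",
                "transaction-coordinator-metrics",
                "Max time a currently open transaction has been active"),
            new Max());
        return sensor;
    }
}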

> 33. "If the value is 'true' then the corresponding field is set

That's correct.  Updated the KIP.

-Artem

On Wed, Feb 7, 2024 at 10:06 AM Jun Rao  wrote:

> Hi, Artem,
>
> Thanks for the reply.
>
> 20. So to abort a prepared transaction after producer start, we could use
> either
>   producer.initTransactions(false)
> or
>   producer.initTransactions(true)
>   producer.abortTransaction
> Could we just always use the latter API? If we do this, we could
> potentially eliminate the keepPreparedTxn flag in initTransactions(). After
> the initTransactions() call, the outstanding txn is always preserved if 2pc
> is enabled and aborted if 2pc is disabled. The use case mentioned for
> keepPreparedTxn=true without 2PC doesn't seem very important. If we could
> do that, it seems that we have (1) less redundant and simpler APIs; (2)
> more symmetric syntax for aborting/committing a prepared txn after producer
> restart.
>
> 32.
>
> kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max
> Is this a Yammer or kafka metric? The former uses the camel case for name
> and type. The latter uses the hyphen notation, but doesn't have the type
> attribute.
>
> 33. "If the value is 'true' then the corresponding field is set in the
> InitProducerIdRequest and the KafkaProducer object is set into a state
> which only allows calling .commitTransaction or .abortTransaction."
> We should also allow .completeTransaction, right?
>
> Jun
>
>
> On Tue, Feb 6, 2024 at 3:29 PM Artem Livshits
>  wrote:
>
> > Hi Jun,
> >
> > > 20. For Flink usage, it seems that the APIs used to abort and commit a
> > prepared txn are not symmetric.
> >
> > For Flink it is expected that Flink would call .commitTransaction or
> > .abortTransaction directly, it wouldn't need to deal with
> PreparedTxnState,
> > the outcome is actually determined by Flink's job manager, not by
> > comparison of PreparedTxnState.  So for Flink, if the Kafka sink crashes
> > and restarts there are 2 cases:
> >
> > 1. Transaction is not prepared.  In that case just call
> > producer.initTransactions(false) and then can start transactions as
> needed.
> > 2. Transaction is prepared.  In that case call
> > producer.initTransactions(true) and wait for the decision from the job
> > manager.  Note that it's not given that the transaction will get
> committed,
> > the decision could also be an abort.
> >
> >  > 21. transaction.max.timeout.ms could in theory be MAX_INT. Perhaps we
> > could use a negative timeout in the record to indicate 2PC?
> >
> > -1 sounds good, updated.
> >
> > > 30. The KIP has two different APIs to abort an ongoing txn. Do we need
> > both?
> >
> > I think of producer.initTransactions() to be an implementation for
> > adminClient.forceTerminateTransaction(transactionalId).
> >
> > > 31. "This would flush all the pending messages and transition the
> > producer
> >
> > Updated the KIP to clarify that IllegalStateException will be thrown.
> >
> > -Artem
> >
> >
> > On Mon, Feb 5, 2024 at 2:22 PM Jun Rao  wrote:
> >
> > > Hi, Artem,
> > >
> > > Thanks for the reply.
> > >
> > > 20. For Flink usage, it seems that the APIs used to abort and commit a
> > > prepared txn are not symmetric.
> > > To abort, the app will just call
> > >   

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-02-07 Thread Jun Rao
Hi, Artem,

Thanks for the reply.

20. So to abort a prepared transaction after producer start, we could use
either
  producer.initTransactions(false)
or
  producer.initTransactions(true)
  producer.abortTransaction
Could we just always use the latter API? If we do this, we could
potentially eliminate the keepPreparedTxn flag in initTransactions(). After
the initTransactions() call, the outstanding txn is always preserved if 2pc
is enabled and aborted if 2pc is disabled. The use case mentioned for
keepPreparedTxn=true without 2PC doesn't seem very important. If we could
do that, it seems that we have (1) less redundant and simpler APIs; (2)
more symmetric syntax for aborting/committing a prepared txn after producer
restart.

32.
kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max
Is this a Yammer or kafka metric? The former uses the camel case for name
and type. The latter uses the hyphen notation, but doesn't have the type
attribute.

33. "If the value is 'true' then the corresponding field is set in the
InitProducerIdRequest and the KafkaProducer object is set into a state
which only allows calling .commitTransaction or .abortTransaction."
We should also allow .completeTransaction, right?

Jun


On Tue, Feb 6, 2024 at 3:29 PM Artem Livshits
 wrote:

> Hi Jun,
>
> > 20. For Flink usage, it seems that the APIs used to abort and commit a
> prepared txn are not symmetric.
>
> For Flink it is expected that Flink would call .commitTransaction or
> .abortTransaction directly, it wouldn't need to deal with PreparedTxnState,
> the outcome is actually determined by Flink's job manager, not by
> comparison of PreparedTxnState.  So for Flink, if the Kafka sink crashes
> and restarts there are 2 cases:
>
> 1. Transaction is not prepared.  In that case just call
> producer.initTransactions(false) and then can start transactions as needed.
> 2. Transaction is prepared.  In that case call
> producer.initTransactions(true) and wait for the decision from the job
> manager.  Note that it's not given that the transaction will get committed,
> the decision could also be an abort.
>
>  > 21. transaction.max.timeout.ms could in theory be MAX_INT. Perhaps we
> could use a negative timeout in the record to indicate 2PC?
>
> -1 sounds good, updated.
>
> > 30. The KIP has two different APIs to abort an ongoing txn. Do we need
> both?
>
> I think of producer.initTransactions() to be an implementation for
> adminClient.forceTerminateTransaction(transactionalId).
>
> > 31. "This would flush all the pending messages and transition the
> producer
>
> Updated the KIP to clarify that IllegalStateException will be thrown.
>
> -Artem
>
>
> On Mon, Feb 5, 2024 at 2:22 PM Jun Rao  wrote:
>
> > Hi, Artem,
> >
> > Thanks for the reply.
> >
> > 20. For Flink usage, it seems that the APIs used to abort and commit a
> > prepared txn are not symmetric.
> > To abort, the app will just call
> >   producer.initTransactions(false)
> >
> > To commit, the app needs to call
> >   producer.initTransactions(true)
> >   producer.completeTransaction(preparedTxnState)
> >
> > Will this be a concern? For the dual-writer usage, both abort/commit use
> > the same API.
> >
> > 21. transaction.max.timeout.ms could in theory be MAX_INT. Perhaps we
> > could
> > use a negative timeout in the record to indicate 2PC?
> >
> > 30. The KIP has two different APIs to abort an ongoing txn. Do we need
> > both?
> >   producer.initTransactions(false)
> >   adminClient.forceTerminateTransaction(transactionalId)
> >
> > 31. "This would flush all the pending messages and transition the
> producer
> > into a mode where only .commitTransaction, .abortTransaction, or
> > .completeTransaction could be called.  If the call is successful (all
> > messages successfully got flushed to all partitions) the transaction is
> > prepared."
> >  If the producer calls send() in that state, what exception will the
> caller
> > receive?
> >
> > Jun
> >
> >
> > On Fri, Feb 2, 2024 at 3:34 PM Artem Livshits
> >  wrote:
> >
> > > Hi Jun,
> > >
> > > >  Then, should we change the following in the example to use
> > > InitProducerId(true) instead?
> > >
> > > We could. I just thought that it's good to make the example
> > self-contained
> > > by starting from a clean state.
> > >
> > > > Also, could Flink just follow the dual-write recipe?
> > >
> > > I think it would bring some unnecessary logic to Flink (or any other
> > system
> > > that already has a transaction coordinator and just wants to drive
> Kafka
> > to
> > > the desired state).  We could discuss it with Flink folks, the current
> > > proposal was developed in collaboration with them.
> > >
> > > > 21. Could a non 2pc user explicitly set the TransactionTimeoutMs to
> > > Integer.MAX_VALUE?
> > >
> > > The server would reject this for regular transactions, it only accepts
> > > values that are <= *transaction.max.timeout.ms* (a broker config).
> > >
> > > > 

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-02-06 Thread Rowland Smith
Hi Artem,

I am not sure what you mean by guarantee, but I am referring to a better
operational experience. You mentioned this as the first benefit of an
explicit "prepare" RPC in the KIP.


1. Transactions that haven’t reached “prepared” state can be aborted via
timeout.

However, in explaining why an explicit "prepare" RPC was not included in
the design, you make no further mention of this benefit. So what I am
saying is this benefit is quite significant operationally. Many client
application failures may occur before the transaction reaches the prepared
state, and the ability to automatically abort those transactions and
unblock affected partitions without administrative intervention or fast
restart of the client would be a worthwhile benefit. An explicit "prepare"
RPC will also be needed by the XA implementation, so I would like to see it
implemented for that reason. Otherwise, I will need to add this work to my
KIP.

- Rowland

On Mon, Feb 5, 2024 at 9:35 PM Artem Livshits
 wrote:

> Hi Rowland,
>
> Thank you for your reply.  I think I understand what you're saying and just
> tried to provide a quick summary.  The
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC#KIP939:SupportParticipationin2PC-Explicit%E2%80%9Cprepare%E2%80%9DRPC
> actually goes into the details on what would be the benefits of adding an
> explicit prepare RPC and why those won't really add any advantages such as
> eliminating the need for monitoring, tooling or providing additional
> guarantees.  Let me know if you think of a guarantee that prepare RPC would
> provide.
>
> -Artem
>
> On Mon, Feb 5, 2024 at 6:22 PM Rowland Smith  wrote:
>
> > Hi Artem,
> >
> > I don't think that you understand what I am saying. In any transaction,
> > there is work done before the call to prepareTransaction() and work done
> > afterwards. Any work performed before the call to prepareTransaction()
> can
> > be aborted after a relatively short timeout if the client fails. It is
> only
> > after the prepareTransaction() call that a transaction becomes in-doubt
> and
> > must be remembered for a much longer period of time to allow the client
> to
> > recover and make the decision to either commit or abort. A considerable
> > amount of time might be spent before prepareTransaction() is called, and
> if
> > the client fails in this period, relatively quick transaction abort would
> > unblock any partitions and make the system fully available. So a prepare
> > RPC would reduce the window where a client failure results in potentially
> > long-lived blocking transactions.
> >
> > Here is the proposed sequence from the KIP with 2 added steps (4 and 5):
> >
> >
> >1. Begin database transaction
> >2. Begin Kafka transaction
> >3. Produce data to Kafka
> >4. Make updates to the database
> >5. Repeat steps 3 and 4 as many times as necessary based on
> application
> >needs.
> >6. Prepare Kafka transaction [currently implicit operation, expressed
> as
> >flush]
> >7. Write produced data to the database
> >8. Write offsets of produced data to the database
> >9. Commit database transaction
> >10. Commit Kafka transaction
> >
> >
> > If the client application crashes before step 6, it is safe to abort the
> > Kafka transaction after a relatively short timeout.
> >
> > I fully agree with a layered approach. However, the XA layer is going to
> > require certain capabilities from the layer below it, and one of those
> > capabilities is to be able to identify and report prepared transactions
> > during recovery.
> >
> > - Rowland
> >
> > On Mon, Feb 5, 2024 at 12:46 AM Artem Livshits
> >  wrote:
> >
> > > Hi Rowland,
> > >
> > > Thank you for your feedback.  Using an explicit prepare RPC was
> discussed
> > > and is listed in the rejected alternatives:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC#KIP939:SupportParticipationin2PC-Explicit%E2%80%9Cprepare%E2%80%9DRPC
> > > .
> > > Basically, even if we had an explicit prepare RPC, it doesn't avoid the
> > > fact that a crashed client could cause a blocking transaction.  This,
> > > btw, is not just a specific property of this concrete proposal, it's a
> > > fundamental trade off of any form of 2PC -- any 2PC implementation must
> > > allow for infinitely "in-doubt" transactions that may not be
> unilaterally
> > > automatically resolved within one participant.
> > >
> > > To mitigate the issue, using 2PC requires a special permission, so that
> > the
> > > Kafka admin could control that only applications that follow proper
> > > standards in terms of availability (i.e. will automatically restart and
> > > cleanup after a crash) would be allowed to utilize 2PC.  It is also
> > assumed
> > > that any practical deployment utilizing 2PC would have monitoring set
> up,
> > > so that an operator could be alerted to investigate and manually
> resolve
> > > in-doubt 

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-02-06 Thread Artem Livshits
Hi Jun,

> 20. For Flink usage, it seems that the APIs used to abort and commit a
prepared txn are not symmetric.

For Flink it is expected that Flink would call .commitTransaction or
.abortTransaction directly, it wouldn't need to deal with PreparedTxnState,
the outcome is actually determined by Flink's job manager, not by
comparison of PreparedTxnState.  So for Flink, if the Kafka sink crashes
and restarts there are 2 cases:

1. Transaction is not prepared.  In that case just call
producer.initTransactions(false) and then can start transactions as needed.
2. Transaction is prepared.  In that case call
producer.initTransactions(true) and wait for the decision from the job
manager.  Note that it's not given that the transaction will get committed,
the decision could also be an abort.
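
A minimal sketch of that recovery flow, assuming the proposed
initTransactions(boolean keepPreparedTxn) overload (the job manager's decision
is reduced to a boolean here purely for illustration):

import org.apache.kafka.clients.producer.KafkaProducer;

public class FlinkStyleRecoverySketch {
    static void recover(KafkaProducer<byte[], byte[]> producer,
                        boolean txnWasPrepared,
                        boolean coordinatorSaysCommit) {
        if (!txnWasPrepared) {
            // Case 1: nothing was prepared -- drop any leftover transaction and
            // start new transactions as usual.
            producer.initTransactions(false);
            return;
        }
        // Case 2: a prepared transaction exists -- keep it and complete it
        // according to the external coordinator's decision, which may be
        // either commit or abort.
        producer.initTransactions(true);
        if (coordinatorSaysCommit) {
            producer.commitTransaction();
        } else {
            producer.abortTransaction();
        }
    }
}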

 > 21. transaction.max.timeout.ms could in theory be MAX_INT. Perhaps we
could use a negative timeout in the record to indicate 2PC?

-1 sounds good, updated.

> 30. The KIP has two different APIs to abort an ongoing txn. Do we need
both?

I think of producer.initTransactions() as the implementation for
adminClient.forceTerminateTransaction(transactionalId).

> 31. "This would flush all the pending messages and transition the producer

Updated the KIP to clarify that IllegalStateException will be thrown.
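
For illustration, a small sketch of that behavior using the proposed
prepareTransaction() method (the record parameters are placeholders, and the
method names follow the KIP-939 proposal, not a released API):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PreparedStateSketch {
    static void prepareThenTrySend(KafkaProducer<String, String> producer,
                                   ProducerRecord<String, String> r1,
                                   ProducerRecord<String, String> r2) {
        producer.beginTransaction();
        producer.send(r1);
        producer.prepareTransaction();   // proposed: flush and move to "prepared"
        try {
            producer.send(r2);           // not allowed once the txn is prepared
        } catch (IllegalStateException e) {
            // expected: only commitTransaction, abortTransaction or
            // completeTransaction may be called after prepareTransaction()
        }
        producer.commitTransaction();
    }
}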

-Artem


On Mon, Feb 5, 2024 at 2:22 PM Jun Rao  wrote:

> Hi, Artem,
>
> Thanks for the reply.
>
> 20. For Flink usage, it seems that the APIs used to abort and commit a
> prepared txn are not symmetric.
> To abort, the app will just call
>   producer.initTransactions(false)
>
> To commit, the app needs to call
>   producer.initTransactions(true)
>   producer.completeTransaction(preparedTxnState)
>
> Will this be a concern? For the dual-writer usage, both abort/commit use
> the same API.
>
> 21. transaction.max.timeout.ms could in theory be MAX_INT. Perhaps we
> could
> use a negative timeout in the record to indicate 2PC?
>
> 30. The KIP has two different APIs to abort an ongoing txn. Do we need
> both?
>   producer.initTransactions(false)
>   adminClient.forceTerminateTransaction(transactionalId)
>
> 31. "This would flush all the pending messages and transition the producer
> into a mode where only .commitTransaction, .abortTransaction, or
> .completeTransaction could be called.  If the call is successful (all
> messages successfully got flushed to all partitions) the transaction is
> prepared."
>  If the producer calls send() in that state, what exception will the caller
> receive?
>
> Jun
>
>
> On Fri, Feb 2, 2024 at 3:34 PM Artem Livshits
>  wrote:
>
> > Hi Jun,
> >
> > >  Then, should we change the following in the example to use
> > InitProducerId(true) instead?
> >
> > We could. I just thought that it's good to make the example
> self-contained
> > by starting from a clean state.
> >
> > > Also, could Flink just follow the dual-write recipe?
> >
> > I think it would bring some unnecessary logic to Flink (or any other
> system
> > that already has a transaction coordinator and just wants to drive Kafka
> to
> > the desired state).  We could discuss it with Flink folks, the current
> > proposal was developed in collaboration with them.
> >
> > > 21. Could a non 2pc user explicitly set the TransactionTimeoutMs to
> > Integer.MAX_VALUE?
> >
> > The server would reject this for regular transactions, it only accepts
> > values that are <= *transaction.max.timeout.ms* (a broker config).
> >
> > > 24. Hmm, In KIP-890, without 2pc, the coordinator expects the endTxn
> > request to use the ongoing pid. ...
> >
> > Without 2PC there is no case where the pid could change between starting
> a
> > transaction and endTxn (InitProducerId would abort any ongoing
> > transaction).  With 2PC there is now a case where there could be
> > InitProducerId that can change the pid without aborting the transaction,
> so
> > we need to handle that.  I wouldn't say that the flow is different, but
> > it's rather extended to handle new cases.  The main principle is still
> the
> > same -- for all operations we use the latest "operational" pid and epoch
> > known to the client, this way we guarantee that we can fence zombie /
> split
> > brain clients by disrupting the "latest known" pid + epoch progression.
> >
> > > 25. "We send out markers using the original ongoing transaction
> > ProducerId and ProducerEpoch" ...
> >
> > Updated.
> >
> > -Artem
> >
> > On Mon, Jan 29, 2024 at 4:57 PM Jun Rao 
> wrote:
> >
> > > Hi, Artem,
> > >
> > > Thanks for the reply.
> > >
> > > 20. So for the dual-write recipe, we should always call
> > > InitProducerId(keepPreparedTxn=true) from the producer? Then, should we
> > > change the following in the example to use InitProducerId(true)
> instead?
> > > 1. InitProducerId(false); TC STATE: Empty, ProducerId=42,
> > > ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=-1,
> > > NextProducerEpoch=-1; RESPONSE ProducerId=42, Epoch=MAX-1,
> > > 

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-02-05 Thread Artem Livshits
Hi Rowland,

Thank you for your reply.  I think I understand what you're saying and just
tried to provide a quick summary.  The
https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC#KIP939:SupportParticipationin2PC-Explicit%E2%80%9Cprepare%E2%80%9DRPC
actually goes into the details on what would be the benefits of adding an
explicit prepare RPC and why those won't really add any advantages such as
eliminating the need for monitoring, tooling or providing additional
guarantees.  Let me know if you think of a guarantee that prepare RPC would
provide.

-Artem

On Mon, Feb 5, 2024 at 6:22 PM Rowland Smith  wrote:

> Hi Artem,
>
> I don't think that you understand what I am saying. In any transaction,
> there is work done before the call to prepareTransaction() and work done
> afterwards. Any work performed before the call to prepareTransaction() can
> be aborted after a relatively short timeout if the client fails. It is only
> after the prepareTransaction() call that a transaction becomes in-doubt and
> must be remembered for a much longer period of time to allow the client to
> recover and make the decision to either commit or abort. A considerable
> amount of time might be spent before prepareTransaction() is called, and if
> the client fails in this period, relatively quick transaction abort would
> unblock any partitions and make the system fully available. So a prepare
> RPC would reduce the window where a client failure results in potentially
> long-lived blocking transactions.
>
> Here is the proposed sequence from the KIP with 2 added steps (4 and 5):
>
>
>1. Begin database transaction
>2. Begin Kafka transaction
>3. Produce data to Kafka
>4. Make updates to the database
>5. Repeat steps 3 and 4 as many times as necessary based on application
>needs.
>6. Prepare Kafka transaction [currently implicit operation, expressed as
>flush]
>7. Write produced data to the database
>8. Write offsets of produced data to the database
>9. Commit database transaction
>10. Commit Kafka transaction
>
>
> If the client application crashes before step 6, it is safe to abort the
> Kafka transaction after a relatively short timeout.
>
> I fully agree with a layered approach. However, the XA layer is going to
> require certain capabilities from the layer below it, and one of those
> capabilities is to be able to identify and report prepared transactions
> during recovery.
>
> - Rowland
>
> On Mon, Feb 5, 2024 at 12:46 AM Artem Livshits
>  wrote:
>
> > Hi Rowland,
> >
> > Thank you for your feedback.  Using an explicit prepare RPC was discussed
> > and is listed in the rejected alternatives:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC#KIP939:SupportParticipationin2PC-Explicit%E2%80%9Cprepare%E2%80%9DRPC
> > .
> > Basically, even if we had an explicit prepare RPC, it doesn't avoid the
> > fact that a crashed client could cause a blocking transaction.  This,
> > btw, is not just a specific property of this concrete proposal, it's a
> > fundamental trade off of any form of 2PC -- any 2PC implementation must
> > allow for infinitely "in-doubt" transactions that may not be unilaterally
> > automatically resolved within one participant.
> >
> > To mitigate the issue, using 2PC requires a special permission, so that
> the
> > Kafka admin could control that only applications that follow proper
> > standards in terms of availability (i.e. will automatically restart and
> > cleanup after a crash) would be allowed to utilize 2PC.  It is also
> assumed
> > that any practical deployment utilizing 2PC would have monitoring set up,
> > so that an operator could be alerted to investigate and manually resolve
> > in-doubt transactions (the metric and tooling support for doing so are
> also
> > described in the KIP).
> >
> > For XA support, I wonder if we could take a layered approach and store XA
> > information in a separate store, say in a compacted topic.  This way, the
> > core Kafka protocol could be decoupled from specific implementations (and
> > extra performance requirements that a specific implementation may impose)
> > and serve as a foundation for multiple implementations.
> >
> > -Artem
> >
> > On Sun, Feb 4, 2024 at 1:37 PM Rowland Smith  wrote:
> >
> > > Hi Artem,
> > >
> > > It has been a while, but I have gotten back to this. I understand that
> > when
> > > 2PC is used, the transaction timeout will be effectively infinite. I
> > don't
> > > think that this behavior is desirable. A long running transaction can
> be
> > > extremely disruptive since it blocks consumers on any partitions
> written
> > to
> > > within the pending transaction. The primary reason for a long running
> > > transaction is a failure of the client, or the network connecting the
> > > client to the broker. If such a failure occurs before the client calls
> > > the new prepareTransaction() method, it should be 

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-02-05 Thread Rowland Smith
Hi Artem,

I don't think that you understand what I am saying. In any transaction,
there is work done before the call to prepareTransaction() and work done
afterwards. Any work performed before the call to prepareTransaction() can
be aborted after a relatively short timeout if the client fails. It is only
after the prepareTransaction() call that a transaction becomes in-doubt and
must be remembered for a much longer period of time to allow the client to
recover and make the decision to either commit or abort. A considerable
amount of time might be spent before prepareTransaction() is called, and if
the client fails in this period, relatively quick transaction abort would
unblock any partitions and make the system fully available. So a prepare
RPC would reduce the window where a client failure results in potentially
long-lived blocking transactions.

Here is the proposed sequence from the KIP with 2 added steps (4 and 5):


   1. Begin database transaction
   2. Begin Kafka transaction
   3. Produce data to Kafka
   4. Make updates to the database
   5. Repeat steps 3 and 4 as many times as necessary based on application
   needs.
   6. Prepare Kafka transaction [currently implicit operation, expressed as
   flush]
   7. Write produced data to the database
   8. Write offsets of produced data to the database
   9. Commit database transaction
   10. Commit Kafka transaction


If the client application crashes before step 6, it is safe to abort the
Kafka transaction after a relatively short timeout.
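
For what it's worth, a compressed sketch of the sequence above, assuming a
JDBC database (table and column names are invented, and the prepare step is
the implicit flush the KIP currently describes):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DualWriteSketch {
    static void dualWrite(Connection db,
                          KafkaProducer<String, String> producer,
                          ProducerRecord<String, String> record) throws SQLException {
        db.setAutoCommit(false);                      // 1. begin database transaction
        producer.beginTransaction();                  // 2. begin Kafka transaction
        producer.send(record);                        // 3. produce data to Kafka
        try (PreparedStatement st = db.prepareStatement(
                "UPDATE app_state SET value = ? WHERE id = ?")) {
            st.setString(1, record.value());          // 4. make updates to the database
            st.setString(2, record.key());
            st.executeUpdate();
        }
        // 5. steps 3 and 4 may repeat as needed
        producer.flush();                             // 6. prepare (implicit, as flush)
        // 7.-8. write produced data and offsets to the database as the recipe requires
        db.commit();                                  // 9. commit database transaction
        producer.commitTransaction();                 // 10. commit Kafka transaction
    }
}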

I fully agree with a layered approach. However, the XA layer is going to
require certain capabilities from the layer below it, and one of those
capabilities is to be able to identify and report prepared transactions
during recovery.

- Rowland

On Mon, Feb 5, 2024 at 12:46 AM Artem Livshits
 wrote:

> Hi Rowland,
>
> Thank you for your feedback.  Using an explicit prepare RPC was discussed
> and is listed in the rejected alternatives:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC#KIP939:SupportParticipationin2PC-Explicit%E2%80%9Cprepare%E2%80%9DRPC
> .
> Basically, even if we had an explicit prepare RPC, it doesn't avoid the
> fact that a crashed client could cause a blocking transaction.  This,
> btw, is not just a specific property of this concrete proposal, it's a
> fundamental trade off of any form of 2PC -- any 2PC implementation must
> allow for infinitely "in-doubt" transactions that may not be unilaterally
> automatically resolved within one participant.
>
> To mitigate the issue, using 2PC requires a special permission, so that the
> Kafka admin could control that only applications that follow proper
> standards in terms of availability (i.e. will automatically restart and
> cleanup after a crash) would be allowed to utilize 2PC.  It is also assumed
> that any practical deployment utilizing 2PC would have monitoring set up,
> so that an operator could be alerted to investigate and manually resolve
> in-doubt transactions (the metric and tooling support for doing so are also
> described in the KIP).
>
> For XA support, I wonder if we could take a layered approach and store XA
> information in a separate store, say in a compacted topic.  This way, the
> core Kafka protocol could be decoupled from specific implementations (and
> extra performance requirements that a specific implementation may impose)
> and serve as a foundation for multiple implementations.
>
> -Artem
>
> On Sun, Feb 4, 2024 at 1:37 PM Rowland Smith  wrote:
>
> > Hi Artem,
> >
> > It has been a while, but I have gotten back to this. I understand that
> when
> > 2PC is used, the transaction timeout will be effectively infinite. I
> don't
> > think that this behavior is desirable. A long running transaction can be
> > extremely disruptive since it blocks consumers on any partitions written
> to
> > within the pending transaction. The primary reason for a long running
> > transaction is a failure of the client, or the network connecting the
> > client to the broker. If such a failure occurs before the client calls
> > the new prepareTransaction() method, it should be OK to abort the
> > transaction after a relatively short timeout period. This approach would
> > minimize the inconvenience and disruption of a long running transaction
> > blocking consumers, and provide higher availability for a system using
> > Kafka.
> >
> > In order to achieve this behavior, I think we would need a 'prepare' RPC
> > call so that the server knows that a transaction has been prepared, and
> > does not timeout and abort such transactions. There will be some cost to
> > this extra RPC call, but there will also be a benefit of better system
> > availability in case of failures.
> >
> > There is another reason why I would prefer this implementation. I am
> > working on an XA KIP, and XA requires that Kafka brokers be able to
> provide
> > a list of prepared transactions during recovery.  The broker can only
> 

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-02-05 Thread Jun Rao
Hi, Artem,

Thanks for the reply.

20. For Flink usage, it seems that the APIs used to abort and commit a
prepared txn are not symmetric.
To abort, the app will just call
  producer.initTransactions(false)

To commit, the app needs to call
  producer.initTransactions(true)
  producer.completeTransaction(preparedTxnState)

Will this be a concern? For the dual-writer usage, both abort/commit use
the same API.
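
To spell the asymmetry out, a minimal sketch using the method names proposed
in the KIP -- initTransactions(boolean) and completeTransaction(PreparedTxnState);
the import for PreparedTxnState is omitted because the class itself is part of
the KIP-939 proposal:

import org.apache.kafka.clients.producer.KafkaProducer;

public class RecoveryPathsSketch {
    // "wantCommit" stands for the application-level decision being discussed;
    // PreparedTxnState is the class proposed in KIP-939.
    static void recover(KafkaProducer<String, String> producer,
                        boolean wantCommit,
                        PreparedTxnState stateFromDb) {
        if (!wantCommit) {
            // Abort path: a single call.
            producer.initTransactions(false);
        } else {
            // Commit path: two calls; completeTransaction compares the stored
            // state with the prepared transaction and finishes it accordingly.
            producer.initTransactions(true);
            producer.completeTransaction(stateFromDb);
        }
    }
}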

21. transaction.max.timeout.ms could in theory be MAX_INT. Perhaps we could
use a negative timeout in the record to indicate 2PC?

30. The KIP has two different APIs to abort an ongoing txn. Do we need both?
  producer.initTransactions(false)
  adminClient.forceTerminateTransaction(transactionalId)

31. "This would flush all the pending messages and transition the producer
into a mode where only .commitTransaction, .abortTransaction, or
.completeTransaction could be called.  If the call is successful (all
messages successfully got flushed to all partitions) the transaction is
prepared."
 If the producer calls send() in that state, what exception will the caller
receive?

Jun


On Fri, Feb 2, 2024 at 3:34 PM Artem Livshits
 wrote:

> Hi Jun,
>
> >  Then, should we change the following in the example to use
> InitProducerId(true) instead?
>
> We could. I just thought that it's good to make the example self-contained
> by starting from a clean state.
>
> > Also, could Flink just follow the dual-write recipe?
>
> I think it would bring some unnecessary logic to Flink (or any other system
> that already has a transaction coordinator and just wants to drive Kafka to
> the desired state).  We could discuss it with Flink folks, the current
> proposal was developed in collaboration with them.
>
> > 21. Could a non 2pc user explicitly set the TransactionTimeoutMs to
> Integer.MAX_VALUE?
>
> The server would reject this for regular transactions, it only accepts
> values that are <= *transaction.max.timeout.ms* (a broker config).
>
> > 24. Hmm, In KIP-890, without 2pc, the coordinator expects the endTxn
> request to use the ongoing pid. ...
>
> Without 2PC there is no case where the pid could change between starting a
> transaction and endTxn (InitProducerId would abort any ongoing
> transaction).  WIth 2PC there is now a case where there could be
> InitProducerId that can change the pid without aborting the transaction, so
> we need to handle that.  I wouldn't say that the flow is different, but
> it's rather extended to handle new cases.  The main principle is still the
> same -- for all operations we use the latest "operational" pid and epoch
> known to the client, this way we guarantee that we can fence zombie / split
> brain clients by disrupting the "latest known" pid + epoch progression.
>
> > 25. "We send out markers using the original ongoing transaction
> ProducerId and ProducerEpoch" ...
>
> Updated.
>
> -Artem
>
> On Mon, Jan 29, 2024 at 4:57 PM Jun Rao  wrote:
>
> > Hi, Artem,
> >
> > Thanks for the reply.
> >
> > 20. So for the dual-write recipe, we should always call
> > InitProducerId(keepPreparedTxn=true) from the producer? Then, should we
> > change the following in the example to use InitProducerId(true) instead?
> > 1. InitProducerId(false); TC STATE: Empty, ProducerId=42,
> > ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=-1,
> > NextProducerEpoch=-1; RESPONSE ProducerId=42, Epoch=MAX-1,
> > OngoingTxnProducerId=-1, OngoingTxnEpoch=-1.
> > Also, could Flink just follow the dual-write recipe? It's simpler if
> there
> > is one way to solve the 2pc issue.
> >
> > 21. Could a non 2pc user explicitly set the TransactionTimeoutMs to
> > Integer.MAX_VALUE?
> >
> > 24. Hmm, In KIP-890, without 2pc, the coordinator expects the endTxn
> > request to use the ongoing pid. With 2pc, the coordinator now expects the
> > endTxn request to use the next pid. So, the flow is different, right?
> >
> > 25. "We send out markers using the original ongoing transaction
> ProducerId
> > and ProducerEpoch"
> > We should use ProducerEpoch + 1 in the marker, right?
> >
> > Jun
> >
> > On Fri, Jan 26, 2024 at 8:35 PM Artem Livshits
> >  wrote:
> >
> > > Hi Jun,
> > >
> > > > 20.  I am a bit confused by how we set keepPreparedTxn.  ...
> > >
> > > keepPreparedTxn=true informs the transaction coordinator that it should
> > > keep the ongoing transaction, if any.  If the keepPreparedTxn=false,
> then
> > > any ongoing transaction is aborted (this is exactly the current
> > behavior).
> > > enable2Pc is a separate argument that is controlled by the
> > > *transaction.two.phase.commit.enable *setting on the client.
> > >
> > > To start 2PC, the client just needs to set
> > > *transaction.two.phase.commit.enable*=true in the config.  Then if the
> > > client knows the status of the transaction upfront (in the case of
> Flink,
> > > Flink keeps the knowledge if the transaction is prepared in its own
> > store,
> > > so it always knows upfront), it can set keepPreparedTxn 

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-02-04 Thread Artem Livshits
Hi Rowland,

Thank you for your feedback.  Using an explicit prepare RPC was discussed
and is listed in the rejected alternatives:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC#KIP939:SupportParticipationin2PC-Explicit%E2%80%9Cprepare%E2%80%9DRPC.
Basically, even if we had an explicit prepare RPC, it doesn't avoid the
fact that a crashed client could cause a blocking transaction.  This,
btw, is not just a specific property of this concrete proposal, it's a
fundamental trade off of any form of 2PC -- any 2PC implementation must
allow for infinitely "in-doubt" transactions that may not be unilaterally
automatically resolved within one participant.

To mitigate the issue, using 2PC requires a special permission, so that the
Kafka admin could control that only applications that follow proper
standards in terms of availability (i.e. will automatically restart and
cleanup after a crash) would be allowed to utilize 2PC.  It is also assumed
that any practical deployment utilizing 2PC would have monitoring set up,
so that an operator could be alerted to investigate and manually resolve
in-doubt transactions (the metric and tooling support for doing so are also
described in the KIP).

For XA support, I wonder if we could take a layered approach and store XA
information in a separate store, say in a compacted topic.  This way, the
core Kafka protocol could be decoupled from specific implementations (and
extra performance requirements that a specific implementation may impose)
and serve as a foundation for multiple implementations.

-Artem

On Sun, Feb 4, 2024 at 1:37 PM Rowland Smith  wrote:

> Hi Artem,
>
> It has been a while, but I have gotten back to this. I understand that when
> 2PC is used, the transaction timeout will be effectively infinite. I don't
> think that this behavior is desirable. A long running transaction can be
> extremely disruptive since it blocks consumers on any partitions written to
> within the pending transaction. The primary reason for a long running
> transaction is a failure of the client, or the network connecting the
> client to the broker. If such a failure occurs before the client calls
> the new prepareTransaction() method, it should be OK to abort the
> transaction after a relatively short timeout period. This approach would
> minimize the inconvenience and disruption of a long running transaction
> blocking consumers, and provide higher availability for a system using
> Kafka.
>
> In order to achieve this behavior, I think we would need a 'prepare' RPC
> call so that the server knows that a transaction has been prepared, and
> does not timeout and abort such transactions. There will be some cost to
> this extra RPC call, but there will also be a benefit of better system
> availability in case of failures.
>
> There is another reason why I would prefer this implementation. I am
> working on an XA KIP, and XA requires that Kafka brokers be able to provide
> a list of prepared transactions during recovery.  The broker can only know
> that a transaction has been prepared if an RPC call is made, so my KIP
> will need this functionality. In the XA KIP, I would like to use as much of
> the KIP-939 solution as possible, so it would be helpful if
> prepareTransaction() sent a 'prepare' RPC, and the broker recorded the
> prepared transaction state.
>
> This could be made configurable behavior if we are concerned that the cost
> of the extra RPC call is too much, and that some users would prefer to have
> speed in exchange for less system availability in some cases of client or
> network failure.
>
> Let me know what you think.
>
> -Rowland
>
> On Fri, Jan 5, 2024 at 8:03 PM Artem Livshits
>  wrote:
>
> > Hi Rowland,
> >
> > Thank you for the feedback.  For the 2PC cases, the expectation is that
> the
> > timeout on the client would be set to "effectively infinite", that would
> > exceed all practical 2PC delays.  Now I think that this flexibility is
> > confusing and can be misused, I have updated the KIP to just say that if
> > 2PC is used, the transaction never expires.
> >
> > -Artem
> >
> > On Thu, Jan 4, 2024 at 6:14 PM Rowland Smith  wrote:
> >
> > > It is probably me. I copied the original message subject into a new
> > email.
> > > Perhaps that is not enough to link them.
> > >
> > > It was not my understanding from reading KIP-939 that we are doing away
> > > with any transactional timeout in the Kafka broker. As I understand it, we
> > > are allowing the application to set the transaction timeout to a value that
> > > exceeds the *transaction.max.timeout.ms* setting on the broker, and having
> > > no timeout if the application does not set *transaction.timeout.ms* on the
> > > producer. The KIP says that the semantics of *transaction.timeout.ms* are
> > > not being changed, so I take that to mean that 

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-02-04 Thread Rowland Smith
Hi Artem,

It has been a while, but I have gotten back to this. I understand that when
2PC is used, the transaction timeout will be effectively infinite. I don't
think that this behavior is desirable. A long running transaction can be
extremely disruptive since it blocks consumers on any partitions written to
within the pending transaction. The primary reason for a long running
transaction is a failure of the client, or the network connecting the
client to the broker. If such a failure occurs before the client calls
the new prepareTransaction() method, it should be OK to abort the
transaction after a relatively short timeout period. This approach would
minimize the inconvenience and disruption of a long running transaction
blocking consumers, and provide higher availability for a system using
Kafka.

In order to achieve this behavior, I think we would need a 'prepare' RPC
call so that the server knows that a transaction has been prepared, and
does not timeout and abort such transactions. There will be some cost to
this extra RPC call, but there will also be a benefit of better system
availability in case of failures.

There is another reason why I would prefer this implementation. I am
working on an XA KIP, and XA requires that Kafka brokers be able to provide
a list of prepared transactions during recovery.  The broker can only know
that a transaction has been prepared if an RPC call is made, so my KIP
will need this functionality. In the XA KIP, I would like to use as much of
the KIP-939 solution as possible, so it would be helpful if
prepareTransaction() sent a 'prepare' RPC, and the broker recorded the
prepared transaction state.

This could be made configurable behavior if we are concerned that the cost
of the extra RPC call is too much, and that some users would prefer to have
speed in exchange for less system availability in some cases of client or
network failure.

Let me know what you think.

-Rowland

On Fri, Jan 5, 2024 at 8:03 PM Artem Livshits
 wrote:

> Hi Rowland,
>
> Thank you for the feedback.  For the 2PC cases, the expectation is that the
> timeout on the client would be set to "effectively infinite", that would
> exceed all practical 2PC delays.  Now I think that this flexibility is
> confusing and can be misused, I have updated the KIP to just say that if
> 2PC is used, the transaction never expires.
>
> -Artem
>
> On Thu, Jan 4, 2024 at 6:14 PM Rowland Smith  wrote:
>
> > It is probably me. I copied the original message subject into a new
> email.
> > Perhaps that is not enough to link them.
> >
> > It was not my understanding from reading KIP-939 that we are doing away
> > with any transactional timeout in the Kafka broker. As I understand it, we
> > are allowing the application to set the transaction timeout to a value that
> > exceeds the *transaction.max.timeout.ms* setting on the broker, and having
> > no timeout if the application does not set *transaction.timeout.ms* on the
> > producer. The KIP says that the semantics of *transaction.timeout.ms* are
> > not being changed, so I take that to mean that the broker will continue to
> > enforce a timeout if provided, and abort transactions that exceed it. From
> > the KIP:
> >
> > Client Configuration Changes
> >
> > *transaction.two.phase.commit.enable* The default would be ‘false’.  If set
> > to ‘true’, then the broker is informed that the client is participating in
> > two phase commit protocol and can set transaction timeout to values that
> > exceed *transaction.max.timeout.ms* setting on the broker (if the timeout is
> > not set explicitly on the client and the two phase commit is set to ‘true’
> > then the transaction never expires).
> >
> > *transaction.timeout.ms* The semantics is not changed, but it can be set to
> > values that exceed *transaction.max.timeout.ms* if two.phase.commit.enable
> > is set to ‘true’.
> >
> >
> > Thinking about this more I believe we would also have a possible race
> > condition if the broker is unaware that a transaction has been prepared.
> > The application might call prepare and get a positive response, but the
> > broker might have already aborted the transaction for exceeding the
> > timeout. It is a general rule of 2PC that once a transaction has been
> > prepared it must be possible for it to be committed or aborted. It seems
> in
> > this case a prepared transaction might already be aborted by the broker,
> so
> > it would be impossible to commit.
> >
> > I hope this is making sense and I am not misunderstanding the KIP. Please
> > let me know if I am.
> >
> > - Rowland
> >
> >
> > On Thu, Jan 4, 2024 at 12:56 PM Justine Olshan
> > 
> > wrote:
> >
> > > Hey Rowland,
> > >
> > > Not sure why this message showed up in a different thread from 

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-02-02 Thread Artem Livshits
Hi Jun,

>  Then, should we change the following in the example to use
InitProducerId(true) instead?

We could. I just thought that it's good to make the example self-contained
by starting from a clean state.

> Also, could Flink just follow the dual-write recipe?

I think it would bring some unnecessary logic to Flink (or any other system
that already has a transaction coordinator and just wants to drive Kafka to
the desired state).  We could discuss it with Flink folks, the current
proposal was developed in collaboration with them.

> 21. Could a non 2pc user explicitly set the TransactionTimeoutMs to
Integer.MAX_VALUE?

The server would reject this for regular transactions, it only accepts
values that are <= *transaction.max.timeout.ms* (a broker config).
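
As a small illustration of the client side, a sketch of the configuration this
implies (the transaction.two.phase.commit.enable name is the KIP-939 proposal,
not a released config):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

public class TwoPcProducerConfigSketch {
    static KafkaProducer<String, String> create() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "dual-write-app-1");
        // A regular producer must keep transaction.timeout.ms <= the broker's
        // transaction.max.timeout.ms or InitProducerId is rejected; with the
        // proposed flag below the timeout is simply left unset and the
        // transaction never expires.
        props.put("transaction.two.phase.commit.enable", "true");
        return new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
    }
}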

> 24. Hmm, In KIP-890, without 2pc, the coordinator expects the endTxn
request to use the ongoing pid. ...

Without 2PC there is no case where the pid could change between starting a
transaction and endTxn (InitProducerId would abort any ongoing
transaction).  With 2PC there is now a case where there could be
InitProducerId that can change the pid without aborting the transaction, so
we need to handle that.  I wouldn't say that the flow is different, but
it's rather extended to handle new cases.  The main principle is still the
same -- for all operations we use the latest "operational" pid and epoch
known to the client, this way we guarantee that we can fence zombie / split
brain clients by disrupting the "latest known" pid + epoch progression.

> 25. "We send out markers using the original ongoing transaction
ProducerId and ProducerEpoch" ...

Updated.

-Artem

On Mon, Jan 29, 2024 at 4:57 PM Jun Rao  wrote:

> Hi, Artem,
>
> Thanks for the reply.
>
> 20. So for the dual-write recipe, we should always call
> InitProducerId(keepPreparedTxn=true) from the producer? Then, should we
> change the following in the example to use InitProducerId(true) instead?
> 1. InitProducerId(false); TC STATE: Empty, ProducerId=42,
> ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=-1,
> NextProducerEpoch=-1; RESPONSE ProducerId=42, Epoch=MAX-1,
> OngoingTxnProducerId=-1, OngoingTxnEpoch=-1.
> Also, could Flink just follow the dual-write recipe? It's simpler if there
> is one way to solve the 2pc issue.
>
> 21. Could a non 2pc user explicitly set the TransactionTimeoutMs to
> Integer.MAX_VALUE?
>
> 24. Hmm, In KIP-890, without 2pc, the coordinator expects the endTxn
> request to use the ongoing pid. With 2pc, the coordinator now expects the
> endTxn request to use the next pid. So, the flow is different, right?
>
> 25. "We send out markers using the original ongoing transaction ProducerId
> and ProducerEpoch"
> We should use ProducerEpoch + 1 in the marker, right?
>
> Jun
>
> On Fri, Jan 26, 2024 at 8:35 PM Artem Livshits
>  wrote:
>
> > Hi Jun,
> >
> > > 20.  I am a bit confused by how we set keepPreparedTxn.  ...
> >
> > keepPreparedTxn=true informs the transaction coordinator that it should
> > keep the ongoing transaction, if any.  If the keepPreparedTxn=false, then
> > any ongoing transaction is aborted (this is exactly the current
> behavior).
> > enable2Pc is a separate argument that is controlled by the
> > *transaction.two.phase.commit.enable *setting on the client.
> >
> > To start 2PC, the client just needs to set
> > *transaction.two.phase.commit.enable*=true in the config.  Then if the
> > client knows the status of the transaction upfront (in the case of Flink,
> > Flink keeps the knowledge if the transaction is prepared in its own
> store,
> > so it always knows upfront), it can set keepPreparedTxn accordingly, then
> > if the transaction was prepared, it'll be ready for the client to
> complete
> > the appropriate action; if the client doesn't have a knowledge that the
> > transaction is prepared, keepPreparedTxn is going to be false, in which
> > case we'll get to a clean state (the same way we do today).
> >
> > For the dual-write recipe, the client doesn't know upfront if the
> > transaction is prepared, this information is implicitly encoded
> > PreparedTxnState value that can be used to resolve the transaction state.
> > In that case, keepPreparedTxn should always be true, because we don't
> know
> > upfront and we don't want to accidentally abort a committed transaction.
> >
> > The forceTerminateTransaction call can just use keepPreparedTxn=false, it
> > actually doesn't matter if it sets Enable2Pc flag.
> >
> > > 21. TransactionLogValue: Do we need some field to identify whether this
> > is written for 2PC so that ongoing txn is never auto aborted?
> >
> > The TransactionTimeoutMs would be set to Integer.MAX_VALUE if 2PC was
> > enabled.  I've added a note to the KIP about this.
> >
> > > 22
> >
> > You're right it's a typo.  I fixed it as well as step 9 (REQUEST:
> > ProducerId=73, ProducerEpoch=MAX).
> >
> > > 23. It's a bit weird that Enable2Pc is driven by a config while
> > KeepPreparedTxn is from an 

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-01-29 Thread Jun Rao
Hi, Artem,

Thanks for the reply.

20. So for the dual-write recipe, we should always call
InitProducerId(keepPreparedTxn=true) from the producer? Then, should we
change the following in the example to use InitProducerId(true) instead?
1. InitProducerId(false); TC STATE: Empty, ProducerId=42,
ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=-1,
NextProducerEpoch=-1; RESPONSE ProducerId=42, Epoch=MAX-1,
OngoingTxnProducerId=-1, OngoingTxnEpoch=-1.
Also, could Flink just follow the dual-write recipe? It's simpler if there
is one way to solve the 2pc issue.

21. Could a non 2pc user explicitly set the TransactionTimeoutMs to
Integer.MAX_VALUE?

24. Hmm, in KIP-890, without 2pc, the coordinator expects the endTxn
request to use the ongoing pid. With 2pc, the coordinator now expects the
endTxn request to use the next pid. So, the flow is different, right?

25. "We send out markers using the original ongoing transaction ProducerId
and ProducerEpoch"
We should use ProducerEpoch + 1 in the marker, right?

Jun

On Fri, Jan 26, 2024 at 8:35 PM Artem Livshits
 wrote:

> Hi Jun,
>
> > 20.  I am a bit confused by how we set keepPreparedTxn.  ...
>
> keepPreparedTxn=true informs the transaction coordinator that it should
> keep the ongoing transaction, if any.  If the keepPreparedTxn=false, then
> any ongoing transaction is aborted (this is exactly the current behavior).
> enable2Pc is a separate argument that is controlled by the
> *transaction.two.phase.commit.enable *setting on the client.
>
> To start 2PC, the client just needs to set
> *transaction.two.phase.commit.enable*=true in the config.  Then if the
> client knows the status of the transaction upfront (in the case of Flink,
> Flink keeps the knowledge if the transaction is prepared in its own store,
> so it always knows upfront), it can set keepPreparedTxn accordingly, then
> if the transaction was prepared, it'll be ready for the client to complete
> the appropriate action; if the client doesn't have a knowledge that the
> transaction is prepared, keepPreparedTxn is going to be false, in which
> case we'll get to a clean state (the same way we do today).
>
> For the dual-write recipe, the client doesn't know upfront if the
> transaction is prepared, this information is implicitly encoded
> PreparedTxnState value that can be used to resolve the transaction state.
> In that case, keepPreparedTxn should always be true, because we don't know
> upfront and we don't want to accidentally abort a committed transaction.
>
> The forceTerminateTransaction call can just use keepPreparedTxn=false, it
> actually doesn't matter if it sets Enable2Pc flag.
>
> > 21. TransactionLogValue: Do we need some field to identify whether this
> is written for 2PC so that ongoing txn is never auto aborted?
>
> The TransactionTimeoutMs would be set to Integer.MAX_VALUE if 2PC was
> enabled.  I've added a note to the KIP about this.
>
> > 22
>
> You're right it's a typo.  I fixed it as well as step 9 (REQUEST:
> ProducerId=73, ProducerEpoch=MAX).
>
> > 23. It's a bit weird that Enable2Pc is driven by a config while
> KeepPreparedTxn is from an API param ...
>
> The intent to use 2PC doesn't change from transaction to transaction, but
> the intent to keep prepared txn may change from transaction to
> transaction.  In dual-write recipes the distinction is not clear, but for
> use cases where keepPreparedTxn value is known upfront (e.g. Flink) it's
> more prominent.  E.g. a Flink's Kafka sink operator could be deployed with
> *transaction.two.phase.commit.enable*=true hardcoded in the image, but
> keepPreparedTxn cannot be hardcoded in the image, because it depends on the
> job manager's state.
>
> > 24
>
> The flow is actually going to be the same way as it is now -- the "main"
> producer id + epoch needs to be used in all operations to prevent fencing
> (it's sort of a common "header" in all RPC calls that follow the same
> rules).  The ongoing txn info is just additional info for making a commit /
> abort decision based on the PreparedTxnState from the DB.
>
> --Artem
>
> On Thu, Jan 25, 2024 at 11:05 AM Jun Rao  wrote:
>
> > Hi, Artem,
> >
> > Thanks for the reply. A few more comments.
> >
> > 20. I am a bit confused by how we set keepPreparedTxn. From the KIP, I
> got
> > the following (1) to start 2pc, we call
> > InitProducerId(keepPreparedTxn=false); (2) when the producer fails and
> > needs to do recovery, it calls InitProducerId(keepPreparedTxn=true); (3)
> > Admin.forceTerminateTransaction() calls
> > InitProducerId(keepPreparedTxn=false).
> > 20.1 In (1), when a producer calls InitProducerId(false) with 2pc
> enabled,
> > and there is an ongoing txn, should the server return an error to the
> > InitProducerId request? If so, what would be the error code?
> > 20.2 How do we distinguish between (1) and (3)? It's the same API call
> but
> > (1) doesn't abort ongoing txn and (2) does.
> > 20.3 The usage in (1) seems unintuitive. 2pc implies keeping the ongoing
> > txn. So, 

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-01-26 Thread Artem Livshits
Hi Jun,

> 20.  I am a bit confused by how we set keepPreparedTxn.  ...

keepPreparedTxn=true informs the transaction coordinator that it should
keep the ongoing transaction, if any.  If keepPreparedTxn=false, then
any ongoing transaction is aborted (this is exactly the current behavior).
enable2Pc is a separate argument that is controlled by the
*transaction.two.phase.commit.enable* setting on the client.

To start 2PC, the client just needs to set
*transaction.two.phase.commit.enable*=true in the config.  If the client
knows the status of the transaction upfront (Flink, for example, records in
its own store whether the transaction is prepared, so it always knows), it
can set keepPreparedTxn accordingly: if the transaction was prepared, it
will be kept and ready for the client to complete with the appropriate
action; if the client doesn't know that the transaction is prepared,
keepPreparedTxn should be false, in which case we'll get to a clean state
(the same way we do today).
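
As a rough sketch of that client-side flow (this assumes the
*transaction.two.phase.commit.enable* config and the
initTransactions(keepPreparedTxn) overload proposed in the KIP, which do not
exist in released clients; the helper for reading the external coordinator's
state is hypothetical):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "flink-sink-operator-7");
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
// Static intent to use 2PC; this can be hardcoded in the deployment.
props.put("transaction.two.phase.commit.enable", "true");

KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);

// keepPreparedTxn is decided at run time: the external coordinator (e.g. Flink's
// job manager) knows whether a prepared transaction exists for this transactional.id.
boolean keepPreparedTxn = externalCoordinatorSaysPrepared();  // hypothetical helper
producer.initTransactions(keepPreparedTxn);                   // proposed overload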

For the dual-write recipe, the client doesn't know upfront if the
transaction is prepared; this information is implicitly encoded in the
PreparedTxnState value that can be used to resolve the transaction state.
In that case, keepPreparedTxn should always be true, because we don't know
upfront and we don't want to accidentally abort a committed transaction.

The forceTerminateTransaction call can just use keepPreparedTxn=false; it
actually doesn't matter whether it sets the Enable2Pc flag.

> 21. TransactionLogValue: Do we need some field to identify whether this
is written for 2PC so that ongoing txn is never auto aborted?

The TransactionTimeoutMs would be set to Integer.MAX_VALUE if 2PC was
enabled.  I've added a note to the KIP about this.

> 22

You're right it's a typo.  I fixed it as well as step 9 (REQUEST:
ProducerId=73, ProducerEpoch=MAX).

> 23. It's a bit weird that Enable2Pc is driven by a config while
KeepPreparedTxn is from an API param ...

The intent to use 2PC doesn't change from transaction to transaction, but
the intent to keep a prepared txn may change from transaction to
transaction.  In dual-write recipes the distinction is not clear, but for
use cases where the keepPreparedTxn value is known upfront (e.g. Flink) it's
more prominent.  E.g. a Flink Kafka sink operator could be deployed with
*transaction.two.phase.commit.enable*=true hardcoded in the image, but
keepPreparedTxn cannot be hardcoded in the image, because it depends on the
job manager's state.

> 24

The flow is actually going to be the same as it is now -- the "main"
producer id + epoch needs to be used in all operations to prevent fencing
(it's sort of a common "header" in all RPC calls that follow the same
rules).  The ongoing txn info is just additional info for making a commit /
abort decision based on the PreparedTxnState from the DB.
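
To make that concrete, here is a sketch of the dual-write recovery path
(assuming a producer configured with 2PC enabled, plus the
initTransactions(true), PreparedTxnState, and completeTransaction
(PreparedTxnState) API proposed in the KIP; the DB helper is hypothetical):

producer.initTransactions(true);                          // keepPreparedTxn=true: don't abort
PreparedTxnState dbState = readPreparedTxnStateFromDb();  // hypothetical helper: what the DB committed

// completeTransaction compares the state read from the DB with the (producerId, epoch)
// of the ongoing Kafka transaction: if they match, the DB commit went through and the
// Kafka transaction is committed; otherwise it is aborted.  Sending is re-enabled only
// after the transaction is completed.
producer.completeTransaction(dbState);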

--Artem

On Thu, Jan 25, 2024 at 11:05 AM Jun Rao  wrote:

> Hi, Artem,
>
> Thanks for the reply. A few more comments.
>
> 20. I am a bit confused by how we set keepPreparedTxn. From the KIP, I got
> the following (1) to start 2pc, we call
> InitProducerId(keepPreparedTxn=false); (2) when the producer fails and
> needs to do recovery, it calls InitProducerId(keepPreparedTxn=true); (3)
> Admin.forceTerminateTransaction() calls
> InitProducerId(keepPreparedTxn=false).
> 20.1 In (1), when a producer calls InitProducerId(false) with 2pc enabled,
> and there is an ongoing txn, should the server return an error to the
> InitProducerId request? If so, what would be the error code?
> 20.2 How do we distinguish between (1) and (3)? It's the same API call but
> (1) doesn't abort ongoing txn and (2) does.
> 20.3 The usage in (1) seems unintuitive. 2pc implies keeping the ongoing
> txn. So, setting keepPreparedTxn to false to start 2pc seems counter
> intuitive.
>
> 21. TransactionLogValue: Do we need some field to identify whether this is
> written for 2PC so that ongoing txn is never auto aborted?
>
> 22. "8. InitProducerId(true); TC STATE: Ongoing, ProducerId=42,
> ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=73,
> NextProducerEpoch=MAX; RESPONSE ProducerId=73, Epoch=MAX-1,
> OngoingTxnProducerId=42, OngoingTxnEpoch=MAX-1"
> It seems in the above example, Epoch in RESPONSE should be MAX to match
> NextProducerEpoch?
>
> 23. It's a bit weird that Enable2Pc is driven by a config
> while KeepPreparedTxn is from an API param. Should we make them more
> consistent since they seem related?
>
> 24. "9. Commit; REQUEST: ProducerId=73, ProducerEpoch=MAX-1; TC STATE:
> PrepareCommit, ProducerId=42, ProducerEpoch=MAX, PrevProducerId=73,
> NextProducerId=85, NextProducerEpoch=0; RESPONSE ProducerId=85, Epoch=0,
> When a commit request is sent, it uses the latest ProducerId and
> ProducerEpoch."
> The step where we use the next produceId to commit an old txn works, but
> can be confusing. It's going to be hard for people implementing this new
> client protocol to figure out when to use the current or 

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-01-25 Thread Jun Rao
Hi, Artem,

Thanks for the reply. A few more comments.

20. I am a bit confused by how we set keepPreparedTxn. From the KIP, I got
the following (1) to start 2pc, we call
InitProducerId(keepPreparedTxn=false); (2) when the producer fails and
needs to do recovery, it calls InitProducerId(keepPreparedTxn=true); (3)
Admin.forceTerminateTransaction() calls
InitProducerId(keepPreparedTxn=false).
20.1 In (1), when a producer calls InitProducerId(false) with 2pc enabled,
and there is an ongoing txn, should the server return an error to the
InitProducerId request? If so, what would be the error code?
20.2 How do we distinguish between (1) and (3)? It's the same API call but
(1) doesn't abort ongoing txn and (2) does.
20.3 The usage in (1) seems unintuitive. 2pc implies keeping the ongoing
txn. So, setting keepPreparedTxn to false to start 2pc seems counter
intuitive.

21. TransactionLogValue: Do we need some field to identify whether this is
written for 2PC so that ongoing txn is never auto aborted?

22. "8. InitProducerId(true); TC STATE: Ongoing, ProducerId=42,
ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=73,
NextProducerEpoch=MAX; RESPONSE ProducerId=73, Epoch=MAX-1,
OngoingTxnProducerId=42, OngoingTxnEpoch=MAX-1"
It seems in the above example, Epoch in RESPONSE should be MAX to match
NextProducerEpoch?

23. It's a bit weird that Enable2Pc is driven by a config
while KeepPreparedTxn is from an API param. Should we make them more
consistent since they seem related?

24. "9. Commit; REQUEST: ProducerId=73, ProducerEpoch=MAX-1; TC STATE:
PrepareCommit, ProducerId=42, ProducerEpoch=MAX, PrevProducerId=73,
NextProducerId=85, NextProducerEpoch=0; RESPONSE ProducerId=85, Epoch=0,
When a commit request is sent, it uses the latest ProducerId and
ProducerEpoch."
The step where we use the next producerId to commit an old txn works, but
can be confusing. It's going to be hard for people implementing this new
client protocol to figure out when to use the current or the new producerId
in the EndTxnRequest. One potential way to improve this is to extend
EndTxnRequest with a new field like expectedNextProducerId. Then we can
always use the old producerId in the existing field, but set
expectedNextProducerId to bypass the fencing logic when needed.

Thanks,

Jun



On Mon, Dec 18, 2023 at 2:06 PM Artem Livshits
 wrote:

> Hi Jun,
>
> Thank you for the comments.
>
> > 10. For the two new fields in Enable2Pc and KeepPreparedTxn ...
>
> I added a note that all combinations are valid.  Enable2Pc=false &
> KeepPreparedTxn=true could be potentially useful for backward compatibility
> with Flink, when the new version of Flink that implements KIP-319 tries to
> work with a cluster that doesn't authorize 2PC.
>
> > 11.  InitProducerIdResponse: If there is no ongoing txn, what will
> OngoingTxnProducerId and OngoingTxnEpoch be set?
>
> I added a note that they will be set to -1.  The client then will know that
> there is no ongoing txn and .completeTransaction becomes a no-op (but still
> required before .send is enabled).
>
> > 12. ListTransactionsRequest related changes: It seems those are already
> covered by KIP-994?
>
> Removed from this KIP.
>
> > 13. TransactionalLogValue ...
>
> This is now updated to work on top of KIP-890.
>
> > 14. "Note that the (producerId, epoch) pair that corresponds to the
> ongoing transaction ...
>
> This is now updated to work on top of KIP-890.
>
> > 15. active-transaction-total-time-max : ...
>
> Updated.
>
> > 16. "transaction.two.phase.commit.enable The default would be ‘false’.
> If it’s ‘false’, 2PC functionality is disabled even if the ACL is set ...
>
> Disabling 2PC effectively removes all authorization to use it, hence I
> thought TRANSACTIONAL_ID_AUTHORIZATION_FAILED would be appropriate.
>
> Do you suggest using a different error code for 2PC authorization vs some
> other authorization (e.g. TRANSACTIONAL_ID_2PC_AUTHORIZATION_FAILED) or a
> different code for disabled vs. unauthorised (e.g.
> TWO_PHASE_COMMIT_DISABLED) or both?
>
> > 17. completeTransaction(). We expect this to be only used during
> recovery.
>
> It can also be used if, say, a commit to the database fails and the result
> is inconclusive, e.g.
>
> 1. Begin DB transaction
> 2. Begin Kafka transaction
> 3. Prepare Kafka transaction
> 4. Commit DB transaction
> 5. The DB commit fails, figure out the state of the transaction by reading
> the PreparedTxnState from DB
> 6. Complete Kafka transaction with the PreparedTxnState.
>
> > 18. "either prepareTransaction was called or initTransaction(true) was
> called": "either" should be "neither"?
>
> Updated.
>
> > 19. Since InitProducerId always bumps up the epoch, it creates a
> situation ...
>
> InitProducerId only bumps the producer epoch, the ongoing transaction epoch
> stays the same, no matter how many times the InitProducerId is called
> before the transaction is completed.  Eventually the epoch may overflow,
> and then a new producer id would be allocated, but the 

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-01-05 Thread Artem Livshits
Hi Rowland,

Thank you for the feedback.  For the 2PC cases, the expectation is that the
timeout on the client would be set to "effectively infinite", a value that
would exceed all practical 2PC delays.  Now I think that this flexibility is
confusing and can be misused, so I have updated the KIP to just say that if
2PC is used, the transaction never expires.

-Artem

On Thu, Jan 4, 2024 at 6:14 PM Rowland Smith  wrote:

> It is probably me. I copied the original message subject into a new email.
> Perhaps that is not enough to link them.
>
> It was not my understanding from reading KIP-939 that we are doing away
> with any transactional timeout in the Kafka broker. As I understand it, we
> are allowing the application to set the transaction timeout to a value that
> exceeds the *transaction.max.timeout.ms
> * setting
> on the broker, and having no timeout if the application does not set
> *transaction.timeout.ms
> * on the producer. The KIP says that the
> semantics of *transaction.timeout.ms * are
> not being changed, so I take that to mean that the broker will continue to
> enforce a timeout if provided, and abort transactions that exceed it. From
> the KIP:
>
> Client Configuration Changes
>
> *transaction.two.phase.commit.enable* The default would be ‘false’.  If set
> to ‘true’, then the broker is informed that the client is participating in
> two phase commit protocol and can set transaction timeout to values that
> exceed *transaction.max.timeout.ms *
> setting
> on the broker (if the timeout is not set explicitly on the client and the
> two phase commit is set to ‘true’ then the transaction never expires).
>
> *transaction.timeout.ms * The semantics is
> not changed, but it can be set to values that exceed
> *transaction.max.timeout.ms
> * if two.phase.commit.enable is set to
> ‘true’.
>
>
> Thinking about this more I believe we would also have a possible race
> condition if the broker is unaware that a transaction has been prepared.
> The application might call prepare and get a positive response, but the
> broker might have already aborted the transaction for exceeding the
> timeout. It is a general rule of 2PC that once a transaction has been
> prepared it must be possible for it to be committed or aborted. It seems in
> this case a prepared transaction might already be aborted by the broker, so
> it would be impossible to commit.
>
> I hope this is making sense and I am not misunderstanding the KIP. Please
> let me know if I am.
>
> - Rowland
>
>
> On Thu, Jan 4, 2024 at 12:56 PM Justine Olshan
> 
> wrote:
>
> > Hey Rowland,
> >
> > Not sure why this message showed up in a different thread from the other
> > KIP-939 discussion (is it just me?)
> >
> > In KIP-939, we do away with having any transactional timeout on the Kafka
> > side. The external coordinator is fully responsible for controlling
> whether
> > the transaction completes.
> >
> > While I think there is some use in having a prepare stage, I just wanted
> to
> > clarify what the current KIP is proposing.
> >
> > Thanks,
> > Justine
> >
> > On Wed, Jan 3, 2024 at 7:49 PM Rowland Smith  wrote:
> >
> > > Hi Artem,
> > >
> > > I saw your response in the thread I started discussing Kafka
> distributed
> > > transaction support and the XA interface. I would like to work with you
> > to
> > > add XA support to Kafka on top of the excellent foundational work that
> > you
> > > have started with KIP-939. I agree that explicit XA support should not
> be
> > > included in the Kafka codebase as long as the right set of basic
> > operations
> > > are provided. I will begin pulling together a KIP to follow KIP-939.
> > >
> > > I did have one comment on KIP-939 itself. I see that you considered an
> > > explicit "prepare" RPC, but decided not to add it. If I understand your
> > > design correctly, that would mean that a 2PC transaction would have a
> > > single timeout that would need to be long enough to ensure that
> prepared
> > > transactions are not aborted when an external coordinator fails.
> However,
> > > this also means that an unprepared transaction would not be aborted
> > without
> > > waiting for the same timeout. Since long running transactions block
> > > transactional consumers, having a long timeout for all transactions
> could
> > > be disruptive. An explicit "prepare " RPC would allow the server to
> abort
> > > unprepared transactions after a relatively short timeout, and apply a
> > much
> > > longer timeout only to prepared transactions. The explicit "prepare"
> RPC
> > > would make Kafka server more resilient to client failure at the cost of
> > an
> > > extra synchronous RPC call. I think its worth reconsidering this.
> > >
> > > With an XA implementation this might become a more significant issue
> > since
> > > the transaction 

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-01-04 Thread Rowland Smith
It is probably me. I copied the original message subject into a new email.
Perhaps that is not enough to link them.

It was not my understanding from reading KIP-939 that we are doing away
with any transactional timeout in the Kafka broker. As I understand it, we
are allowing the application to set the transaction timeout to a value that
exceeds the *transaction.max.timeout.ms* setting on the broker, and having
no timeout if the application does not set *transaction.timeout.ms* on the
producer. The KIP says that the semantics of *transaction.timeout.ms* are
not being changed, so I take that to mean that the broker will continue to
enforce a timeout if provided, and abort transactions that exceed it. From
the KIP:

Client Configuration Changes

*transaction.two.phase.commit.enable* The default would be ‘false’.  If set
to ‘true’, then the broker is informed that the client is participating in
two phase commit protocol and can set transaction timeout to values that
exceed *transaction.max.timeout.ms* setting on the broker (if the timeout is
not set explicitly on the client and the two phase commit is set to ‘true’
then the transaction never expires).

*transaction.timeout.ms* The semantics is not changed, but it can be set to
values that exceed *transaction.max.timeout.ms* if two.phase.commit.enable
is set to ‘true’.


Thinking about this more I believe we would also have a possible race
condition if the broker is unaware that a transaction has been prepared.
The application might call prepare and get a positive response, but the
broker might have already aborted the transaction for exceeding the
timeout. It is a general rule of 2PC that once a transaction has been
prepared it must be possible for it to be committed or aborted. It seems in
this case a prepared transaction might already be aborted by the broker, so
it would be impossible to commit.

I hope this is making sense and I am not misunderstanding the KIP. Please
let me know if I am.

- Rowland


On Thu, Jan 4, 2024 at 12:56 PM Justine Olshan 
wrote:

> Hey Rowland,
>
> Not sure why this message showed up in a different thread from the other
> KIP-939 discussion (is it just me?)
>
> In KIP-939, we do away with having any transactional timeout on the Kafka
> side. The external coordinator is fully responsible for controlling whether
> the transaction completes.
>
> While I think there is some use in having a prepare stage, I just wanted to
> clarify what the current KIP is proposing.
>
> Thanks,
> Justine
>
> On Wed, Jan 3, 2024 at 7:49 PM Rowland Smith  wrote:
>
> > Hi Artem,
> >
> > I saw your response in the thread I started discussing Kafka distributed
> > transaction support and the XA interface. I would like to work with you
> to
> > add XA support to Kafka on top of the excellent foundational work that
> you
> > have started with KIP-939. I agree that explicit XA support should not be
> > included in the Kafka codebase as long as the right set of basic
> operations
> > are provided. I will begin pulling together a KIP to follow KIP-939.
> >
> > I did have one comment on KIP-939 itself. I see that you considered an
> > explicit "prepare" RPC, but decided not to add it. If I understand your
> > design correctly, that would mean that a 2PC transaction would have a
> > single timeout that would need to be long enough to ensure that prepared
> > transactions are not aborted when an external coordinator fails. However,
> > this also means that an unprepared transaction would not be aborted
> without
> > waiting for the same timeout. Since long running transactions block
> > transactional consumers, having a long timeout for all transactions could
> > be disruptive. An explicit "prepare " RPC would allow the server to abort
> > unprepared transactions after a relatively short timeout, and apply a
> much
> > longer timeout only to prepared transactions. The explicit "prepare" RPC
> > would make Kafka server more resilient to client failure at the cost of
> an
> > extra synchronous RPC call. I think its worth reconsidering this.
> >
> > With an XA implementation this might become a more significant issue
> since
> > the transaction coordinator has no memory of unprepared transactions
> across
> > restarts. Such transactions would need to be cleared by hand through the
> > admin client even when the transaction coordinator restarts successfully.
> >
> > - Rowland
> >
>


Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-01-04 Thread Justine Olshan
Hey Rowland,

Not sure why this message showed up in a different thread from the other
KIP-939 discussion (is it just me?)

In KIP-939, we do away with having any transactional timeout on the Kafka
side. The external coordinator is fully responsible for controlling whether
the transaction completes.

While I think there is some use in having a prepare stage, I just wanted to
clarify what the current KIP is proposing.

Thanks,
Justine

On Wed, Jan 3, 2024 at 7:49 PM Rowland Smith  wrote:

> Hi Artem,
>
> I saw your response in the thread I started discussing Kafka distributed
> transaction support and the XA interface. I would like to work with you to
> add XA support to Kafka on top of the excellent foundational work that you
> have started with KIP-939. I agree that explicit XA support should not be
> included in the Kafka codebase as long as the right set of basic operations
> are provided. I will begin pulling together a KIP to follow KIP-939.
>
> I did have one comment on KIP-939 itself. I see that you considered an
> explicit "prepare" RPC, but decided not to add it. If I understand your
> design correctly, that would mean that a 2PC transaction would have a
> single timeout that would need to be long enough to ensure that prepared
> transactions are not aborted when an external coordinator fails. However,
> this also means that an unprepared transaction would not be aborted without
> waiting for the same timeout. Since long running transactions block
> transactional consumers, having a long timeout for all transactions could
> be disruptive. An explicit "prepare " RPC would allow the server to abort
> unprepared transactions after a relatively short timeout, and apply a much
> longer timeout only to prepared transactions. The explicit "prepare" RPC
> would make Kafka server more resilient to client failure at the cost of an
> extra synchronous RPC call. I think its worth reconsidering this.
>
> With an XA implementation this might become a more significant issue since
> the transaction coordinator has no memory of unprepared transactions across
> restarts. Such transactions would need to be cleared by hand through the
> admin client even when the transaction coordinator restarts successfully.
>
> - Rowland
>


Re: [DISCUSS] KIP-939: Support Participation in 2PC

2024-01-03 Thread Rowland Smith
Hi Artem,

I saw your response in the thread I started discussing Kafka distributed
transaction support and the XA interface. I would like to work with you to
add XA support to Kafka on top of the excellent foundational work that you
have started with KIP-939. I agree that explicit XA support should not be
included in the Kafka codebase as long as the right set of basic operations
are provided. I will begin pulling together a KIP to follow KIP-939.

I did have one comment on KIP-939 itself. I see that you considered an
explicit "prepare" RPC, but decided not to add it. If I understand your
design correctly, that would mean that a 2PC transaction would have a
single timeout that would need to be long enough to ensure that prepared
transactions are not aborted when an external coordinator fails. However,
this also means that an unprepared transaction would not be aborted without
waiting for the same timeout. Since long running transactions block
transactional consumers, having a long timeout for all transactions could
be disruptive. An explicit "prepare" RPC would allow the server to abort
unprepared transactions after a relatively short timeout, and apply a much
longer timeout only to prepared transactions. The explicit "prepare" RPC
would make the Kafka server more resilient to client failure at the cost of
an extra synchronous RPC call. I think it's worth reconsidering this.

With an XA implementation this might become a more significant issue since
the transaction coordinator has no memory of unprepared transactions across
restarts. Such transactions would need to be cleared by hand through the
admin client even when the transaction coordinator restarts successfully.

- Rowland


Re: [DISCUSS] KIP-939: Support Participation in 2PC

2023-12-18 Thread Artem Livshits
Hi Jun,

Thank you for the comments.

> 10. For the two new fields in Enable2Pc and KeepPreparedTxn ...

I added a note that all combinations are valid.  Enable2Pc=false &
KeepPreparedTxn=true could be potentially useful for backward compatibility
with Flink, when the new version of Flink that implements KIP-319 tries to
work with a cluster that doesn't authorize 2PC.

> 11.  InitProducerIdResponse: If there is no ongoing txn, what will
OngoingTxnProducerId and OngoingTxnEpoch be set?

I added a note that they will be set to -1.  The client then will know that
there is no ongoing txn and .completeTransaction becomes a no-op (but still
required before .send is enabled).

> 12. ListTransactionsRequest related changes: It seems those are already
covered by KIP-994?

Removed from this KIP.

> 13. TransactionalLogValue ...

This is now updated to work on top of KIP-890.

> 14. "Note that the (producerId, epoch) pair that corresponds to the
ongoing transaction ...

This is now updated to work on top of KIP-890.

> 15. active-transaction-total-time-max : ...

Updated.

> 16. "transaction.two.phase.commit.enable The default would be ‘false’.
If it’s ‘false’, 2PC functionality is disabled even if the ACL is set ...

Disabling 2PC effectively removes all authorization to use it, hence I
thought TRANSACTIONAL_ID_AUTHORIZATION_FAILED would be appropriate.

Do you suggest using a different error code for 2PC authorization vs some
other authorization (e.g. TRANSACTIONAL_ID_2PC_AUTHORIZATION_FAILED) or a
different code for disabled vs. unauthorised (e.g.
TWO_PHASE_COMMIT_DISABLED) or both?

> 17. completeTransaction(). We expect this to be only used during recovery.

It can also be used if, say, a commit to the database fails and the result
is inconclusive, e.g.

1. Begin DB transaction
2. Begin Kafka transaction
3. Prepare Kafka transaction
4. Commit DB transaction
5. The DB commit fails, figure out the state of the transaction by reading
the PreparedTxnState from DB
6. Complete Kafka transaction with the PreparedTxnState.
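
Roughly, in code (a sketch only: it assumes the prepareTransaction() /
completeTransaction(PreparedTxnState) methods proposed in the KIP, that
prepareTransaction() yields the PreparedTxnState the application stores in
the DB, and hypothetical db* helpers for the database side):

dbBeginTransaction();                                       // 1. Begin DB transaction
producer.beginTransaction();                                // 2. Begin Kafka transaction
// ... producer.send(...) calls and DB writes for this unit of work ...
PreparedTxnState prepared = producer.prepareTransaction();  // 3. Prepare Kafka transaction
try {
    dbWritePreparedTxnState(prepared);                      //    record it as part of the DB transaction
    dbCommitTransaction();                                   // 4. Commit DB transaction
    producer.commitTransaction();                            //    both sides committed
} catch (Exception inconclusive) {
    // 5. The DB commit failed or its outcome is unknown; read back what was committed.
    PreparedTxnState stored = dbReadPreparedTxnState();
    // 6. Commit the Kafka transaction if the DB recorded this prepared state,
    //    abort it otherwise.
    producer.completeTransaction(stored);
}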

> 18. "either prepareTransaction was called or initTransaction(true) was
called": "either" should be "neither"?

Updated.

> 19. Since InitProducerId always bumps up the epoch, it creates a
situation ...

InitProducerId only bumps the producer epoch; the ongoing transaction epoch
stays the same, no matter how many times InitProducerId is called
before the transaction is completed.  Eventually the epoch may overflow,
and then a new producer id would be allocated, but the ongoing transaction
producer id would stay the same.

I've added a couple examples in the KIP (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC#KIP939:SupportParticipationin2PC-PersistedDataFormatChanges)
that walk through some scenarios and show how the state is changed.

-Artem

On Fri, Dec 8, 2023 at 6:04 PM Jun Rao  wrote:

> Hi, Artem,
>
> Thanks for the KIP. A few comments below.
>
> 10. For the two new fields in Enable2Pc and KeepPreparedTxn in
> InitProducerId, it would be useful to document a bit more detail on what
> values are set under what cases. For example, are all four combinations
> valid?
>
> 11.  InitProducerIdResponse: If there is no ongoing txn, what will
> OngoingTxnProducerId and OngoingTxnEpoch be set?
>
> 12. ListTransactionsRequest related changes: It seems those are already
> covered by KIP-994?
>
> 13. TransactionalLogValue: Could we name TransactionProducerId and
> ProducerId better? It's not clear from the name which is for which.
>
> 14. "Note that the (producerId, epoch) pair that corresponds to the ongoing
> transaction is going to be written instead of the existing ProducerId and
> ProducerEpoch fields (which are renamed to reflect the semantics) to
> support downgrade.": I am a bit confused on that. Are we writing different
> values to the existing fields? Then, we can't downgrade, right?
>
> 15. active-transaction-total-time-max : Would
> active-transaction-open-time-max be more intuitive? Also, could we include
> the full name (group, tags, etc)?
>
> 16. "transaction.two.phase.commit.enable The default would be ‘false’.  If
> it’s ‘false’, 2PC functionality is disabled even if the ACL is set, clients
> that attempt to use this functionality would receive
> TRANSACTIONAL_ID_AUTHORIZATION_FAILED error."
> TRANSACTIONAL_ID_AUTHORIZATION_FAILED seems unintuitive for the client to
> understand what the actual cause is.
>
> 17. completeTransaction(). We expect this to be only used during recovery.
> Could we document this clearly? Could we prevent it from being used
> incorrectly (e.g. throw an exception if the producer has called other
> methods like send())?
>
> 18. "either prepareTransaction was called or initTransaction(true) was
> called": "either" should be "neither"?
>
> 19. Since InitProducerId always bumps up the epoch, it creates a situation
> where there could be multiple outstanding txns. The following is an example
> of a potential problem 

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2023-12-18 Thread Artem Livshits
Hi Justine,

I've updated the KIP based on the KIP-890 updates.  Now KIP-939 only needs
to add one tagged field, NextProducerEpoch, as the other required fields
will be added as part of KIP-890.

> But here we could call the InitProducerId multiple times and we only want
the producer with the correct epoch to be able to commit the transaction

That's correct, the epoch cannot be inferred from the state in this case
because InitProducerId can be called multiple times.  I've also added an
example in the KIP that walks through the epoch overflow scenarios.

-Artem


On Thu, Dec 7, 2023 at 2:43 PM Justine Olshan 
wrote:

> Hey Artem,
>
> Thanks for the updates. I think what you say makes sense. I just updated my
> KIP so I want to reconcile some of the changes we made especially with
> respect to the TransactionLogValue.
>
> Firstly, I believe tagged fields require a default value so that if they
> are not filled, we return the default (and know that they were empty). For
> my KIP, I proposed the default for producer ID tagged fields should be -1.
> I was wondering if we could update the KIP to include the default values
> for producer ID and epoch.
>
> Next, I noticed we decided to rename the fields. I guess that the field
> "NextProducerId" in my KIP correlates to "ProducerId" in this KIP. Is that
> correct? So we would have "TransactionProducerId" for the non-tagged field
> and have "ProducerId" (NextProducerId) and "PrevProducerId" as tagged
> fields the final version after KIP-890 and KIP-936 are implemented. Is this
> correct? I think the tags will need updating, but that is trivial.
>
> The final question I had was with respect to storing the new epoch. In
> KIP-890 part 2 (epoch bumps) I think we concluded that we don't need to
> store the epoch since we can interpret the previous epoch based on the
> producer ID. But here we could call the InitProducerId multiple times and
> we only want the producer with the correct epoch to be able to commit the
> transaction. Is that the correct reasoning for why we need epoch here but
> not the Prepare/Commit state.
>
> Thanks,
> Justine
>
> On Wed, Nov 22, 2023 at 9:48 AM Artem Livshits
>  wrote:
>
> > Hi Justine,
> >
> > After thinking a bit about supporting atomic dual writes for Kafka +
> NoSQL
> > database, I came to a conclusion that we do need to bump the epoch even
> > with InitProducerId(keepPreparedTxn=true).  As I described in my previous
> > email, we wouldn't need to bump the epoch to protect from zombies so that
> > reasoning is still true.  But we cannot protect from split-brain
> scenarios
> > when two or more instances of a producer with the same transactional id
> try
> > to produce at the same time.  The dual-write example for SQL databases (
> > https://github.com/apache/kafka/pull/14231/files) doesn't have a
> > split-brain problem because execution is protected by the update lock on
> > the transaction state record; however NoSQL databases may not have this
> > protection (I'll write an example for NoSQL database dual-write soon).
> >
> > In a nutshell, here is an example of a split-brain scenario:
> >
> >1. (instance1) InitProducerId(keepPreparedTxn=true), got epoch=42
> >2. (instance2) InitProducerId(keepPreparedTxn=true), got epoch=42
> >3. (instance1) CommitTxn, epoch bumped to 43
> >4. (instance2) CommitTxn, this is considered a retry, so it got epoch
> 43
> >as well
> >5. (instance1) Produce messageA w/sequence 1
> >6. (instance2) Produce messageB w/sequence 1, this is considered a
> >duplicate
> >7. (instance2) Produce messageC w/sequence 2
> >8. (instance1) Produce messageD w/sequence 2, this is considered a
> >duplicate
> >
> > Now if either of those commit the transaction, it would have a mix of
> > messages from the two instances (messageA and messageC).  With the proper
> > epoch bump, instance1 would get fenced at step 3.
> >
> > In order to update epoch in InitProducerId(keepPreparedTxn=true) we need
> to
> > preserve the ongoing transaction's epoch (and producerId, if the epoch
> > overflows), because we'd need to make a correct decision when we compare
> > the PreparedTxnState that we read from the database with the (producerId,
> > epoch) of the ongoing transaction.
> >
> > I've updated the KIP with the following:
> >
> >- Ongoing transaction now has 2 (producerId, epoch) pairs -- one pair
> >describes the ongoing transaction, the other pair describes expected
> > epoch
> >for operations on this transactional id
> >- InitProducerIdResponse now returns 2 (producerId, epoch) pairs
> >- TransactionalLogValue now has 2 (producerId, epoch) pairs, the new
> >values added as tagged fields, so it's easy to downgrade
> >- Added a note about downgrade in the Compatibility section
> >- Added a rejected alternative
> >
> > -Artem
> >
> > On Fri, Oct 6, 2023 at 5:16 PM Artem Livshits 
> > wrote:
> >
> > > Hi Justine,
> > >
> > > Thank you for the questions.  

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2023-12-08 Thread Jun Rao
Hi, Artem,

Thanks for the KIP. A few comments below.

10. For the two new fields in Enable2Pc and KeepPreparedTxn in
InitProducerId, it would be useful to document a bit more detail on what
values are set under what cases. For example, are all four combinations
valid?

11.  InitProducerIdResponse: If there is no ongoing txn, what will
OngoingTxnProducerId and OngoingTxnEpoch be set?

12. ListTransactionsRequest related changes: It seems those are already
covered by KIP-994?

13. TransactionalLogValue: Could we name TransactionProducerId and
ProducerId better? It's not clear from the name which is for which.

14. "Note that the (producerId, epoch) pair that corresponds to the ongoing
transaction is going to be written instead of the existing ProducerId and
ProducerEpoch fields (which are renamed to reflect the semantics) to
support downgrade.": I am a bit confused on that. Are we writing different
values to the existing fields? Then, we can't downgrade, right?

15. active-transaction-total-time-max : Would
active-transaction-open-time-max be more intuitive? Also, could we include
the full name (group, tags, etc)?

16. "transaction.two.phase.commit.enable The default would be ‘false’.  If
it’s ‘false’, 2PC functionality is disabled even if the ACL is set, clients
that attempt to use this functionality would receive
TRANSACTIONAL_ID_AUTHORIZATION_FAILED error."
TRANSACTIONAL_ID_AUTHORIZATION_FAILED seems unintuitive for the client to
understand what the actual cause is.

17. completeTransaction(). We expect this to be only used during recovery.
Could we document this clearly? Could we prevent it from being used
incorrectly (e.g. throw an exception if the producer has called other
methods like send())?

18. "either prepareTransaction was called or initTransaction(true) was
called": "either" should be "neither"?

19. Since InitProducerId always bumps up the epoch, it creates a situation
where there could be multiple outstanding txns. The following is an example
of a potential problem during recovery.
   The last txn epoch in the external store is 41 when the app dies.
   Instance1 is created for recovery.
 1. (instance1) InitProducerId(keepPreparedTxn=true), epoch=42,
ongoingEpoch=41
 2. (instance1) dies before completeTxn(41) can be called.
   Instance2 is created for recovery.
 3. (instance2) InitProducerId(keepPreparedTxn=true), epoch=43,
ongoingEpoch=42
 4. (instance2) completeTxn(41) => abort
   The first problem is that 41 now is aborted when it should be committed.
The second one is that it's not clear who could abort epoch 42, which is
still open.

Jun


On Thu, Dec 7, 2023 at 2:43 PM Justine Olshan 
wrote:

> Hey Artem,
>
> Thanks for the updates. I think what you say makes sense. I just updated my
> KIP so I want to reconcile some of the changes we made especially with
> respect to the TransactionLogValue.
>
> Firstly, I believe tagged fields require a default value so that if they
> are not filled, we return the default (and know that they were empty). For
> my KIP, I proposed the default for producer ID tagged fields should be -1.
> I was wondering if we could update the KIP to include the default values
> for producer ID and epoch.
>
> Next, I noticed we decided to rename the fields. I guess that the field
> "NextProducerId" in my KIP correlates to "ProducerId" in this KIP. Is that
> correct? So we would have "TransactionProducerId" for the non-tagged field
> and have "ProducerId" (NextProducerId) and "PrevProducerId" as tagged
> fields the final version after KIP-890 and KIP-936 are implemented. Is this
> correct? I think the tags will need updating, but that is trivial.
>
> The final question I had was with respect to storing the new epoch. In
> KIP-890 part 2 (epoch bumps) I think we concluded that we don't need to
> store the epoch since we can interpret the previous epoch based on the
> producer ID. But here we could call the InitProducerId multiple times and
> we only want the producer with the correct epoch to be able to commit the
> transaction. Is that the correct reasoning for why we need epoch here but
> not the Prepare/Commit state.
>
> Thanks,
> Justine
>
> On Wed, Nov 22, 2023 at 9:48 AM Artem Livshits
>  wrote:
>
> > Hi Justine,
> >
> > After thinking a bit about supporting atomic dual writes for Kafka +
> NoSQL
> > database, I came to a conclusion that we do need to bump the epoch even
> > with InitProducerId(keepPreparedTxn=true).  As I described in my previous
> > email, we wouldn't need to bump the epoch to protect from zombies so that
> > reasoning is still true.  But we cannot protect from split-brain
> scenarios
> > when two or more instances of a producer with the same transactional id
> try
> > to produce at the same time.  The dual-write example for SQL databases (
> > https://github.com/apache/kafka/pull/14231/files) doesn't have a
> > split-brain problem because execution is protected by the update lock on
> > the transaction state record; however NoSQL 

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2023-12-07 Thread Justine Olshan
Hey Artem,

Thanks for the updates. I think what you say makes sense. I just updated my
KIP so I want to reconcile some of the changes we made especially with
respect to the TransactionLogValue.

Firstly, I believe tagged fields require a default value so that if they
are not filled, we return the default (and know that they were empty). For
my KIP, I proposed the default for producer ID tagged fields should be -1.
I was wondering if we could update the KIP to include the default values
for producer ID and epoch.

Next, I noticed we decided to rename the fields. I guess that the field
"NextProducerId" in my KIP correlates to "ProducerId" in this KIP. Is that
correct? So we would have "TransactionProducerId" for the non-tagged field
and have "ProducerId" (NextProducerId) and "PrevProducerId" as tagged
fields the final version after KIP-890 and KIP-936 are implemented. Is this
correct? I think the tags will need updating, but that is trivial.

The final question I had was with respect to storing the new epoch. In
KIP-890 part 2 (epoch bumps) I think we concluded that we don't need to
store the epoch since we can interpret the previous epoch based on the
producer ID. But here we could call the InitProducerId multiple times and
we only want the producer with the correct epoch to be able to commit the
transaction. Is that the correct reasoning for why we need epoch here but
not the Prepare/Commit state?

Thanks,
Justine

On Wed, Nov 22, 2023 at 9:48 AM Artem Livshits
 wrote:

> Hi Justine,
>
> After thinking a bit about supporting atomic dual writes for Kafka + NoSQL
> database, I came to a conclusion that we do need to bump the epoch even
> with InitProducerId(keepPreparedTxn=true).  As I described in my previous
> email, we wouldn't need to bump the epoch to protect from zombies so that
> reasoning is still true.  But we cannot protect from split-brain scenarios
> when two or more instances of a producer with the same transactional id try
> to produce at the same time.  The dual-write example for SQL databases (
> https://github.com/apache/kafka/pull/14231/files) doesn't have a
> split-brain problem because execution is protected by the update lock on
> the transaction state record; however NoSQL databases may not have this
> protection (I'll write an example for NoSQL database dual-write soon).
>
> In a nutshell, here is an example of a split-brain scenario:
>
>1. (instance1) InitProducerId(keepPreparedTxn=true), got epoch=42
>2. (instance2) InitProducerId(keepPreparedTxn=true), got epoch=42
>3. (instance1) CommitTxn, epoch bumped to 43
>4. (instance2) CommitTxn, this is considered a retry, so it got epoch 43
>as well
>5. (instance1) Produce messageA w/sequence 1
>6. (instance2) Produce messageB w/sequence 1, this is considered a
>duplicate
>7. (instance2) Produce messageC w/sequence 2
>8. (instance1) Produce messageD w/sequence 2, this is considered a
>duplicate
>
> Now if either of those commit the transaction, it would have a mix of
> messages from the two instances (messageA and messageC).  With the proper
> epoch bump, instance1 would get fenced at step 3.
>
> In order to update epoch in InitProducerId(keepPreparedTxn=true) we need to
> preserve the ongoing transaction's epoch (and producerId, if the epoch
> overflows), because we'd need to make a correct decision when we compare
> the PreparedTxnState that we read from the database with the (producerId,
> epoch) of the ongoing transaction.
>
> I've updated the KIP with the following:
>
>- Ongoing transaction now has 2 (producerId, epoch) pairs -- one pair
>describes the ongoing transaction, the other pair describes expected
> epoch
>for operations on this transactional id
>- InitProducerIdResponse now returns 2 (producerId, epoch) pairs
>- TransactionalLogValue now has 2 (producerId, epoch) pairs, the new
>values added as tagged fields, so it's easy to downgrade
>- Added a note about downgrade in the Compatibility section
>- Added a rejected alternative
>
> -Artem
>
> On Fri, Oct 6, 2023 at 5:16 PM Artem Livshits 
> wrote:
>
> > Hi Justine,
> >
> > Thank you for the questions.  Currently (pre-KIP-939) we always bump the
> > epoch on InitProducerId and abort an ongoing transaction (if any).  I
> > expect this behavior will continue with KIP-890 as well.
> >
> > With KIP-939 we need to support the case when the ongoing transaction
> > needs to be preserved when keepPreparedTxn=true.  Bumping epoch without
> > aborting or committing a transaction is tricky because epoch is a short
> > value and it's easy to overflow.  Currently, the overflow case is handled
> > by aborting the ongoing transaction, which would send out transaction
> > markers with epoch=Short.MAX_VALUE to the partition leaders, which would
> > fence off any messages with the producer id that started the transaction
> > (they would have epoch that is less than Short.MAX_VALUE).  Then it is
> safe
> > to allocate a 

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2023-11-22 Thread Artem Livshits
Hi Justine,

After thinking a bit about supporting atomic dual writes for Kafka + NoSQL
database, I came to a conclusion that we do need to bump the epoch even
with InitProducerId(keepPreparedTxn=true).  As I described in my previous
email, we wouldn't need to bump the epoch to protect from zombies so that
reasoning is still true.  But we cannot protect from split-brain scenarios
when two or more instances of a producer with the same transactional id try
to produce at the same time.  The dual-write example for SQL databases (
https://github.com/apache/kafka/pull/14231/files) doesn't have a
split-brain problem because execution is protected by the update lock on
the transaction state record; however NoSQL databases may not have this
protection (I'll write an example for NoSQL database dual-write soon).

In a nutshell, here is an example of a split-brain scenario:

   1. (instance1) InitProducerId(keepPreparedTxn=true), got epoch=42
   2. (instance2) InitProducerId(keepPreparedTxn=true), got epoch=42
   3. (instance1) CommitTxn, epoch bumped to 43
   4. (instance2) CommitTxn, this is considered a retry, so it got epoch 43
   as well
   5. (instance1) Produce messageA w/sequence 1
   6. (instance2) Produce messageB w/sequence 1, this is considered a
   duplicate
   7. (instance2) Produce messageC w/sequence 2
   8. (instance1) Produce messageD w/sequence 2, this is considered a
   duplicate

Now if either of those commit the transaction, it would have a mix of
messages from the two instances (messageA and messageC).  With the proper
epoch bump, instance1 would get fenced at step 3.

In order to update epoch in InitProducerId(keepPreparedTxn=true) we need to
preserve the ongoing transaction's epoch (and producerId, if the epoch
overflows), because we'd need to make a correct decision when we compare
the PreparedTxnState that we read from the database with the (producerId,
epoch) of the ongoing transaction.

I've updated the KIP with the following:

   - Ongoing transaction now has 2 (producerId, epoch) pairs -- one pair
   describes the ongoing transaction, the other pair describes expected epoch
   for operations on this transactional id
   - InitProducerIdResponse now returns 2 (producerId, epoch) pairs
   - TransactionalLogValue now has 2 (producerId, epoch) pairs, the new
   values added as tagged fields, so it's easy to downgrade
   - Added a note about downgrade in the Compatibility section
   - Added a rejected alternative

-Artem

On Fri, Oct 6, 2023 at 5:16 PM Artem Livshits 
wrote:

> Hi Justine,
>
> Thank you for the questions.  Currently (pre-KIP-939) we always bump the
> epoch on InitProducerId and abort an ongoing transaction (if any).  I
> expect this behavior will continue with KIP-890 as well.
>
> With KIP-939 we need to support the case when the ongoing transaction
> needs to be preserved when keepPreparedTxn=true.  Bumping epoch without
> aborting or committing a transaction is tricky because epoch is a short
> value and it's easy to overflow.  Currently, the overflow case is handled
> by aborting the ongoing transaction, which would send out transaction
> markers with epoch=Short.MAX_VALUE to the partition leaders, which would
> fence off any messages with the producer id that started the transaction
> (they would have epoch that is less than Short.MAX_VALUE).  Then it is safe
> to allocate a new producer id and use it in new transactions.
>
> We could say that maybe when keepPreparedTxn=true we bump epoch unless it
> leads to overflow, and don't bump epoch in the overflow case.  I don't
> think it's a good solution because if it's not safe to keep the same epoch
> when keepPreparedTxn=true, then we must handle the epoch overflow case as
> well.  So either we should convince ourselves that it's safe to keep the
> epoch and do it in the general case, or we always bump the epoch and handle
> the overflow.
>
> With KIP-890, we bump the epoch on every transaction commit / abort.  This
> guarantees that even if InitProducerId(keepPreparedTxn=true) doesn't
> increment epoch on the ongoing transaction, the client will have to call
> commit or abort to finish the transaction and will increment the epoch (and
> handle epoch overflow, if needed).  If the ongoing transaction was in a bad
> state and had some zombies waiting to arrive, the abort operation would
> fence them because with KIP-890 every abort would bump the epoch.
>
> We could also look at this from the following perspective.  With KIP-890,
> zombies won't be able to cross transaction boundaries; each transaction
> completion creates a boundary and any activity in the past gets confined in
> the boundary.  Then data in any partition would look like this:
>
> 1. message1, epoch=42
> 2. message2, epoch=42
> 3. message3, epoch=42
> 4. marker (commit or abort), epoch=43
>
> Now if we inject steps 3a and 3b like this:
>
> 1. message1, epoch=42
> 2. message2, epoch=42
> 3. message3, epoch=42
> 3a. crash
> 3b. 

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2023-10-06 Thread Artem Livshits
Hi Raman,

Thank you for the questions.  Given that the primary effect of setting the
enable2pc flag is disabling the timeout, it makes sense to make enable2pc
have similar behavior w.r.t. when it can be set.

One clarification about the Ongoing case -- the current (pre-KIP-939)
behavior is to abort the ongoing transaction and let the client retry
(eventually getting into the CompleteAbort state).  So even though the
transaction timeout is not changed at the moment the ongoing transaction is
hit, the new timeout value takes effect before the call completes to the
caller.  In other words, from the caller's perspective, the transaction
timeout is set whenever the InitProducerId functionality is used.

-Artem

On Wed, Oct 4, 2023 at 8:58 PM Raman Verma  wrote:

> Hello Artem,
>
> Now that `InitProducerIdRequest` will have an extra parameter (enable2PC),
> can the client change the value of this parameter during an ongoing
> transaction.
>
> Here is how the transaction coordinator responds to InitProducerId requests
> according
> to the current transaction's state.
>
> - Empty | CompleteAbort | CompleteCommit
> Bump epoch and move to Empty state. Accept any changes from incoming
> InitProducerId
> request like transactionTimeoutMs
>
> - Ongoing
> Bump epoch and move to PrepareEpochFence state. Transaction time out is not
> changed.
>
> - PrepareAbort | PrepareCommit
> No changes internally. Return Concurrent transactions error to the client.
>
> I guess we should allow the same behavior for mutating enable2PC flag
> under these conditions as for transaction timeout value.
>


Re: [DISCUSS] KIP-939: Support Participation in 2PC

2023-10-06 Thread Artem Livshits
Hi Justine,

Thank you for the questions.  Currently (pre-KIP-939) we always bump the
epoch on InitProducerId and abort an ongoing transaction (if any).  I
expect this behavior will continue with KIP-890 as well.

With KIP-939 we need to support the case when the ongoing transaction needs
to be preserved when keepPreparedTxn=true.  Bumping epoch without aborting
or committing a transaction is tricky because epoch is a short value and
it's easy to overflow.  Currently, the overflow case is handled by aborting
the ongoing transaction, which would send out transaction markers with
epoch=Short.MAX_VALUE to the partition leaders, which would fence off any
messages with the producer id that started the transaction (they would have
epoch that is less than Short.MAX_VALUE).  Then it is safe to allocate a
new producer id and use it in new transactions.
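
To spell that out, the overflow handling roughly looks like this (an
illustrative sketch with made-up names, not the actual coordinator code):

  // Illustrative sketch of the overflow handling described above; names are
  // assumptions, not coordinator code.
  final class EpochOverflowSketch {
      long producerId;
      short epoch;

      void onEpochBump() {
          if (epoch == Short.MAX_VALUE) {
              // Overflow: abort the ongoing transaction with the final epoch so the
              // markers fence any in-flight messages (their epoch < MAX_VALUE) ...
              writeAbortMarkers(producerId, Short.MAX_VALUE);
              // ... then it's safe to switch to a freshly allocated producer id.
              producerId = allocateNewProducerId();
              epoch = 0;
          } else {
              epoch++;  // normal case: just bump the epoch
          }
      }

      void writeAbortMarkers(long pid, short markerEpoch) { /* send markers to partition leaders */ }
      long allocateNewProducerId() { return producerId + 1; /* placeholder */ }
  }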

We could say that maybe when keepPreparedTxn=true we bump epoch unless it
leads to overflow, and don't bump epoch in the overflow case.  I don't
think it's a good solution because if it's not safe to keep the same epoch
when keepPreparedTxn=true, then we must handle the epoch overflow case as
well.  So either we should convince ourselves that it's safe to keep the
epoch and do it in the general case, or we always bump the epoch and handle
the overflow.

With KIP-890, we bump the epoch on every transaction commit / abort.  This
guarantees that even if InitProducerId(keepPreparedTxn=true) doesn't
increment epoch on the ongoing transaction, the client will have to call
commit or abort to finish the transaction and will increment the epoch (and
handle epoch overflow, if needed).  If the ongoing transaction was in a bad
state and had some zombies waiting to arrive, the abort operation would
fence them because with KIP-890 every abort would bump the epoch.

We could also look at this from the following perspective.  With KIP-890,
zombies won't be able to cross transaction boundaries; each transaction
completion creates a boundary and any activity in the past gets confined in
the boundary.  Then data in any partition would look like this:

1. message1, epoch=42
2. message2, epoch=42
3. message3, epoch=42
4. marker (commit or abort), epoch=43

Now if we inject steps 3a and 3b like this:

1. message1, epoch=42
2. message2, epoch=42
3. message3, epoch=42
3a. crash
3b. InitProducerId(keepPreparedTxn=true)
4. marker (commit or abort), epoch=43

The invariant still holds even with steps 3a and 3b -- whatever activity
happened in the past gets confined there by the mandatory abort / commit
that must follow InitProducerId(keepPreparedTxn=true).

KIP-890 thus provides proper isolation between transactions, so injecting a
crash + InitProducerId(keepPreparedTxn=true) into the transaction sequence
is safe from the zombie-protection perspective.

That said, I'm still thinking about it and looking for cases that might
break because we don't bump the epoch on
InitProducerId(keepPreparedTxn=true); if such cases exist, we'll need to
develop the logic to handle epoch overflow for ongoing transactions.

-Artem



On Tue, Oct 3, 2023 at 10:15 AM Justine Olshan 
wrote:

> Hey Artem,
>
> Thanks for the KIP. I had a question about epoch bumping.
>
> Previously when we send an InitProducerId request on Producer startup, we
> bump the epoch and abort the transaction. Is it correct to assume that we
> will still bump the epoch, but just not abort the transaction?
> If we still bump the epoch in this case, how does this interact with
> KIP-890 where we also bump the epoch on every transaction. (I think this
> means that we may skip epochs and the data itself will all have the same
> epoch)
>
> I may have follow ups depending on the answer to this. :)
>
> Thanks,
> Justine
>
> On Thu, Sep 7, 2023 at 9:51 PM Artem Livshits
>  wrote:
>
> > Hi Alex,
> >
> > Thank you for your questions.
> >
> > > the purpose of having broker-level transaction.two.phase.commit.enable
> >
> > The thinking is that 2PC is a bit of an advanced construct so enabling
> 2PC
> > in a Kafka cluster should be an explicit decision.  If it is set to
> 'false'
> > InitProducerId (and initTransactions) would
> > return TRANSACTIONAL_ID_AUTHORIZATION_FAILED.
> >
> > > WDYT about adding an AdminClient method that returns the state of
> > transaction.two.phase.commit.enable
> >
> > I wonder if the client could just try to use 2PC and then handle the
> error
> > (e.g. if it needs to fall back to ordinary transactions).  This way it
> > could uniformly handle cases when Kafka cluster doesn't support 2PC
> > completely and cases when 2PC is restricted to certain users.  We could
> > also expose this config in describeConfigs, if the fallback approach
> > doesn't work for some scenarios.
> >
> > -Artem
> >
> >
> > On Tue, Sep 5, 2023 at 12:45 PM Alexander Sorokoumov
> >  wrote:
> >
> > > Hi Artem,
> > >
> > > Thanks for publishing this KIP!
> > >
> > > Can you please clarify the purpose of having broker-level
> > > 

RE: [DISCUSS] KIP-939: Support Participation in 2PC

2023-10-04 Thread Raman Verma
Hello Artem,

Now that `InitProducerIdRequest` will have an extra parameter (enable2PC),
can the client change the value of this parameter during an ongoing
transaction?

Here is how the transaction coordinator responds to InitProducerId requests
according
to the current transaction's state.

- Empty | CompleteAbort | CompleteCommit
Bump epoch and move to Empty state. Accept any changes from incoming
InitProducerId
request like transactionTimeoutMs

- Ongoing
Bump epoch and move to PrepareEpochFence state. The transaction timeout is not
changed.

- PrepareAbort | PrepareCommit
No changes internally. Return Concurrent transactions error to the client.
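
In code form, that dispatch is roughly the following (an illustrative sketch,
not the actual transaction coordinator implementation):

  // Illustrative sketch of the dispatch above; names are assumptions.
  class InitProducerIdDispatchSketch {
      enum TxnState { EMPTY, COMPLETE_ABORT, COMPLETE_COMMIT, ONGOING,
                      PREPARE_EPOCH_FENCE, PREPARE_ABORT, PREPARE_COMMIT }

      String handle(TxnState state) {
          return switch (state) {
              case EMPTY, COMPLETE_ABORT, COMPLETE_COMMIT ->
                  "bump epoch, move to Empty, accept requested changes (e.g. transactionTimeoutMs)";
              case ONGOING ->
                  "bump epoch, move to PrepareEpochFence, keep the existing timeout";
              case PREPARE_ABORT, PREPARE_COMMIT ->
                  "no internal change, return CONCURRENT_TRANSACTIONS to the client";
              default ->
                  "not covered by the summary above";
          };
      }
  }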

I guess we should allow the same behavior for mutating the enable2PC flag
under these conditions as for the transaction timeout value.


Re: [DISCUSS] KIP-939: Support Participation in 2PC

2023-10-03 Thread Justine Olshan
Hey Artem,

Thanks for the KIP. I had a question about epoch bumping.

Previously when we send an InitProducerId request on Producer startup, we
bump the epoch and abort the transaction. Is it correct to assume that we
will still bump the epoch, but just not abort the transaction?
If we still bump the epoch in this case, how does this interact with
KIP-890 where we also bump the epoch on every transaction. (I think this
means that we may skip epochs and the data itself will all have the same
epoch)

I may have follow ups depending on the answer to this. :)

Thanks,
Justine

On Thu, Sep 7, 2023 at 9:51 PM Artem Livshits
 wrote:

> Hi Alex,
>
> Thank you for your questions.
>
> > the purpose of having broker-level transaction.two.phase.commit.enable
>
> The thinking is that 2PC is a bit of an advanced construct so enabling 2PC
> in a Kafka cluster should be an explicit decision.  If it is set to 'false'
> InitProducerId (and initTransactions) would
> return TRANSACTIONAL_ID_AUTHORIZATION_FAILED.
>
> > WDYT about adding an AdminClient method that returns the state of
> transaction.two.phase.commit.enable
>
> I wonder if the client could just try to use 2PC and then handle the error
> (e.g. if it needs to fall back to ordinary transactions).  This way it
> could uniformly handle cases when Kafka cluster doesn't support 2PC
> completely and cases when 2PC is restricted to certain users.  We could
> also expose this config in describeConfigs, if the fallback approach
> doesn't work for some scenarios.
>
> -Artem
>
>
> On Tue, Sep 5, 2023 at 12:45 PM Alexander Sorokoumov
>  wrote:
>
> > Hi Artem,
> >
> > Thanks for publishing this KIP!
> >
> > Can you please clarify the purpose of having broker-level
> > transaction.two.phase.commit.enable config in addition to the new ACL? If
> > the brokers are configured with
> transaction.two.phase.commit.enable=false,
> > at what point will a client configured with
> > transaction.two.phase.commit.enable=true fail? Will it happen at
> > KafkaProducer#initTransactions?
> >
> > WDYT about adding an AdminClient method that returns the state of
> > transaction.two.phase.commit.enable? This way, clients would know in
> advance
> > if 2PC is enabled on the brokers.
> >
> > Best,
> > Alex
> >
> > On Fri, Aug 25, 2023 at 9:40 AM Roger Hoover 
> > wrote:
> >
> > > Other than supporting multiplexing transactional streams on a single
> > > producer, I don't see how to improve it.
> > >
> > > On Thu, Aug 24, 2023 at 12:12 PM Artem Livshits
> > >  wrote:
> > >
> > > > Hi Roger,
> > > >
> > > > Thank you for summarizing the cons.  I agree and I'm curious what
> would
> > > be
> > > > the alternatives to solve these problems better and if they can be
> > > > incorporated into this proposal (or built independently in addition
> to
> > or
> > > > on top of this proposal).  E.g. one potential extension we discussed
> > > > earlier in the thread could be multiplexing logical transactional
> > > "streams"
> > > > with a single producer.
> > > >
> > > > -Artem
> > > >
> > > > On Wed, Aug 23, 2023 at 4:50 PM Roger Hoover  >
> > > > wrote:
> > > >
> > > > > Thanks.  I like that you're moving Kafka toward supporting this
> > > > dual-write
> > > > > pattern.  Each use case needs to consider the tradeoffs.  You
> already
> > > > > summarized the pros very well in the KIP.  I would summarize the
> cons
> > > > > as follows:
> > > > >
> > > > > - you sacrifice availability - each write requires both DB and
> Kafka
> > to
> > > > be
> > > > > available so I think your overall application availability is 1 -
> > p(DB
> > > is
> > > > > unavailable)*p(Kafka is unavailable).
> > > > > - latency will be higher and throughput lower - each write requires
> > > both
> > > > > writes to DB and Kafka while holding an exclusive lock in DB.
> > > > > - you need to create a producer per unit of concurrency in your app
> > > which
> > > > > has some overhead in the app and Kafka side (number of connections,
> > > poor
> > > > > batching).  I assume the producers would need to be configured for
> > low
> > > > > latency (linger.ms=0)
> > > > > - there's some complexity in managing stable transactional ids for
> > each
> > > > > producer/concurrency unit in your application.  With k8s
> deployment,
> > > you
> > > > > may need to switch to something like a StatefulSet that gives each
> > pod
> > > a
> > > > > stable identity across restarts.  On top of that pod identity which
> > you
> > > > can
> > > > > use as a prefix, you then assign unique transactional ids to each
> > > > > concurrency unit (thread/goroutine).
> > > > >
> > > > > On Wed, Aug 23, 2023 at 12:53 PM Artem Livshits
> > > > >  wrote:
> > > > >
> > > > > > Hi Roger,
> > > > > >
> > > > > > Thank you for the feedback.  You make a very good point that we
> > also
> > > > > > discussed internally.  Adding support for multiple concurrent
> > > > > > transactions in one producer could be valuable but it seems to
> be a
> > > > > fairly
> > > > > > large 

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2023-09-07 Thread Artem Livshits
Hi Alex,

Thank you for your questions.

> the purpose of having broker-level transaction.two.phase.commit.enable

The thinking is that 2PC is a bit of an advanced construct so enabling 2PC
in a Kafka cluster should be an explicit decision.  If it is set to 'false'
InitProducerId (and initTransactions) would
return TRANSACTIONAL_ID_AUTHORIZATION_FAILED.

> WDYT about adding an AdminClient method that returns the state of
transaction.two.phase.commit.enable

I wonder if the client could just try to use 2PC and then handle the error
(e.g. if it needs to fall back to ordinary transactions).  This way it
could uniformly handle both cases where the Kafka cluster doesn't support 2PC
at all and cases where 2PC is restricted to certain users.  We could
also expose this config in describeConfigs, if the fallback approach
doesn't work for some scenarios.
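
For example, the client-side fallback could look roughly like this (a sketch
only; the client config name follows this KIP's proposal and the exception
corresponds to the TRANSACTIONAL_ID_AUTHORIZATION_FAILED error):

  import java.util.Properties;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;

  class TwoPcFallbackSketch {
      public static void main(String[] args) {
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");
          props.put("transactional.id", "my-app-instance-1");
          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          props.put("transaction.two.phase.commit.enable", "true");  // proposed in KIP-939

          try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
              try {
                  producer.initTransactions();
                  // 2PC is available: run the dual-write recipe.
              } catch (TransactionalIdAuthorizationException e) {
                  // The broker has 2PC disabled or the principal lacks the new ACL:
                  // fall back to ordinary (timeout-bounded) transactions.
              }
          }
      }
  }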

-Artem


On Tue, Sep 5, 2023 at 12:45 PM Alexander Sorokoumov
 wrote:

> Hi Artem,
>
> Thanks for publishing this KIP!
>
> Can you please clarify the purpose of having broker-level
> transaction.two.phase.commit.enable config in addition to the new ACL? If
> the brokers are configured with transaction.two.phase.commit.enable=false,
> at what point will a client configured with
> transaction.two.phase.commit.enable=true fail? Will it happen at
> KafkaProducer#initTransactions?
>
> WDYT about adding an AdminClient method that returns the state of
> transaction.two.phase.commit.enable? This way, clients would know in advance
> if 2PC is enabled on the brokers.
>
> Best,
> Alex
>
> On Fri, Aug 25, 2023 at 9:40 AM Roger Hoover 
> wrote:
>
> > Other than supporting multiplexing transactional streams on a single
> > producer, I don't see how to improve it.
> >
> > On Thu, Aug 24, 2023 at 12:12 PM Artem Livshits
> >  wrote:
> >
> > > Hi Roger,
> > >
> > > Thank you for summarizing the cons.  I agree and I'm curious what would
> > be
> > > the alternatives to solve these problems better and if they can be
> > > incorporated into this proposal (or built independently in addition to
> or
> > > on top of this proposal).  E.g. one potential extension we discussed
> > > earlier in the thread could be multiplexing logical transactional
> > "streams"
> > > with a single producer.
> > >
> > > -Artem
> > >
> > > On Wed, Aug 23, 2023 at 4:50 PM Roger Hoover 
> > > wrote:
> > >
> > > > Thanks.  I like that you're moving Kafka toward supporting this
> > > dual-write
> > > > pattern.  Each use case needs to consider the tradeoffs.  You already
> > > > summarized the pros very well in the KIP.  I would summarize the cons
> > > > as follows:
> > > >
> > > > - you sacrifice availability - each write requires both DB and Kafka
> to
> > > be
> > > > available so I think your overall application availability is 1 -
> p(DB
> > is
> > > > unavailable)*p(Kafka is unavailable).
> > > > - latency will be higher and throughput lower - each write requires
> > both
> > > > writes to DB and Kafka while holding an exclusive lock in DB.
> > > > - you need to create a producer per unit of concurrency in your app
> > which
> > > > has some overhead in the app and Kafka side (number of connections,
> > poor
> > > > batching).  I assume the producers would need to be configured for
> low
> > > > latency (linger.ms=0)
> > > > - there's some complexity in managing stable transactional ids for
> each
> > > > producer/concurrency unit in your application.  With k8s deployment,
> > you
> > > > may need to switch to something like a StatefulSet that gives each
> pod
> > a
> > > > stable identity across restarts.  On top of that pod identity which
> you
> > > can
> > > > use as a prefix, you then assign unique transactional ids to each
> > > > concurrency unit (thread/goroutine).
> > > >
> > > > On Wed, Aug 23, 2023 at 12:53 PM Artem Livshits
> > > >  wrote:
> > > >
> > > > > Hi Roger,
> > > > >
> > > > > Thank you for the feedback.  You make a very good point that we
> also
> > > > > discussed internally.  Adding support for multiple concurrent
> > > > > transactions in one producer could be valuable but it seems to be a
> > > > fairly
> > > > > large and independent change that would deserve a separate KIP.  If
> > > such
> > > > > support is added we could modify 2PC functionality to incorporate
> > that.
> > > > >
> > > > > > Maybe not too bad but a bit of pain to manage these ids inside
> each
> > > > > process and across all application processes.
> > > > >
> > > > > I'm not sure if supporting multiple transactions in one producer
> > would
> > > > make
> > > > > id management simpler: we'd need to store a piece of data per
> > > > transaction,
> > > > > so whether it's N producers with a single transaction or N
> > transactions
> > > > > with a single producer, it's still roughly the same amount of data
> to
> > > > > manage.  In fact, managing transactional ids (current proposal)
> might
> > > be
> > > > > easier, because the id is controlled by the application and it
> knows
> > > how
> > > > to
> > > > > 

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2023-09-05 Thread Alexander Sorokoumov
Hi Artem,

Thanks for publishing this KIP!

Can you please clarify the purpose of having broker-level
transaction.two.phase.commit.enable config in addition to the new ACL? If
the brokers are configured with transaction.two.phase.commit.enable=false,
at what point will a client configured with
transaction.two.phase.commit.enable=true fail? Will it happen at
KafkaProducer#initTransactions?

WDYT about adding an AdminClient method that returns the state of
transaction.two.phase.commit.enable? This way, clients would know in advance
if 2PC is enabled on the brokers.

Best,
Alex

On Fri, Aug 25, 2023 at 9:40 AM Roger Hoover  wrote:

> Other than supporting multiplexing transactional streams on a single
> producer, I don't see how to improve it.
>
> On Thu, Aug 24, 2023 at 12:12 PM Artem Livshits
>  wrote:
>
> > Hi Roger,
> >
> > Thank you for summarizing the cons.  I agree and I'm curious what would
> be
> > the alternatives to solve these problems better and if they can be
> > incorporated into this proposal (or built independently in addition to or
> > on top of this proposal).  E.g. one potential extension we discussed
> > earlier in the thread could be multiplexing logical transactional
> "streams"
> > with a single producer.
> >
> > -Artem
> >
> > On Wed, Aug 23, 2023 at 4:50 PM Roger Hoover 
> > wrote:
> >
> > > Thanks.  I like that you're moving Kafka toward supporting this
> > dual-write
> > > pattern.  Each use case needs to consider the tradeoffs.  You already
> > > summarized the pros very well in the KIP.  I would summarize the cons
> > > as follows:
> > >
> > > - you sacrifice availability - each write requires both DB and Kafka to
> > be
> > > available so I think your overall application availability is 1 - p(DB
> is
> > > unavailable)*p(Kafka is unavailable).
> > > - latency will be higher and throughput lower - each write requires
> both
> > > writes to DB and Kafka while holding an exclusive lock in DB.
> > > - you need to create a producer per unit of concurrency in your app
> which
> > > has some overhead in the app and Kafka side (number of connections,
> poor
> > > batching).  I assume the producers would need to be configured for low
> > > latency (linger.ms=0)
> > > - there's some complexity in managing stable transactional ids for each
> > > producer/concurrency unit in your application.  With k8s deployment,
> you
> > > may need to switch to something like a StatefulSet that gives each pod
> a
> > > stable identity across restarts.  On top of that pod identity which you
> > can
> > > use as a prefix, you then assign unique transactional ids to each
> > > concurrency unit (thread/goroutine).
> > >
> > > On Wed, Aug 23, 2023 at 12:53 PM Artem Livshits
> > >  wrote:
> > >
> > > > Hi Roger,
> > > >
> > > > Thank you for the feedback.  You make a very good point that we also
> > > > discussed internally.  Adding support for multiple concurrent
> > > > transactions in one producer could be valuable but it seems to be a
> > > fairly
> > > > large and independent change that would deserve a separate KIP.  If
> > such
> > > > support is added we could modify 2PC functionality to incorporate
> that.
> > > >
> > > > > Maybe not too bad but a bit of pain to manage these ids inside each
> > > > process and across all application processes.
> > > >
> > > > I'm not sure if supporting multiple transactions in one producer
> would
> > > make
> > > > id management simpler: we'd need to store a piece of data per
> > > transaction,
> > > > so whether it's N producers with a single transaction or N
> transactions
> > > > with a single producer, it's still roughly the same amount of data to
> > > > manage.  In fact, managing transactional ids (current proposal) might
> > be
> > > > easier, because the id is controlled by the application and it knows
> > how
> > > to
> > > > complete the transaction after crash / restart; while a TID would be
> > > > generated by Kafka and that would create a question of starting Kafka
> > > > transaction, but not saving its TID and then crashing, then figuring
> > out
> > > > which transactions to abort and etc.
> > > >
> > > > > 2) creating a separate producer for each concurrency slot in the
> > > > application
> > > >
> > > > This is a very valid concern.  Maybe we'd need to have some
> > multiplexing
> > > of
> > > > transactional logical "streams" over the same connection.  Seems
> like a
> > > > separate KIP, though.
> > > >
> > > > > Otherwise, it seems you're left with single-threaded model per
> > > > application process?
> > > >
> > > > That's a fair assessment.  Not necessarily exactly single-threaded
> per
> > > > application, but a single producer per thread model (i.e. an
> > application
> > > > could have a pool of threads + producers to increase concurrency).
> > > >
> > > > -Artem
> > > >
> > > > On Tue, Aug 22, 2023 at 7:22 PM Roger Hoover  >
> > > > wrote:
> > > >
> > > > > Artem,
> > > > >
> > > > > Thanks for the reply.
> > > > >
> > > > > If I 

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2023-08-25 Thread Roger Hoover
Other than supporting multiplexing transactional streams on a single
producer, I don't see how to improve it.

On Thu, Aug 24, 2023 at 12:12 PM Artem Livshits
 wrote:

> Hi Roger,
>
> Thank you for summarizing the cons.  I agree and I'm curious what would be
> the alternatives to solve these problems better and if they can be
> incorporated into this proposal (or built independently in addition to or
> on top of this proposal).  E.g. one potential extension we discussed
> earlier in the thread could be multiplexing logical transactional "streams"
> with a single producer.
>
> -Artem
>
> On Wed, Aug 23, 2023 at 4:50 PM Roger Hoover 
> wrote:
>
> > Thanks.  I like that you're moving Kafka toward supporting this
> dual-write
> > pattern.  Each use case needs to consider the tradeoffs.  You already
> > summarized the pros very well in the KIP.  I would summarize the cons
> > as follows:
> >
> > - you sacrifice availability - each write requires both DB and Kafka to
> be
> > available so I think your overall application availability is 1 - p(DB is
> > unavailable)*p(Kafka is unavailable).
> > - latency will be higher and throughput lower - each write requires both
> > writes to DB and Kafka while holding an exclusive lock in DB.
> > - you need to create a producer per unit of concurrency in your app which
> > has some overhead in the app and Kafka side (number of connections, poor
> > batching).  I assume the producers would need to be configured for low
> > latency (linger.ms=0)
> > - there's some complexity in managing stable transactional ids for each
> > producer/concurrency unit in your application.  With k8s deployment, you
> > may need to switch to something like a StatefulSet that gives each pod a
> > stable identity across restarts.  On top of that pod identity which you
> can
> > use as a prefix, you then assign unique transactional ids to each
> > concurrency unit (thread/goroutine).
> >
> > On Wed, Aug 23, 2023 at 12:53 PM Artem Livshits
> >  wrote:
> >
> > > Hi Roger,
> > >
> > > Thank you for the feedback.  You make a very good point that we also
> > > discussed internally.  Adding support for multiple concurrent
> > > transactions in one producer could be valuable but it seems to be a
> > fairly
> > > large and independent change that would deserve a separate KIP.  If
> such
> > > support is added we could modify 2PC functionality to incorporate that.
> > >
> > > > Maybe not too bad but a bit of pain to manage these ids inside each
> > > process and across all application processes.
> > >
> > > I'm not sure if supporting multiple transactions in one producer would
> > make
> > > id management simpler: we'd need to store a piece of data per
> > transaction,
> > > so whether it's N producers with a single transaction or N transactions
> > > with a single producer, it's still roughly the same amount of data to
> > > manage.  In fact, managing transactional ids (current proposal) might
> be
> > > easier, because the id is controlled by the application and it knows
> how
> > to
> > > complete the transaction after crash / restart; while a TID would be
> > > generated by Kafka and that would create a question of starting Kafka
> > > transaction, but not saving its TID and then crashing, then figuring
> out
> > > which transactions to abort and etc.
> > >
> > > > 2) creating a separate producer for each concurrency slot in the
> > > application
> > >
> > > This is a very valid concern.  Maybe we'd need to have some
> multiplexing
> > of
> > > transactional logical "streams" over the same connection.  Seems like a
> > > separate KIP, though.
> > >
> > > > Otherwise, it seems you're left with single-threaded model per
> > > application process?
> > >
> > > That's a fair assessment.  Not necessarily exactly single-threaded per
> > > application, but a single producer per thread model (i.e. an
> application
> > > could have a pool of threads + producers to increase concurrency).
> > >
> > > -Artem
> > >
> > > On Tue, Aug 22, 2023 at 7:22 PM Roger Hoover 
> > > wrote:
> > >
> > > > Artem,
> > > >
> > > > Thanks for the reply.
> > > >
> > > > If I understand correctly, Kafka does not support concurrent
> > transactions
> > > > from the same producer (transactional id).  I think this means that
> > > > applications that want to support in-process concurrency (say
> > > thread-level
> > > > concurrency with row-level DB locking) would need to manage separate
> > > > transactional ids and producers per thread and then store txn state
> > > > accordingly.   The potential usability downsides I see are
> > > > 1) managing a set of transactional ids for each application process
> > that
> > > > scales up to it's max concurrency.  Maybe not too bad but a bit of
> pain
> > > to
> > > > manage these ids inside each process and across all application
> > > processes.
> > > > 2) creating a separate producer for each concurrency slot in the
> > > > application - this could create a lot more producers and resultant
> > > 

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2023-08-24 Thread Artem Livshits
Hi Roger,

Thank you for summarizing the cons.  I agree and I'm curious what would be
the alternatives to solve these problems better and if they can be
incorporated into this proposal (or built independently in addition to or
on top of this proposal).  E.g. one potential extension we discussed
earlier in the thread could be multiplexing logical transactional "streams"
with a single producer.

-Artem

On Wed, Aug 23, 2023 at 4:50 PM Roger Hoover  wrote:

> Thanks.  I like that you're moving Kafka toward supporting this dual-write
> pattern.  Each use case needs to consider the tradeoffs.  You already
> summarized the pros very well in the KIP.  I would summarize the cons
> as follows:
>
> - you sacrifice availability - each write requires both DB and Kafka to be
> available so I think your overall application availability is 1 - p(DB is
> unavailable)*p(Kafka is unavailable).
> - latency will be higher and throughput lower - each write requires both
> writes to DB and Kafka while holding an exclusive lock in DB.
> - you need to create a producer per unit of concurrency in your app which
> has some overhead in the app and Kafka side (number of connections, poor
> batching).  I assume the producers would need to be configured for low
> latency (linger.ms=0)
> - there's some complexity in managing stable transactional ids for each
> producer/concurrency unit in your application.  With k8s deployment, you
> may need to switch to something like a StatefulSet that gives each pod a
> stable identity across restarts.  On top of that pod identity which you can
> use as a prefix, you then assign unique transactional ids to each
> concurrency unit (thread/goroutine).
>
> On Wed, Aug 23, 2023 at 12:53 PM Artem Livshits
>  wrote:
>
> > Hi Roger,
> >
> > Thank you for the feedback.  You make a very good point that we also
> > discussed internally.  Adding support for multiple concurrent
> > transactions in one producer could be valuable but it seems to be a
> fairly
> > large and independent change that would deserve a separate KIP.  If such
> > support is added we could modify 2PC functionality to incorporate that.
> >
> > > Maybe not too bad but a bit of pain to manage these ids inside each
> > process and across all application processes.
> >
> > I'm not sure if supporting multiple transactions in one producer would
> make
> > id management simpler: we'd need to store a piece of data per
> transaction,
> > so whether it's N producers with a single transaction or N transactions
> > with a single producer, it's still roughly the same amount of data to
> > manage.  In fact, managing transactional ids (current proposal) might be
> > easier, because the id is controlled by the application and it knows how
> to
> > complete the transaction after crash / restart; while a TID would be
> > generated by Kafka and that would create a question of starting Kafka
> > transaction, but not saving its TID and then crashing, then figuring out
> > which transactions to abort and etc.
> >
> > > 2) creating a separate producer for each concurrency slot in the
> > application
> >
> > This is a very valid concern.  Maybe we'd need to have some multiplexing
> of
> > transactional logical "streams" over the same connection.  Seems like a
> > separate KIP, though.
> >
> > > Otherwise, it seems you're left with single-threaded model per
> > application process?
> >
> > That's a fair assessment.  Not necessarily exactly single-threaded per
> > application, but a single producer per thread model (i.e. an application
> > could have a pool of threads + producers to increase concurrency).
> >
> > -Artem
> >
> > On Tue, Aug 22, 2023 at 7:22 PM Roger Hoover 
> > wrote:
> >
> > > Artem,
> > >
> > > Thanks for the reply.
> > >
> > > If I understand correctly, Kafka does not support concurrent
> transactions
> > > from the same producer (transactional id).  I think this means that
> > > applications that want to support in-process concurrency (say
> > thread-level
> > > concurrency with row-level DB locking) would need to manage separate
> > > transactional ids and producers per thread and then store txn state
> > > accordingly.   The potential usability downsides I see are
> > > 1) managing a set of transactional ids for each application process
> that
> > > scales up to it's max concurrency.  Maybe not too bad but a bit of pain
> > to
> > > manage these ids inside each process and across all application
> > processes.
> > > 2) creating a separate producer for each concurrency slot in the
> > > application - this could create a lot more producers and resultant
> > > connections to Kafka than the typical model of a single producer per
> > > process.
> > >
> > > Otherwise, it seems you're left with single-threaded model per
> > application
> > > process?
> > >
> > > Thanks,
> > >
> > > Roger
> > >
> > > On Tue, Aug 22, 2023 at 5:11 PM Artem Livshits
> > >  wrote:
> > >
> > > > Hi Roger, Arjun,
> > > >
> > > > Thank you for the questions.
> > > > > It 

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2023-08-24 Thread Artem Livshits
Hi Guy,

You raise a very good point.  Supporting XA sounds like a good way to
integrate Kafka and it's something that I think we should support at
some point in the future.  For this KIP, though, we thought we focus on a
more basic functionality keeping the following in mind:

1. XA is not universally supported and it would be good to integrate with
systems that just have local transactions support (which would include
systems that are not "proper" databases, like Zookeeper, RocksDB, etc.).
2. We should be able to implement more advanced functionality, like XA, on
top of KIP-939 as a library or a recipe.

I would like to hear your thoughts on #2 specifically -- do you think that
we actually need to amend KIP-939 to enable XA in the future, or could the
XA changes be done incrementally on top of KIP-939?

-Artem

On Wed, Aug 23, 2023 at 5:23 AM  wrote:

> Hi,
>
> Nice idea, but you could maximise compatibility if you adhere to XA
> standard APIs rather than Kafka internal APIs.
>
> We at Atomikos offer 2PC coordination and recovery and we are happy to
> help you design this, it's a service we usually offer for free to backend
> vendors / systems.
>
> Let me know if you'd like to explore?
>
> Guy
>
>
> On 2023/08/17 06:39:57 Artem Livshits wrote:
> > Hello,
> >
> >  This is a discussion thread for
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
> >  .
> >
> >  The KIP proposes extending Kafka transaction support (that already uses
> 2PC
> >  under the hood) to enable atomicity of dual writes to Kafka and an
> external
> >  database, and helps to fix a long standing Flink issue.
> >
> >  An example of code that uses the dual write recipe with JDBC and should
> >  work for most SQL databases is here
> >  https://github.com/apache/kafka/pull/14231.
> >
> >  The FLIP for the sister fix in Flink is here
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
> >
> >  -Artem
>
>
> Sent with Spark
>


Re: [DISCUSS] KIP-939: Support Participation in 2PC

2023-08-23 Thread Roger Hoover
Thanks.  I like that you're moving Kafka toward supporting this dual-write
pattern.  Each use case needs to consider the tradeoffs.  You already
summarized the pros very well in the KIP.  I would summarize the cons
as follows:

- you sacrifice availability - each write requires both DB and Kafka to be
available so I think your overall application availability is 1 - p(DB is
unavailable)*p(Kafka is unavailable).
- latency will be higher and throughput lower - each write requires both
writes to DB and Kafka while holding an exclusive lock in DB.
- you need to create a producer per unit of concurrency in your app which
has some overhead in the app and Kafka side (number of connections, poor
batching).  I assume the producers would need to be configured for low
latency (linger.ms=0)
- there's some complexity in managing stable transactional ids for each
producer/concurrency unit in your application.  With k8s deployment, you
may need to switch to something like a StatefulSet that gives each pod a
stable identity across restarts.  On top of that pod identity which you can
use as a prefix, you then assign unique transactional ids to each
concurrency unit (thread/goroutine).
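
For instance, with a StatefulSet the ids could be derived along these lines
(illustrative only; the env var and naming scheme are assumptions):

  import java.util.ArrayList;
  import java.util.List;

  class TransactionalIdSketch {
      public static void main(String[] args) {
          // Pod name is stable across restarts in a StatefulSet, e.g. "myapp-3".
          String podName = System.getenv().getOrDefault("HOSTNAME", "myapp-0");
          int concurrency = 4;  // one producer / transactional id per concurrency slot
          List<String> transactionalIds = new ArrayList<>();
          for (int slot = 0; slot < concurrency; slot++) {
              transactionalIds.add(podName + "-txn-" + slot);  // e.g. "myapp-3-txn-2"
          }
          System.out.println(transactionalIds);
      }
  }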

On Wed, Aug 23, 2023 at 12:53 PM Artem Livshits
 wrote:

> Hi Roger,
>
> Thank you for the feedback.  You make a very good point that we also
> discussed internally.  Adding support for multiple concurrent
> transactions in one producer could be valuable but it seems to be a fairly
> large and independent change that would deserve a separate KIP.  If such
> support is added we could modify 2PC functionality to incorporate that.
>
> > Maybe not too bad but a bit of pain to manage these ids inside each
> process and across all application processes.
>
> I'm not sure if supporting multiple transactions in one producer would make
> id management simpler: we'd need to store a piece of data per transaction,
> so whether it's N producers with a single transaction or N transactions
> with a single producer, it's still roughly the same amount of data to
> manage.  In fact, managing transactional ids (current proposal) might be
> easier, because the id is controlled by the application and it knows how to
> complete the transaction after crash / restart; while a TID would be
> generated by Kafka and that would create a question of starting Kafka
> transaction, but not saving its TID and then crashing, then figuring out
> which transactions to abort and etc.
>
> > 2) creating a separate producer for each concurrency slot in the
> application
>
> This is a very valid concern.  Maybe we'd need to have some multiplexing of
> transactional logical "streams" over the same connection.  Seems like a
> separate KIP, though.
>
> > Otherwise, it seems you're left with single-threaded model per
> application process?
>
> That's a fair assessment.  Not necessarily exactly single-threaded per
> application, but a single producer per thread model (i.e. an application
> could have a pool of threads + producers to increase concurrency).
>
> -Artem
>
> On Tue, Aug 22, 2023 at 7:22 PM Roger Hoover 
> wrote:
>
> > Artem,
> >
> > Thanks for the reply.
> >
> > If I understand correctly, Kafka does not support concurrent transactions
> > from the same producer (transactional id).  I think this means that
> > applications that want to support in-process concurrency (say
> thread-level
> > concurrency with row-level DB locking) would need to manage separate
> > transactional ids and producers per thread and then store txn state
> > accordingly.   The potential usability downsides I see are
> > 1) managing a set of transactional ids for each application process that
> > scales up to it's max concurrency.  Maybe not too bad but a bit of pain
> to
> > manage these ids inside each process and across all application
> processes.
> > 2) creating a separate producer for each concurrency slot in the
> > application - this could create a lot more producers and resultant
> > connections to Kafka than the typical model of a single producer per
> > process.
> >
> > Otherwise, it seems you're left with single-threaded model per
> application
> > process?
> >
> > Thanks,
> >
> > Roger
> >
> > On Tue, Aug 22, 2023 at 5:11 PM Artem Livshits
> >  wrote:
> >
> > > Hi Roger, Arjun,
> > >
> > > Thank you for the questions.
> > > > It looks like the application must have stable transactional ids over
> > > time?
> > >
> > > The transactional id should uniquely identify a producer instance and
> > needs
> > > to be stable across the restarts.  If the transactional id is not
> stable
> > > across restarts, then zombie messages from a previous incarnation of
> the
> > > producer may violate atomicity.  If there are 2 producer instances
> > > concurrently producing data with the same transactional id, they are
> > going
> > > to constantly fence each other and most likely make little or no
> > progress.
> > >
> > > The name might be a little bit confusing as it may be mistaken for a
> > > transaction 

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2023-08-23 Thread Artem Livshits
Hi Roger,

Thank you for the feedback.  You make a very good point that we also
discussed internally.  Adding support for multiple concurrent
transactions in one producer could be valuable but it seems to be a fairly
large and independent change that would deserve a separate KIP.  If such
support is added we could modify 2PC functionality to incorporate that.

> Maybe not too bad but a bit of pain to manage these ids inside each
process and across all application processes.

I'm not sure if supporting multiple transactions in one producer would make
id management simpler: we'd need to store a piece of data per transaction,
so whether it's N producers with a single transaction or N transactions
with a single producer, it's still roughly the same amount of data to
manage.  In fact, managing transactional ids (current proposal) might be
easier, because the id is controlled by the application and it knows how to
complete the transaction after crash / restart; while a TID would be
generated by Kafka and that would create a question of starting Kafka
transaction, but not saving its TID and then crashing, then figuring out
which transactions to abort and etc.

> 2) creating a separate producer for each concurrency slot in the
application

This is a very valid concern.  Maybe we'd need to have some multiplexing of
transactional logical "streams" over the same connection.  Seems like a
separate KIP, though.

> Otherwise, it seems you're left with single-threaded model per
application process?

That's a fair assessment.  Not necessarily exactly single-threaded per
application, but a single producer per thread model (i.e. an application
could have a pool of threads + producers to increase concurrency).
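
Concretely, that model could look like this (an illustrative sketch only;
sizes, topic names and configs are made up):

  import java.util.Properties;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;

  class ProducerPerThreadSketch {
      public static void main(String[] args) {
          int slots = 4;  // one thread + one transactional producer per slot
          ExecutorService pool = Executors.newFixedThreadPool(slots);
          for (int slot = 0; slot < slots; slot++) {
              final int s = slot;
              pool.submit(() -> {
                  Properties props = new Properties();
                  props.put("bootstrap.servers", "localhost:9092");
                  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                  props.put("transactional.id", "myapp-0-txn-" + s);  // stable id per slot
                  try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                      producer.initTransactions();
                      producer.beginTransaction();
                      producer.send(new ProducerRecord<>("some-topic", "key-" + s, "value"));
                      producer.commitTransaction();
                  }
              });
          }
          pool.shutdown();
      }
  }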

-Artem

On Tue, Aug 22, 2023 at 7:22 PM Roger Hoover  wrote:

> Artem,
>
> Thanks for the reply.
>
> If I understand correctly, Kafka does not support concurrent transactions
> from the same producer (transactional id).  I think this means that
> applications that want to support in-process concurrency (say thread-level
> concurrency with row-level DB locking) would need to manage separate
> transactional ids and producers per thread and then store txn state
> accordingly.   The potential usability downsides I see are
> 1) managing a set of transactional ids for each application process that
> scales up to it's max concurrency.  Maybe not too bad but a bit of pain to
> manage these ids inside each process and across all application processes.
> 2) creating a separate producer for each concurrency slot in the
> application - this could create a lot more producers and resultant
> connections to Kafka than the typical model of a single producer per
> process.
>
> Otherwise, it seems you're left with single-threaded model per application
> process?
>
> Thanks,
>
> Roger
>
> On Tue, Aug 22, 2023 at 5:11 PM Artem Livshits
>  wrote:
>
> > Hi Roger, Arjun,
> >
> > Thank you for the questions.
> > > It looks like the application must have stable transactional ids over
> > time?
> >
> > The transactional id should uniquely identify a producer instance and
> needs
> > to be stable across the restarts.  If the transactional id is not stable
> > across restarts, then zombie messages from a previous incarnation of the
> > producer may violate atomicity.  If there are 2 producer instances
> > concurrently producing data with the same transactional id, they are
> going
> > to constantly fence each other and most likely make little or no
> progress.
> >
> > The name might be a little bit confusing as it may be mistaken for a
> > transaction id / TID that uniquely identifies every transaction.  The
> name
> > and the semantics were defined in the original exactly-once-semantics
> (EoS)
> > proposal (
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > )
> > and KIP-939 just build on top of that.
> >
> > > I'm curious to understand what happens if the producer dies, and does
> not
> > come up and recover the pending transaction within the transaction
> timeout
> > interval.
> >
> > If the producer / application never comes back, the transaction will
> remain
> > in prepared (a.k.a. "in-doubt") state until an operator forcefully
> > terminates the transaction.  That's why a new ACL is defined in
> > this proposal -- this functionality should only be provided to applications
> > that implement proper recovery logic.
> >
> > -Artem
> >
> > On Tue, Aug 22, 2023 at 12:52 AM Arjun Satish 
> > wrote:
> >
> > > Hello Artem,
> > >
> > > Thanks for the KIP.
> > >
> > > I have the same question as Roger on concurrent writes, and an
> additional
> > > one on consumer behavior. Typically, transactions will timeout if not
> > > committed within some time interval. With the proposed changes in this
> > KIP,
> > > consumers cannot consume past the ongoing transaction. I'm curious to
> > > understand what happens if the producer dies, and does not come up and
> > > recover the 

RE: [DISCUSS] KIP-939: Support Participation in 2PC

2023-08-23 Thread guy
Hi,

Nice idea, but you could maximise compatibility if you adhere to XA standard 
APIs rather than Kafka internal APIs.

We at Atomikos offer 2PC coordination and recovery and we are happy to help you 
design this, it's a service we usually offer for free to backend vendors / 
systems.

Let me know if you'd like to explore?

Guy


On 2023/08/17 06:39:57 Artem Livshits wrote:
> Hello,
>
>  This is a discussion thread for
>  
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
>  .
>
>  The KIP proposes extending Kafka transaction support (that already uses 2PC
>  under the hood) to enable atomicity of dual writes to Kafka and an external
>  database, and helps to fix a long standing Flink issue.
>
>  An example of code that uses the dual write recipe with JDBC and should
>  work for most SQL databases is here
>  https://github.com/apache/kafka/pull/14231.
>
>  The FLIP for the sister fix in Flink is here
>  https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
>
>  -Artem


Sent with Spark


Re: [DISCUSS] KIP-939: Support Participation in 2PC

2023-08-22 Thread Roger Hoover
Artem,

Thanks for the reply.

If I understand correctly, Kafka does not support concurrent transactions
from the same producer (transactional id).  I think this means that
applications that want to support in-process concurrency (say thread-level
concurrency with row-level DB locking) would need to manage separate
transactional ids and producers per thread and then store txn state
accordingly.   The potential usability downsides I see are
1) managing a set of transactional ids for each application process that
scales up to its max concurrency.  Maybe not too bad but a bit of pain to
manage these ids inside each process and across all application processes.
2) creating a separate producer for each concurrency slot in the
application - this could create a lot more producers and resultant
connections to Kafka than the typical model of a single producer per
process.

Otherwise, it seems you're left with single-threaded model per application
process?

Thanks,

Roger

On Tue, Aug 22, 2023 at 5:11 PM Artem Livshits
 wrote:

> Hi Roger, Arjun,
>
> Thank you for the questions.
> > It looks like the application must have stable transactional ids over
> time?
>
> The transactional id should uniquely identify a producer instance and needs
> to be stable across the restarts.  If the transactional id is not stable
> across restarts, then zombie messages from a previous incarnation of the
> producer may violate atomicity.  If there are 2 producer instances
> concurrently producing data with the same transactional id, they are going
> to constantly fence each other and most likely make little or no progress.
>
> The name might be a little bit confusing as it may be mistaken for a
> transaction id / TID that uniquely identifies every transaction.  The name
> and the semantics were defined in the original exactly-once-semantics (EoS)
> proposal (
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> )
> and KIP-939 just build on top of that.
>
> > I'm curious to understand what happens if the producer dies, and does not
> come up and recover the pending transaction within the transaction timeout
> interval.
>
> If the producer / application never comes back, the transaction will remain
> in prepared (a.k.a. "in-doubt") state until an operator forcefully
> terminates the transaction.  That's why a new ACL is defined in
> this proposal -- this functionality should only be provided to applications
> that implement proper recovery logic.
>
> -Artem
>
> On Tue, Aug 22, 2023 at 12:52 AM Arjun Satish 
> wrote:
>
> > Hello Artem,
> >
> > Thanks for the KIP.
> >
> > I have the same question as Roger on concurrent writes, and an additional
> > one on consumer behavior. Typically, transactions will timeout if not
> > committed within some time interval. With the proposed changes in this
> KIP,
> > consumers cannot consume past the ongoing transaction. I'm curious to
> > understand what happens if the producer dies, and does not come up and
> > recover the pending transaction within the transaction timeout interval.
> Or
> > are we saying that when used in this 2PC context, we should configure
> these
> > transaction timeouts to very large durations?
> >
> > Thanks in advance!
> >
> > Best,
> > Arjun
> >
> >
> > On Mon, Aug 21, 2023 at 1:06 PM Roger Hoover 
> > wrote:
> >
> > > Hi Artem,
> > >
> > > Thanks for writing this KIP.  Can you clarify the requirements a bit
> more
> > > for managing transaction state?  It looks like the application must
> have
> > > stable transactional ids over time?   What is the granularity of those
> > ids
> > > and producers?  Say the application is a multi-threaded Java web
> server,
> > > can/should all the concurrent threads share a transactional id and
> > > producer?  That doesn't seem right to me unless the application is
> using
> > > global DB locks that serialize all requests.  Instead, if the
> application
> > > uses row-level DB locks, there could be multiple, concurrent,
> independent
> > > txns happening in the same JVM so it seems like the granularity
> managing
> > > transactional ids and txn state needs to line up with granularity of
> the
> > DB
> > > locking.
> > >
> > > Does that make sense or am I misunderstanding?
> > >
> > > Thanks,
> > >
> > > Roger
> > >
> > > On Wed, Aug 16, 2023 at 11:40 PM Artem Livshits
> > >  wrote:
> > >
> > > > Hello,
> > > >
> > > > This is a discussion thread for
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
> > > > .
> > > >
> > > > The KIP proposes extending Kafka transaction support (that already
> uses
> > > 2PC
> > > > under the hood) to enable atomicity of dual writes to Kafka and an
> > > external
> > > > database, and helps to fix a long standing Flink issue.
> > > >
> > > > An example of code that uses the dual write recipe with JDBC and
> should
> > > > work for most SQL databases is here
> > > > 

Re: [DISCUSS] KIP-939: Support Participation in 2PC

2023-08-22 Thread Artem Livshits
Hi Roger, Arjun,

Thank you for the questions.
> It looks like the application must have stable transactional ids over
time?

The transactional id should uniquely identify a producer instance and needs
to be stable across restarts.  If the transactional id is not stable
across restarts, then zombie messages from a previous incarnation of the
producer may violate atomicity.  If there are 2 producer instances
concurrently producing data with the same transactional id, they are going
to constantly fence each other and most likely make little or no progress.
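
In client code this fencing surfaces as ProducerFencedException; a minimal
illustration (not KIP-specific):

  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.common.errors.ProducerFencedException;

  class FencingSketch {
      static void runOnce(KafkaProducer<String, String> producer) {
          try {
              producer.beginTransaction();
              producer.send(new ProducerRecord<>("some-topic", "key", "value"));
              producer.commitTransaction();
          } catch (ProducerFencedException e) {
              // A producer with the same transactional.id and a newer epoch exists;
              // this instance must close and stop -- it cannot recover.
              producer.close();
          }
      }
  }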

The name might be a little bit confusing as it may be mistaken for a
transaction id / TID that uniquely identifies every transaction.  The name
and the semantics were defined in the original exactly-once-semantics (EoS)
proposal (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging)
and KIP-939 just builds on top of that.

> I'm curious to understand what happens if the producer dies, and does not
come up and recover the pending transaction within the transaction timeout
interval.

If the producer / application never comes back, the transaction will remain
in prepared (a.k.a. "in-doubt") state until an operator forcefully
terminates the transaction.  That's why a new ACL is defined in
this proposal -- this functionality should only be provided to applications
that implement proper recovery logic.

-Artem

On Tue, Aug 22, 2023 at 12:52 AM Arjun Satish 
wrote:

> Hello Artem,
>
> Thanks for the KIP.
>
> I have the same question as Roger on concurrent writes, and an additional
> one on consumer behavior. Typically, transactions will timeout if not
> committed within some time interval. With the proposed changes in this KIP,
> consumers cannot consume past the ongoing transaction. I'm curious to
> understand what happens if the producer dies, and does not come up and
> recover the pending transaction within the transaction timeout interval. Or
> are we saying that when used in this 2PC context, we should configure these
> transaction timeouts to very large durations?
>
> Thanks in advance!
>
> Best,
> Arjun
>
>
> On Mon, Aug 21, 2023 at 1:06 PM Roger Hoover 
> wrote:
>
> > Hi Artem,
> >
> > Thanks for writing this KIP.  Can you clarify the requirements a bit more
> > for managing transaction state?  It looks like the application must have
> > stable transactional ids over time?   What is the granularity of those
> ids
> > and producers?  Say the application is a multi-threaded Java web server,
> > can/should all the concurrent threads share a transactional id and
> > producer?  That doesn't seem right to me unless the application is using
> > global DB locks that serialize all requests.  Instead, if the application
> > uses row-level DB locks, there could be multiple, concurrent, independent
> > txns happening in the same JVM so it seems like the granularity managing
> > transactional ids and txn state needs to line up with granularity of the
> DB
> > locking.
> >
> > Does that make sense or am I misunderstanding?
> >
> > Thanks,
> >
> > Roger
> >
> > On Wed, Aug 16, 2023 at 11:40 PM Artem Livshits
> >  wrote:
> >
> > > Hello,
> > >
> > > This is a discussion thread for
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
> > > .
> > >
> > > The KIP proposes extending Kafka transaction support (that already uses
> > 2PC
> > > under the hood) to enable atomicity of dual writes to Kafka and an
> > external
> > > database, and helps to fix a long standing Flink issue.
> > >
> > > An example of code that uses the dual write recipe with JDBC and should
> > > work for most SQL databases is here
> > > https://github.com/apache/kafka/pull/14231.
> > >
> > > The FLIP for the sister fix in Flink is here
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
> > >
> > > -Artem
> > >
> >
>


Re: [DISCUSS] KIP-939: Support Participation in 2PC

2023-08-22 Thread Arjun Satish
Hello Artem,

Thanks for the KIP.

I have the same question as Roger on concurrent writes, and an additional
one on consumer behavior. Typically, transactions will time out if not
committed within some time interval. With the proposed changes in this KIP,
consumers cannot consume past the ongoing transaction. I'm curious to
understand what happens if the producer dies, and does not come up and
recover the pending transaction within the transaction timeout interval. Or
are we saying that when used in this 2PC context, we should configure these
transaction timeouts to very large durations?

Thanks in advance!

Best,
Arjun


On Mon, Aug 21, 2023 at 1:06 PM Roger Hoover  wrote:

> Hi Artem,
>
> Thanks for writing this KIP.  Can you clarify the requirements a bit more
> for managing transaction state?  It looks like the application must have
> stable transactional ids over time?   What is the granularity of those ids
> and producers?  Say the application is a multi-threaded Java web server,
> can/should all the concurrent threads share a transactional id and
> producer?  That doesn't seem right to me unless the application is using
> global DB locks that serialize all requests.  Instead, if the application
> uses row-level DB locks, there could be multiple, concurrent, independent
> txns happening in the same JVM so it seems like the granularity managing
> transactional ids and txn state needs to line up with granularity of the DB
> locking.
>
> Does that make sense or am I misunderstanding?
>
> Thanks,
>
> Roger
>
> On Wed, Aug 16, 2023 at 11:40 PM Artem Livshits
>  wrote:
>
> > Hello,
> >
> > This is a discussion thread for
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
> > .
> >
> > The KIP proposes extending Kafka transaction support (that already uses
> 2PC
> > under the hood) to enable atomicity of dual writes to Kafka and an
> external
> > database, and helps to fix a long standing Flink issue.
> >
> > An example of code that uses the dual write recipe with JDBC and should
> > work for most SQL databases is here
> > https://github.com/apache/kafka/pull/14231.
> >
> > The FLIP for the sister fix in Flink is here
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
> >
> > -Artem
> >
>


Re: [DISCUSS] KIP-939: Support Participation in 2PC

2023-08-21 Thread Roger Hoover
Hi Artem,

Thanks for writing this KIP.  Can you clarify the requirements a bit more
for managing transaction state?  It looks like the application must have
stable transactional ids over time?   What is the granularity of those ids
and producers?  Say the application is a multi-threaded Java web server,
can/should all the concurrent threads share a transactional id and
producer?  That doesn't seem right to me unless the application is using
global DB locks that serialize all requests.  Instead, if the application
uses row-level DB locks, there could be multiple, concurrent, independent
txns happening in the same JVM, so it seems like the granularity of managing
transactional ids and txn state needs to line up with the granularity of the DB
locking.

Does that make sense or am I misunderstanding?

Thanks,

Roger

On Wed, Aug 16, 2023 at 11:40 PM Artem Livshits
 wrote:

> Hello,
>
> This is a discussion thread for
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
> .
>
> The KIP proposes extending Kafka transaction support (that already uses 2PC
> under the hood) to enable atomicity of dual writes to Kafka and an external
> database, and helps to fix a long standing Flink issue.
>
> An example of code that uses the dual write recipe with JDBC and should
> work for most SQL databases is here
> https://github.com/apache/kafka/pull/14231.
>
> The FLIP for the sister fix in Flink is here
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
>
> -Artem
>


[DISCUSS] KIP-939: Support Participation in 2PC

2023-08-17 Thread Artem Livshits
Hello,

This is a discussion thread for
https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
.

The KIP proposes extending Kafka transaction support (that already uses 2PC
under the hood) to enable atomicity of dual writes to Kafka and an external
database, and helps to fix a long-standing Flink issue.

An example of code that uses the dual write recipe with JDBC and should
work for most SQL databases is here
https://github.com/apache/kafka/pull/14231.
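
In outline, the dual-write recipe in that example looks roughly like this
(an illustrative sketch; prepareTransaction() is a placeholder for the 2PC
"prepare" step proposed in this KIP, and the JDBC details are made up -- see
the PR for the real code):

  import java.sql.Connection;
  import java.sql.DriverManager;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;

  class DualWriteOutline {
      static void dualWrite(KafkaProducer<String, String> producer) throws Exception {
          producer.beginTransaction();
          producer.send(new ProducerRecord<>("some-topic", "key", "value"));

          // 1. Prepare the Kafka transaction (placeholder for the proposed API) and
          //    keep enough state to complete it after a crash.
          // PreparedTxnState prepared = producer.prepareTransaction();

          try (Connection db = DriverManager.getConnection("jdbc:postgresql://localhost/app")) {
              db.setAutoCommit(false);
              // 2. In a single DB transaction: apply the business changes and persist
              //    the prepared Kafka transaction state for crash recovery.
              db.commit();
          }

          // 3. Only after the DB commit succeeds, commit the Kafka transaction.
          producer.commitTransaction();
      }
  }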

The FLIP for the sister fix in Flink is here
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710

-Artem