[ 
https://issues.apache.org/jira/browse/KAFKA-19082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ritika Reddy updated KAFKA-19082:
---------------------------------
    Description: 
h2. KafkaProducer API Changes

A new {{KafkaProducer.PreparedTxnState}} class will be defined as follows:
{code:java}
public static class PreparedTxnState {
  public String toString();
  public PreparedTxnState(String serializedState);
  public PreparedTxnState();
}
{code}

Objects of this class can be serialized to and deserialized from a string 
value, so they can be written to and read from a database.  The implementation 
will store {{producerId}} and {{epoch}}.
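
As an illustration of the string round trip described above, here is a minimal sketch. The class name {{PreparedTxnStateSketch}}, the {{producerId:epoch}} string format, and the {{-1}} sentinel values for the empty state are all assumptions made for this example; the description does not specify the actual serialized format.

```java
// Hypothetical stand-in for KafkaProducer.PreparedTxnState.
// ASSUMPTIONS: the "producerId:epoch" format and the -1 sentinels are
// invented for illustration and are not taken from the issue description.
final class PreparedTxnStateSketch {
    private final long producerId;
    private final short epoch;

    // Empty state, meaning "no prepared transaction" (sentinels are assumed).
    PreparedTxnStateSketch() {
        this(-1L, (short) -1);
    }

    PreparedTxnStateSketch(long producerId, short epoch) {
        this.producerId = producerId;
        this.epoch = epoch;
    }

    // Rebuilds the state from a string previously produced by toString().
    PreparedTxnStateSketch(String serializedState) {
        String[] parts = serializedState.split(":");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Malformed state: " + serializedState);
        }
        this.producerId = Long.parseLong(parts[0]);
        this.epoch = Short.parseShort(parts[1]);
    }

    // Serializes the state so it can be stored, e.g. in a database column.
    @Override
    public String toString() {
        return producerId + ":" + epoch;
    }

    @Override
    public boolean equals(Object other) {
        if (!(other instanceof PreparedTxnStateSketch)) {
            return false;
        }
        PreparedTxnStateSketch that = (PreparedTxnStateSketch) other;
        return producerId == that.producerId && epoch == that.epoch;
    }

    @Override
    public int hashCode() {
        return Long.hashCode(producerId) * 31 + epoch;
    }
}
```

An application would store the result of {{toString()}} alongside its own data and later pass the stored string back to the string constructor.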

A new overloaded method will be added to {{KafkaProducer}}:

{{public void initTransactions(boolean keepPreparedTxn)}} 

If the value is 'true', the corresponding field is set in the 
{{InitProducerIdRequest}} and the {{KafkaProducer}} object is put into a state 
that only allows calling {{.commitTransaction}}, {{.abortTransaction}}, 
or {{.completeTransaction}}.

A new method will be added to {{KafkaProducer}}:

{{public PreparedTxnState prepareTransaction()}} 

This flushes all pending messages and transitions the producer into a mode 
where only {{.commitTransaction}}, {{.abortTransaction}}, or 
{{.completeTransaction}} can be called; calling any other method in that mode, 
e.g. {{.send}}, results in an {{IllegalStateException}} being thrown.  If the 
call is successful (all messages were flushed to all partitions), the 
transaction is prepared.  If 2PC is not enabled, we return the 
{{INVALID_TXN_STATE}} error.
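
The call restrictions above can be pictured as a small state machine. The sketch below is a self-contained model, not the real {{KafkaProducer}}: the class {{TwoPhaseProducerSketch}}, its {{State}} enum, and the in-memory record list are hypothetical, and surfacing {{INVALID_TXN_STATE}} as an {{IllegalStateException}} is an assumption, since the description does not say how the error reaches the caller.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the producer's allowed calls; not the Kafka client.
final class TwoPhaseProducerSketch {
    enum State { READY, IN_TRANSACTION, PREPARED }

    private final boolean twoPhaseCommitEnabled;
    private final List<String> pending = new ArrayList<>();
    private State state = State.READY;

    TwoPhaseProducerSketch(boolean twoPhaseCommitEnabled) {
        this.twoPhaseCommitEnabled = twoPhaseCommitEnabled;
    }

    State state() {
        return state;
    }

    void beginTransaction() {
        require(state == State.READY, "beginTransaction");
        state = State.IN_TRANSACTION;
    }

    // Only legal while a transaction is open and not yet prepared.
    void send(String record) {
        require(state == State.IN_TRANSACTION, "send");
        pending.add(record);
    }

    // Flushes pending records, then restricts the producer to
    // commit/abort/complete until the transaction ends.
    void prepareTransaction() {
        if (!twoPhaseCommitEnabled) {
            // ASSUMPTION: the error is modeled as an exception here.
            throw new IllegalStateException("INVALID_TXN_STATE: 2PC is not enabled");
        }
        require(state == State.IN_TRANSACTION, "prepareTransaction");
        pending.clear(); // stands in for "all messages flushed to all partitions"
        state = State.PREPARED;
    }

    void commitTransaction() {
        require(state != State.READY, "commitTransaction");
        pending.clear();
        state = State.READY;
    }

    void abortTransaction() {
        require(state != State.READY, "abortTransaction");
        pending.clear();
        state = State.READY;
    }

    private static void require(boolean allowed, String method) {
        if (!allowed) {
            throw new IllegalStateException(method + " is not allowed in the current state");
        }
    }
}
```

In this model, calling {{send}} after {{prepareTransaction}} throws {{IllegalStateException}}, while {{commitTransaction}} and {{abortTransaction}} remain available.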

A new method will be added to {{KafkaProducer}}:

{{public void completeTransaction(PreparedTxnState preparedTxnState)}}

The method compares the currently prepared transaction state with the state 
passed in the argument and either commits or aborts the transaction.  If the 
producer is not in the prepared state (i.e. neither {{prepareTransaction}} nor 
{{initTransactions(true)}} was called), we return an {{INVALID_TXN_STATE}} error.
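
A minimal sketch of the comparison follows. The description only says the method "either commits or aborts"; the rule used here (commit when the stored state matches the producer's current prepared state, abort otherwise) is an assumption, and {{CompleteTransactionSketch}}, {{Outcome}}, and {{decide}} are hypothetical names. States are represented by their serialized strings.

```java
import java.util.Objects;

// Hypothetical decision helper; not part of the Kafka client.
final class CompleteTransactionSketch {
    enum Outcome { COMMIT, ABORT }

    // ASSUMPTION: a match means the external store recorded this exact
    // prepared transaction, so it is committed; any mismatch aborts it.
    static Outcome decide(String currentPreparedState, String storedState) {
        return Objects.equals(currentPreparedState, storedState)
                ? Outcome.COMMIT
                : Outcome.ABORT;
    }
}
```

For example, after a restart the application would read the stored state from its database and pass it in; if the database write never happened, the states differ and the prepared transaction is aborted under this rule.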
h2. Client Configuration Changes

*transaction.two.phase.commit.enable* The default is 'false'.  If set to 
'true', the broker is informed that the client is participating in the 
two-phase commit protocol, and transactions that this client starts never expire.

*transaction.timeout.ms* The semantics are unchanged, but it is an error to set 
*transaction.timeout.ms* when *transaction.two.phase.commit.enable* is set to 'true'.
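
The interaction between the two settings can be sketched as a simple check over {{java.util.Properties}}. The class {{TwoPhaseConfigSketch}} and its {{validate}} method are hypothetical, and the use of {{IllegalArgumentException}} is an assumption; only the two property names come from the description.

```java
import java.util.Properties;

// Hypothetical validation helper; not the Kafka client's config validation.
final class TwoPhaseConfigSketch {
    static final String TWO_PHASE_ENABLE = "transaction.two.phase.commit.enable";
    static final String TXN_TIMEOUT_MS = "transaction.timeout.ms";

    // Rejects configurations that enable 2PC and also set a transaction timeout.
    static void validate(Properties props) {
        // The default for the 2PC flag is 'false', per the description.
        boolean twoPhase = Boolean.parseBoolean(props.getProperty(TWO_PHASE_ENABLE, "false"));
        if (twoPhase && props.containsKey(TXN_TIMEOUT_MS)) {
            // ASSUMPTION: the exception type is chosen for illustration only.
            throw new IllegalArgumentException(
                    TXN_TIMEOUT_MS + " must not be set when " + TWO_PHASE_ENABLE + " is true");
        }
    }
}
```

A configuration that sets only {{transaction.two.phase.commit.enable=true}} passes this check, while adding {{transaction.timeout.ms}} to it is rejected.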

 

 

  was:
h2. Client Configuration Changes

*transaction.two.phase.commit.enable* The default would be ‘false’.  If set to 
‘true’, then the broker is informed that the client is participating in two 
phase commit protocol and transactions that this client starts never expire.

*transaction.timeout.ms* The semantics is not changed, but it would be an error 
to set *transaction.timeout.ms* when *two.phase.commit.enable* is set to 'true’.

 

 


> Client side changes to enable 2PC
> ---------------------------------
>
>                 Key: KAFKA-19082
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19082
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Ritika Reddy
>            Assignee: Ritika Reddy
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
