[ 
https://issues.apache.org/jira/browse/KAFKA-18401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17910864#comment-17910864
 ] 

Kuan Po Tseng commented on KAFKA-18401:
---------------------------------------

bq. Discussed this with a teammate and we have a way for this to work without 
changing the server. In the case where we call commitTransaction and the 
producer never called send, we can return successfully without sending the 
EndTxn request to the server. This should require minimal changes and allow for 
the backwards compatibility.

Thanks, [~jolshan]. This is exactly what I'm trying to implement. Over the past 
few days, I've been struggling with whether "calling commitTransaction without 
calling send" is the right condition for addressing this issue. There's a 
corner case: if acks=0, the send might fail, but the producer wouldn't be able 
to detect that failure. I'm not sure how to handle this case... Alternatively, 
perhaps we could ignore this side effect, simply track whether {{send}} was 
called within the transaction (without checking whether the send succeeded), 
and use that to decide whether {{commitTransaction}} should send the EndTxn 
request?
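
For illustration, here is a rough standalone model of the check I have in mind 
(placeholder class and method names only, not a patch against the actual 
producer or {{TransactionManager}}):
{code:java}
/**
 * Rough sketch only: models the client-side decision discussed above. The
 * producer tracks whether send() was called in the current transaction; if it
 * never was, commitTransaction() completes locally and no EndTxn request is
 * sent to the broker.
 */
public class EmptyTransactionCommitSketch {

    private boolean inTransaction = false;
    private boolean sendCalled = false;

    public void beginTransaction() {
        inTransaction = true;
        sendCalled = false;
    }

    /**
     * Records that send() was invoked. Note the acks=0 corner case above:
     * we only know send() was attempted, not that it actually succeeded.
     */
    public void onSend() {
        if (!inTransaction) {
            throw new IllegalStateException("send() called outside a transaction");
        }
        sendCalled = true;
    }

    /** Returns true if an EndTxn request should be sent to the broker. */
    public boolean commitTransaction() {
        if (!inTransaction) {
            throw new IllegalStateException("commitTransaction() called outside a transaction");
        }
        inTransaction = false;
        if (!sendCalled) {
            // The transaction is still EMPTY on the coordinator; complete
            // locally and skip the EndTxn request, mirroring TV_0/TV_1.
            return false;
        }
        return true;
    }
}
{code}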

> Transaction version 2 does not support commit transaction without records
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-18401
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18401
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Kuan Po Tseng
>            Assignee: Kuan Po Tseng
>            Priority: Blocker
>
> This issue was observed when implementing 
> https://issues.apache.org/jira/browse/KAFKA-18206.
> In short, transaction version 2 does not support committing a transaction 
> without sending any records, while transaction versions 0 and 1 do support 
> this scenario.
> Committing a transaction without sending any records works fine with 
> transaction versions 0 and 1 because the producer won't send an EndTxnRequest 
> to the broker [0]. However, with transaction version 2, the producer still 
> sends an EndTxnRequest to the broker while the transaction state on the 
> transaction coordinator is still EMPTY, resulting in an error from the broker.
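> The difference boils down to whether an EndTxnRequest is built for an empty 
> transaction. A simplified sketch of that decision (placeholder names only, 
> not the actual client code; see [0] for the real logic):
> {code:java}
> // Simplified sketch, not the actual Kafka client code (see [0]).
> // transactionStarted = at least one partition or offset was successfully
> // added to the transaction.
> static boolean shouldSendEndTxn(boolean transactionStarted, boolean isTransactionV2) {
>     if (isTransactionV2) {
>         // TV_2: the EndTxnRequest is always sent, even when the coordinator
>         // still sees the transaction as EMPTY, so the broker returns an error.
>         return true;
>     }
>     // TV_0/TV_1: skip the EndTxnRequest when nothing was added to the
>     // transaction; the commit completes locally.
>     return transactionStarted;
> }
> {code}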
> This issue can be reproduced with the test below. I'm unsure if this 
> behavior is expected. If it's not, one potential fix could be to follow the 
> approach used in TV_0 and TV_1, where the EndTxnRequest is not sent if no 
> partitions or offsets have been successfully added to the transaction. If 
> this behavior is expected, we should document it and let users know about 
> this change.
> {code:java}
>     @ClusterTests({
>         @ClusterTest(brokers = 3, features = {
>             @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 0)}),
>         @ClusterTest(brokers = 3, features = {
>             @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 1)}),
>         @ClusterTest(brokers = 3, features = {
>             @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2)})
>     })
>     public void testProducerEndTransaction2(ClusterInstance cluster) throws InterruptedException {
>         Map<String, Object> properties = new HashMap<>();
>         properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "foobar");
>         properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test");
>         properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
>         try (Producer<byte[], byte[]> producer1 = cluster.producer(properties)) {
>             producer1.initTransactions();
>             producer1.beginTransaction();
>             producer1.commitTransaction(); // In TV_2, we'll get InvalidTxnStateException
>         }
>     }
> {code}
> Another test case, essentially the same as the previous one, first commits a 
> transaction that includes records and then begins and commits an empty 
> transaction. With transaction version 2, we again encounter an error, but 
> this time it's a different one from the previous case (ProducerFencedException 
> instead of InvalidTxnStateException).
> {code:java}
>     @ClusterTests({
>         @ClusterTest(brokers = 3, features = {
>             @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 0)}),
>         @ClusterTest(brokers = 3, features = {
>             @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 1)}),
>         @ClusterTest(brokers = 3, features = {
>             @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2)})
>     })
>     public void testProducerEndTransaction(ClusterInstance cluster) {
>         Map<String, Object> properties = new HashMap<>();
>         properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "foobar");
>         properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test");
>         properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
>         try (Producer<byte[], byte[]> producer1 = cluster.producer(properties)) {
>             producer1.initTransactions();
>             producer1.beginTransaction();
>             producer1.send(new ProducerRecord<>("test", "key".getBytes(), "value".getBytes()));
>             producer1.commitTransaction();
>             producer1.beginTransaction();
>             producer1.commitTransaction(); // In TV_2, we'll get ProducerFencedException
>         }
>     }
> {code}
>  
> [0]: 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L857-L865]


