[ 
https://issues.apache.org/jira/browse/KAFKA-16570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851101#comment-17851101
 ] 

Edoardo Comar edited comment on KAFKA-16570 at 5/31/24 2:12 PM:
----------------------------------------------------------------

[~jolshan] I agree. I too think that
FenceProducersHandler.handleError
should treat CONCURRENT_TRANSACTIONS as a success, and maybe log.debug or 
log.info it.

I used a simple test like
{code:java}
producer.initTransactions();
producer.beginTransaction();
producer.send(record).get();
producer.commitTransaction();

admin.fenceProducers(Collections.singleton(txId)).all().get();

producer.beginTransaction();
producer.send(record).get(); // throws ProducerFencedException
{code}
whereas, when the producer is fenced while its transaction is still open:
{code:java}
producer.initTransactions();
producer.beginTransaction();
producer.send(record).get();

admin.fenceProducers(Collections.singleton(txId)).all().get(); // throws ConcurrentTransactionsException

producer.commitTransaction(); //2
{code}
However, if the ConcurrentTransactionsException is swallowed, 
then //2 throws ProducerFencedException as expected.

Btw, just for the ... record, another 
producer.send().get() 
before the commit would instead throw an InvalidProducerEpochException.
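
A minimal sketch of the handling suggested in this comment: treat CONCURRENT_TRANSACTIONS as a successful fence rather than an unexpected error. This is a simplified stand-alone model, not the actual FenceProducersHandler code. The class FenceErrorSketch, the enums FenceError and Outcome, and the method classify are hypothetical names, and the retry/fail split for the other error codes is an assumption made only for illustration.

```java
// Stand-alone model of the proposed error classification.
// All names here are hypothetical; this is not Kafka's Errors enum or handler API.
class FenceErrorSketch {

    // Hypothetical subset of error codes an InitProducerId response might carry.
    enum FenceError {
        NONE,
        CONCURRENT_TRANSACTIONS,
        COORDINATOR_LOAD_IN_PROGRESS,
        NOT_COORDINATOR,
        CLUSTER_AUTHORIZATION_FAILED
    }

    // What the admin-side handler would do with the response.
    enum Outcome { FENCED, RETRY, FAILED }

    static Outcome classify(FenceError error) {
        switch (error) {
            case NONE:
                return Outcome.FENCED;
            case CONCURRENT_TRANSACTIONS:
                // Proposed change: the coordinator is already aborting the
                // ongoing transaction, so the fence took effect. Report
                // success (and log at debug/info) instead of failing.
                return Outcome.FENCED;
            case COORDINATOR_LOAD_IN_PROGRESS:
            case NOT_COORDINATOR:
                // Assumption for this sketch: transient coordinator errors are retried.
                return Outcome.RETRY;
            default:
                // Assumption for this sketch: everything else fails the request.
                return Outcome.FAILED;
        }
    }

    public static void main(String[] args) {
        for (FenceError error : FenceError.values()) {
            System.out.println(error + " -> " + classify(error));
        }
    }
}
```

With this classification, the second test above would see fenceProducers complete normally, and the commit at //2 would then fail with ProducerFencedException.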



> FenceProducers API returns "unexpected error" when successful
> -------------------------------------------------------------
>
>                 Key: KAFKA-16570
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16570
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Justine Olshan
>            Assignee: Justine Olshan
>            Priority: Major
>
> When we want to fence a producer using the admin client, we send an 
> InitProducerId request.
> There is logic in that API to fence (and abort) any ongoing transactions and 
> that is what the API relies on to fence the producer. However, this handling 
> also returns CONCURRENT_TRANSACTIONS. In normal usage, this is good because 
> we want to actually get a new producer ID and want to retry until the ID 
> is supplied or we time out.  
> [https://github.com/apache/kafka/blob/5193eb93237ba9093ae444d73a1eaa2d6abcc9c1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L170]
>  
> [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1322]
>  
> In the fence-producers case, we don't retry; instead, we have no handling 
> for concurrent transactions and log a message about an unexpected error.
> [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L112]
>  
> This is not unexpected though and the operation was successful. We should 
> just swallow this error and treat this as a successful run of the command. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
