[ 
https://issues.apache.org/jira/browse/KAFKA-18206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17907678#comment-17907678
 ] 

Kuan Po Tseng commented on KAFKA-18206:
---------------------------------------

While investigating the failed tests, I found that some tests (e.g., 
{{OffsetApiIntegrationTest.testResetSourceConnectorOffsetsExactlyOnceSupportEnabled}})
 commit transactions without sending any records. This is fine with 
transaction versions 0 and 1 because the producer does not send an 
EndTxnRequest to the broker in that case [0]. With transaction version 2, 
however, the producer still sends an EndTxnRequest while the transaction state 
in the transaction coordinator is still EMPTY, so the broker returns an error.

This issue can be reproduced with the test below. I'm unsure whether this 
behavior is expected. If it is not, one potential fix would be to follow the 
approach used in TV_0 and TV_1, where the EndTxnRequest is not sent if no 
partitions or offsets have been successfully added to the transaction. I'm 
happy to work on this fix if it's acceptable.
{code:java}
    @ClusterTests({
        @ClusterTest(brokers = 3, features = {
            @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 0)}),
        @ClusterTest(brokers = 3, features = {
            @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 1)}),
        @ClusterTest(brokers = 3, features = {
            @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2)})
    })
    public void testProducerEndTransaction2(ClusterInstance cluster) throws InterruptedException {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "foobar");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test");
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        try (Producer<byte[], byte[]> producer1 = cluster.producer(properties)) {
            producer1.initTransactions();
            producer1.beginTransaction();
            // Empty transaction: no records were sent before the commit.
            producer1.commitTransaction(); // In TV_2, we'll get InvalidTxnStateException
        }
    }
{code}
A second test case, essentially the same as the previous one, first commits a 
transaction that includes a record and then begins and commits a second, empty 
transaction. With transaction version 2 this also fails, but with a different 
error than in the previous case.
{code:java}
    @ClusterTests({
        @ClusterTest(brokers = 3, features = {
            @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 0)}),
        @ClusterTest(brokers = 3, features = {
            @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 1)}),
        @ClusterTest(brokers = 3, features = {
            @ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2)})
    })
    public void testProducerEndTransaction(ClusterInstance cluster) {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "foobar");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test");
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        try (Producer<byte[], byte[]> producer1 = cluster.producer(properties)) {
            producer1.initTransactions();

            // First transaction: contains one record.
            producer1.beginTransaction();
            producer1.send(new ProducerRecord<>("test", "key".getBytes(), "value".getBytes()));
            producer1.commitTransaction();

            // Second transaction: empty.
            producer1.beginTransaction();
            producer1.commitTransaction(); // In TV_2, we'll get ProducerFencedException
        }
    }
{code}
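
The potential fix described above (skip the EndTxnRequest when nothing was 
added to the transaction) might look roughly like the sketch below. This is a 
toy model for illustration only, not Kafka's TransactionManager: the class 
{{EmptyTxnSketch}} and all of its fields and methods are hypothetical, and it 
assumes that "a partition or offsets were added" can be tracked with a simple 
set and a flag.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of the proposed guard. Every name here is hypothetical;
// this is NOT the real TransactionManager API.
class EmptyTxnSketch {
    // Partitions that have been added to the current transaction.
    private final Set<String> partitionsInTransaction = new HashSet<>();
    // Whether consumer offsets have been added to the current transaction.
    private boolean offsetsAdded = false;
    // Counts how many EndTxnRequests this model would have sent.
    private int endTxnRequestsSent = 0;

    void beginTransaction() {
        partitionsInTransaction.clear();
        offsetsAdded = false;
    }

    // Stands in for a send() whose partition was added to the transaction.
    void send(String topicPartition) {
        partitionsInTransaction.add(topicPartition);
    }

    // Stands in for sendOffsetsToTransaction() succeeding.
    void sendOffsets() {
        offsetsAdded = true;
    }

    // Returns true if an EndTxnRequest would be sent to the coordinator.
    boolean commitTransaction() {
        if (partitionsInTransaction.isEmpty() && !offsetsAdded) {
            // Nothing was added, so the coordinator never saw this
            // transaction: complete it locally and skip the request.
            return false;
        }
        endTxnRequestsSent++;
        return true;
    }

    int endTxnRequestsSent() {
        return endTxnRequestsSent;
    }

    public static void main(String[] args) {
        EmptyTxnSketch txn = new EmptyTxnSketch();

        // Mirrors the first test: an empty transaction.
        txn.beginTransaction();
        System.out.println("empty commit sends EndTxn: " + txn.commitTransaction());

        // Mirrors the second test: one record, then an empty transaction.
        txn.beginTransaction();
        txn.send("test-0");
        System.out.println("non-empty commit sends EndTxn: " + txn.commitTransaction());
        txn.beginTransaction();
        System.out.println("second empty commit sends EndTxn: " + txn.commitTransaction());
    }
}
```

In this model both reproducers above would commit their empty transactions 
locally, so the coordinator would never receive an EndTxnRequest for a 
transaction it has not seen. Whether that is the right behavior for TV_2 is 
exactly the open question in this comment.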
 

[0]: [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L857-L865]

> EmbeddedKafkaCluster must set features
> --------------------------------------
>
>                 Key: KAFKA-18206
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18206
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams, unit tests
>            Reporter: David Jacot
>            Assignee: Kuan Po Tseng
>            Priority: Blocker
>             Fix For: 4.0.0
>
>
> The EmbeddedKafkaCluster classes used by Streams and Connect, respectively, 
> rely on KafkaClusterTestKit. We just found out that they do not set the 
> features at all. They should.
>  
> The other integration tests rely on classes wrapping KafkaClusterTestKit and 
> they take care of setting the features. This is not ideal. I wonder whether 
> we could push that functionality to KafkaClusterTestKit.



