[ 
https://issues.apache.org/jira/browse/KAFKA-13843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuexiaoyue updated KAFKA-13843:
-------------------------------
    Description: 
When a batch's delivery timeout has expired but the client later receives a 
success response from the server, Sender calls 
{{transactionManager.handleCompletedBatch}} without checking whether the batch 
was already completed. As a result, some of the state tracked in 
{{topicPartitionBookkeeper}} is updated incorrectly.
{code:java}
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
    if (transactionManager != null) {
        transactionManager.handleCompletedBatch(batch, response);
    }

    if (batch.complete(response.baseOffset, response.logAppendTime)) {
        maybeRemoveAndDeallocateBatch(batch);
    }
}

public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
    int lastAckedSequence = maybeUpdateLastAckedSequence(batch.topicPartition, batch.lastSequence());
    log.debug("ProducerId: {}; Set last ack'd sequence number for topic-partition {} to {}",
            batch.producerId(),
            batch.topicPartition,
            lastAckedSequence);

    updateLastAckedOffset(response, batch);
    removeInFlightBatch(batch);
}{code}

  was:
When a batch's delivery timeout has expired but the client later receives a 
success response from the server, Sender will call 
`transactionManager.handleCompletedBatch(batch, response)` without judging if 
it was completed before. And in `transactionManager.handleCompletedBatch` 
method, some states tracked in `topicPartitionBookkeeper` will be updated 
incorrectly.
{code:java}
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
    if (transactionManager != null) {
        transactionManager.handleCompletedBatch(batch, response);
    }

    if (batch.complete(response.baseOffset, response.logAppendTime)) {
        maybeRemoveAndDeallocateBatch(batch);
    }
}

public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
    int lastAckedSequence = maybeUpdateLastAckedSequence(batch.topicPartition, batch.lastSequence());
    log.debug("ProducerId: {}; Set last ack'd sequence number for topic-partition {} to {}",
            batch.producerId(),
            batch.topicPartition,
            lastAckedSequence);

    updateLastAckedOffset(response, batch);
    removeInFlightBatch(batch);
}{code}


> States tracked in TransactionManager should not be changed when receiving 
> success response on an expired batch
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13843
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13843
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: xuexiaoyue
>            Priority: Major
>
> When a batch's delivery timeout has expired but the client later receives a 
> success response from the server, Sender calls 
> {{transactionManager.handleCompletedBatch}} without checking whether the 
> batch was already completed. As a result, some of the state tracked in 
> {{topicPartitionBookkeeper}} is updated incorrectly.
> {code:java}
> private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
>     if (transactionManager != null) {
>         transactionManager.handleCompletedBatch(batch, response);
>     }
>     if (batch.complete(response.baseOffset, response.logAppendTime)) {
>         maybeRemoveAndDeallocateBatch(batch);
>     }
> }
> public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
>     int lastAckedSequence = maybeUpdateLastAckedSequence(batch.topicPartition, batch.lastSequence());
>     log.debug("ProducerId: {}; Set last ack'd sequence number for topic-partition {} to {}",
>             batch.producerId(),
>             batch.topicPartition,
>             lastAckedSequence);
>     updateLastAckedOffset(response, batch);
>     removeInFlightBatch(batch);
> }{code}
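
The ordering problem described above can be reproduced in isolation. What follows is a minimal, self-contained sketch of one possible guard, not Kafka's actual code and not necessarily the fix that was committed for this issue. {{Batch}}, {{Bookkeeper}}, and this simplified {{completeBatch}} are hypothetical stand-ins for {{ProducerBatch}}, the per-partition state in {{TransactionManager}}, and {{Sender.completeBatch}}. The assumption is that a batch can report whether it has already reached a final state (here through a hypothetical {{isDone()}} check), so a late success response can be ignored before any sequence bookkeeping is touched.

```java
public class ExpiredBatchGuardSketch {

    /** Hypothetical stand-in for ProducerBatch: tracks only its final state. */
    public static final class Batch {
        private final int lastSequence;
        private boolean done; // set once, either by expiry or by a broker response

        public Batch(int lastSequence) {
            this.lastSequence = lastSequence;
        }

        /** Returns true only for the first call that moves the batch to a final state. */
        public boolean complete() {
            if (done) {
                return false;
            }
            done = true;
            return true;
        }

        public boolean isDone() {
            return done;
        }

        public int lastSequence() {
            return lastSequence;
        }
    }

    /** Hypothetical stand-in for the per-partition state kept by the transaction manager. */
    public static final class Bookkeeper {
        private int lastAckedSequence = -1;

        public void handleCompletedBatch(Batch batch) {
            lastAckedSequence = Math.max(lastAckedSequence, batch.lastSequence());
        }

        public int lastAckedSequence() {
            return lastAckedSequence;
        }
    }

    /**
     * Guarded completion (an assumption, not the Kafka implementation):
     * the bookkeeping is skipped when the batch has already been finished,
     * for example because its delivery timeout expired earlier.
     */
    public static boolean completeBatch(Batch batch, Bookkeeper bookkeeper) {
        if (batch.isDone()) {
            return false; // late response for an already-finished batch: leave state alone
        }
        bookkeeper.handleCompletedBatch(batch);
        return batch.complete();
    }

    public static void main(String[] args) {
        Bookkeeper bookkeeper = new Bookkeeper();

        Batch expired = new Batch(4);
        expired.complete();                 // simulates the delivery timeout finishing the batch
        completeBatch(expired, bookkeeper); // late success response is ignored

        Batch live = new Batch(9);
        completeBatch(live, bookkeeper);    // normal path updates the bookkeeping

        System.out.println("lastAckedSequence = " + bookkeeper.lastAckedSequence());
    }
}
```

In this sketch the check happens before the bookkeeping call, which is the reverse of the order in the quoted {{completeBatch}}, where {{handleCompletedBatch}} runs before {{batch.complete(...)}} reports whether the batch was still open.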



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
