[
https://issues.apache.org/jira/browse/KAFKA-13843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
xuexiaoyue updated KAFKA-13843:
-------------------------------
Description:
When a batch's delivery timeout has expired but later the client receives
success response from the server. Sender will call
{{transactionManager.handleCompletedBatch}} without checking if it was
completed before. And some states tracked in {{topicPartitionBookkeeper}} will
be updated incorrectly.
{code:java}
private void completeBatch(ProducerBatch batch,
ProduceResponse.PartitionResponse response) {
if (transactionManager != null) {
transactionManager.handleCompletedBatch(batch, response);
}
if (batch.complete(response.baseOffset, response.logAppendTime)) {
maybeRemoveAndDeallocateBatch(batch);
}
}
public synchronized void handleCompletedBatch(ProducerBatch batch,
ProduceResponse.PartitionResponse response) {
int lastAckedSequence = maybeUpdateLastAckedSequence(batch.topicPartition,
batch.lastSequence());
log.debug("ProducerId: {}; Set last ack'd sequence number for
topic-partition {} to {}",
batch.producerId(),
batch.topicPartition,
lastAckedSequence);
updateLastAckedOffset(response, batch);
removeInFlightBatch(batch);
}{code}
was:
When a batch's delivery timeout has expired but later the client receives
success response from the server. Sender will call
`transactionManager.handleCompletedBatch(batch, response)` without judging if
it was completed before. And in `transactionManager.handleCompletedBatch`
method, some states tracked in `topicPartitionBookkeeper` will be updated
incorrectly.
{code:java}
private void completeBatch(ProducerBatch batch,
ProduceResponse.PartitionResponse response) {
if (transactionManager != null) {
transactionManager.handleCompletedBatch(batch, response);
}
if (batch.complete(response.baseOffset, response.logAppendTime)) {
maybeRemoveAndDeallocateBatch(batch);
}
}
public synchronized void handleCompletedBatch(ProducerBatch batch,
ProduceResponse.PartitionResponse response) {
int lastAckedSequence = maybeUpdateLastAckedSequence(batch.topicPartition,
batch.lastSequence());
log.debug("ProducerId: {}; Set last ack'd sequence number for
topic-partition {} to {}",
batch.producerId(),
batch.topicPartition,
lastAckedSequence);
updateLastAckedOffset(response, batch);
removeInFlightBatch(batch);
}{code}
> States tracked in TransactionManager should not be changed when receiving
> success response on an expired batch
> --------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-13843
> URL: https://issues.apache.org/jira/browse/KAFKA-13843
> Project: Kafka
> Issue Type: Bug
> Reporter: xuexiaoyue
> Priority: Major
>
> When a batch's delivery timeout has expired but later the client receives
> success response from the server. Sender will call
> {{transactionManager.handleCompletedBatch}} without checking if it was
> completed before. And some states tracked in {{topicPartitionBookkeeper}}
> will be updated incorrectly.
> {code:java}
> private void completeBatch(ProducerBatch batch,
> ProduceResponse.PartitionResponse response) {
> if (transactionManager != null) {
> transactionManager.handleCompletedBatch(batch, response);
> }
> if (batch.complete(response.baseOffset, response.logAppendTime)) {
> maybeRemoveAndDeallocateBatch(batch);
> }
> }
> public synchronized void handleCompletedBatch(ProducerBatch batch,
> ProduceResponse.PartitionResponse response) {
> int lastAckedSequence =
> maybeUpdateLastAckedSequence(batch.topicPartition, batch.lastSequence());
> log.debug("ProducerId: {}; Set last ack'd sequence number for
> topic-partition {} to {}",
> batch.producerId(),
> batch.topicPartition,
> lastAckedSequence);
> updateLastAckedOffset(response, batch);
> removeInFlightBatch(batch);
> }{code}
--
This message was sent by Atlassian Jira
(v8.20.7#820007)