abbccdda commented on a change in pull request #9311: URL: https://github.com/apache/kafka/pull/9311#discussion_r504225780
########## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ########## @@ -1198,18 +1209,22 @@ boolean canBumpEpoch() { return coordinatorSupportsBumpingEpoch; } + private void resetTransactions() { + newPartitionsInTransaction.clear(); + pendingPartitionsInTransaction.clear(); + partitionsInTransaction.clear(); + } + private void completeTransaction() { if (epochBumpRequired) { transitionTo(State.INITIALIZING); } else { transitionTo(State.READY); } lastError = null; + abortableError = null; epochBumpRequired = false; - transactionStarted = false; Review comment: Are we missing the update for this flag? ########## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ########## @@ -1438,7 +1455,8 @@ public void handleResponse(AbstractResponse response) { log.debug("Did not attempt to add partition {} to transaction because other partitions in the " + "batch had errors.", topicPartition); hasPartitionErrors = true; - } else if (error == Errors.UNKNOWN_PRODUCER_ID || error == Errors.INVALID_PRODUCER_ID_MAPPING) { + } else if (error == Errors.UNKNOWN_PRODUCER_ID || error == Errors.INVALID_PRODUCER_ID_MAPPING Review comment: nit: we could add a helper for the combined error equality check ########## File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ########## @@ -671,6 +782,57 @@ class KafkaApisTest { } } + @Test + def shouldReplaceTxnTimeoutWithInvalidProducerEpochInEndTxnWithOlderClient(): Unit = { Review comment: Could we consolidate the tests with `shouldReplaceProducerFencedWithInvalidProducerEpochInEndTxnWithOlderClient`? Similar to other error code replacement tests. 
########## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala ########## @@ -381,9 +388,14 @@ class TransactionCoordinator(brokerId: Int, if (txnMetadata.producerId != producerId) Left(Errors.INVALID_PRODUCER_ID_MAPPING) // Strict equality is enforced on the client side requests, as they shouldn't bump the producer epoch. - else if ((isFromClient && producerEpoch != txnMetadata.producerEpoch) || producerEpoch < txnMetadata.producerEpoch) - Left(Errors.PRODUCER_FENCED) - else if (txnMetadata.pendingTransitionInProgress && txnMetadata.pendingState.get != PrepareEpochFence) + else if (isFromClient && producerEpoch != txnMetadata.producerEpoch || producerEpoch < txnMetadata.producerEpoch) { Review comment: nit: I slightly prefer the previous condition format with () around &&. ########## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ########## @@ -369,7 +372,9 @@ private TransactionalRequestResult beginCompletingTransaction(TransactionResult // If the error is an INVALID_PRODUCER_ID_MAPPING error, the server will not accept an EndTxnRequest, so skip // directly to InitProducerId. Otherwise, we must first abort the transaction, because the producer will be // fenced if we directly call InitProducerId. - if (!(lastError instanceof InvalidPidMappingException)) { + boolean needEndTxn = !(abortableError instanceof InvalidPidMappingException) Review comment: If this is the case, could we just remove this intermediate variable? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org