abbccdda commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r501155923



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1198,18 +1209,23 @@ boolean canBumpEpoch() {
         return coordinatorSupportsBumpingEpoch;
     }
 
+    private void resetTransactions() {
+        newPartitionsInTransaction.clear();
+        pendingPartitionsInTransaction.clear();
+        partitionsInTransaction.clear();
+

Review comment:
       nit: remove new line

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
##########
@@ -40,6 +40,7 @@
  *   - {@link Errors#INVALID_PRODUCER_ID_MAPPING}
  *   - {@link Errors#INVALID_PRODUCER_EPOCH} // for version <=1
  *   - {@link Errors#PRODUCER_FENCED}
+ *   - {@link Errors#TRANSACTION_TIMED_OUT}

Review comment:
       We should also update the comments in the corresponding JSON files to mention the new error code, such as the AddPartition/AddOffsets/EndTxn request/response.json files.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
##########
@@ -34,6 +34,7 @@
  *   - {@link Errors#INVALID_PRODUCER_ID_MAPPING}
  *   - {@link Errors#INVALID_PRODUCER_EPOCH} // for version <=1
  *   - {@link Errors#PRODUCER_FENCED}
+ *   - {@link Errors#TRANSACTION_TIMED_OUT}

Review comment:
       We should also add this to the `AddOffsetsToTxnResponse`

##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -381,9 +386,16 @@ class TransactionCoordinator(brokerId: Int,
             if (txnMetadata.producerId != producerId)
               Left(Errors.INVALID_PRODUCER_ID_MAPPING)
             // Strict equality is enforced on the client side requests, as they shouldn't bump the producer epoch.
-            else if ((isFromClient && producerEpoch != txnMetadata.producerEpoch) || producerEpoch < txnMetadata.producerEpoch)
+            else if (isFromClient && producerEpoch != txnMetadata.producerEpoch) {

Review comment:
       I'm not sure the logic here is actually simplified, since we still return `PRODUCER_FENCED` twice. If it isn't, we could just add the inner if-else under the existing condition `((isFromClient && producerEpoch != txnMetadata.producerEpoch) || producerEpoch < txnMetadata.producerEpoch)`.
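
       For illustration, a rough sketch of that shape, written in Java rather than Scala. The `timedOut` flag is a hypothetical stand-in for whatever the inner if-else in this PR actually checks, and `SketchError` is a local enum, not the Kafka `Errors` class:

```java
// Local stand-in for the relevant Kafka error codes (assumption, not the real Errors enum).
enum SketchError { NONE, PRODUCER_FENCED, TRANSACTION_TIMED_OUT }

final class EpochCheckSketch {
    // Keeps the original combined condition and nests the new distinction inside it,
    // so the epoch comparison is written only once.
    static SketchError check(boolean isFromClient, int producerEpoch, int currentEpoch, boolean timedOut) {
        if ((isFromClient && producerEpoch != currentEpoch) || producerEpoch < currentEpoch) {
            // Hypothetical inner check: `timedOut` stands for the PR's real predicate.
            if (timedOut) {
                return SketchError.TRANSACTION_TIMED_OUT;
            } else {
                return SketchError.PRODUCER_FENCED;
            }
        }
        return SketchError.NONE;
    }
}
```

       Whether the inner check should apply to both halves of the outer condition depends on the intended semantics, so treat this only as a starting point.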

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -369,7 +372,9 @@ private TransactionalRequestResult beginCompletingTransaction(TransactionResult
         // If the error is an INVALID_PRODUCER_ID_MAPPING error, the server will not accept an EndTxnRequest, so skip
         // directly to InitProducerId. Otherwise, we must first abort the transaction, because the producer will be
         // fenced if we directly call InitProducerId.
-        if (!(lastError instanceof InvalidPidMappingException)) {
+        boolean needEndTxn = !(abortableError instanceof InvalidPidMappingException)

Review comment:
       We should also update the comment above. I also think we could extract `needEndTxn` into a helper function, if checking `abortableError` is universally the right way to decide whether the transaction needs to be ended.
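
       Something along these lines, as a sketch only. The helper name `needsEndTxn` and the local exception class are placeholders so the snippet compiles on its own, and it covers only the `InvalidPidMappingException` clause visible in the diff, not any further conditions the PR may add:

```java
// Local stand-in so the sketch is self-contained; the real class lives in
// org.apache.kafka.common.errors.
class InvalidPidMappingException extends RuntimeException {
    InvalidPidMappingException(String message) {
        super(message);
    }
}

final class EndTxnSketch {
    // Hypothetical helper: returns true when the transaction must be ended (aborted)
    // before InitProducerId is sent, i.e. for any abortable error other than an
    // invalid producer id mapping.
    static boolean needsEndTxn(RuntimeException abortableError) {
        return !(abortableError instanceof InvalidPidMappingException);
    }
}
```

       The call site would then read `if (needsEndTxn(abortableError)) { ... }`, which keeps the explanation of why the EndTxn step is skipped next to the helper itself.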





