hachikuji commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r501204644



##########
File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala
##########
@@ -555,11 +555,11 @@ class TransactionsTest extends KafkaServerTestHarness {
     try {
       // Now that the transaction has expired, the second send should fail with a ProducerFencedException.
       producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "2", "2", willBeCommitted = false)).get()
-      fail("should have raised a ProducerFencedException since the transaction has expired")
+      fail("should have raised a TransactionTimeOutException since the transaction has expired")
     } catch {
-      case _: ProducerFencedException =>
+      case _: TransactionTimeOutException =>
       case e: ExecutionException =>
-      assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
+      assertTrue(e.getCause.isInstanceOf[TransactionTimeOutException])

Review comment:
       The main thing we need to test here is that the producer remains usable 
after the timeout.

##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -381,9 +386,16 @@ class TransactionCoordinator(brokerId: Int,
             if (txnMetadata.producerId != producerId)
               Left(Errors.INVALID_PRODUCER_ID_MAPPING)
             // Strict equality is enforced on the client side requests, as they shouldn't bump the producer epoch.
-            else if ((isFromClient && producerEpoch != txnMetadata.producerEpoch) || producerEpoch < txnMetadata.producerEpoch)
+            else if (isFromClient && producerEpoch != txnMetadata.producerEpoch) {
+              if (producerEpoch == txnMetadata.lastProducerEpoch) {

Review comment:
       The code assumes that if we receive an EndTxn request and 
`lastProducerEpoch` has been set, then it must be because the coordinator timed 
out the transaction. That is definitely true in the common case, but I'm 
wondering if it is worth adding some state to `TransactionMetadata` which 
explicitly indicates that the transaction was timed out. Not super important 
and could be done in a follow-up.
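
       To make the suggestion concrete, here is a rough, self-contained Java 
sketch of what an explicit flag could look like. It is not the real 
`TransactionMetadata` (which is Scala); `TxnMetadataSketch`, `EndTxnOutcome`, 
`timedOut`, `abortOnTimeout`, `bumpEpoch` and `validateClientEpoch` are all 
hypothetical names, and epochs are plain ints for brevity.

```java
// Hypothetical outcome names; the real coordinator returns Errors values.
enum EndTxnOutcome { OK, TIMED_OUT, FENCED }

// Simplified, hypothetical model of the coordinator-side metadata.
final class TxnMetadataSketch {
    static final int NO_EPOCH = -1;

    int producerEpoch;
    int lastProducerEpoch = NO_EPOCH;
    // Assumed new state: true only when the last epoch bump was a timeout abort.
    boolean timedOut = false;

    TxnMetadataSketch(int producerEpoch) {
        this.producerEpoch = producerEpoch;
    }

    // Coordinator aborts the transaction on timeout and records the reason.
    void abortOnTimeout() {
        bump();
        timedOut = true;
    }

    // Any other epoch bump clears the timeout marker.
    void bumpEpoch() {
        bump();
        timedOut = false;
    }

    private void bump() {
        lastProducerEpoch = producerEpoch;
        producerEpoch++;
    }

    // Classifies the epoch carried by a client EndTxn request.
    EndTxnOutcome validateClientEpoch(int requestEpoch) {
        if (requestEpoch == producerEpoch) {
            return EndTxnOutcome.OK;
        }
        // Report a timeout only when the flag says so, rather than
        // inferring it from lastProducerEpoch alone.
        if (timedOut && requestEpoch == lastProducerEpoch) {
            return EndTxnOutcome.TIMED_OUT;
        }
        return EndTxnOutcome.FENCED;
    }
}
```

       With the flag, an epoch bump that is not a timeout can no longer be 
mistaken for one, even though `lastProducerEpoch` is set in both cases.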

##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -364,7 +368,8 @@ class TransactionCoordinator(brokerId: Int,
                              producerEpoch: Short,
                              txnMarkerResult: TransactionResult,
                              isFromClient: Boolean,
-                             responseCallback: EndTxnCallback): Unit = {
+                             responseCallback: EndTxnCallback,
+                             timeoutAbort: Boolean = false): Unit = {

Review comment:
       nit: Can we avoid the default value for `timeoutAbort`? It is better for 
the caller to be explicit.

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1589,7 +1607,8 @@ public void handleResponse(AbstractResponse response) {
                 fatalError(error.exception());
             } else if (error == Errors.INVALID_TXN_STATE) {
                 fatalError(error.exception());
-            } else if (error == Errors.UNKNOWN_PRODUCER_ID || error == Errors.INVALID_PRODUCER_ID_MAPPING) {
+            } else if (error == Errors.UNKNOWN_PRODUCER_ID || error == Errors.INVALID_PRODUCER_ID_MAPPING
+                    || error == Errors.TRANSACTION_TIMED_OUT) {

Review comment:
       The documentation suggests that after catching 
`TransactionTimeoutException`, the user can just begin a new transaction. 
Unless I'm missing something, however, it seems like we still require an 
explicit call to `abortTransaction`. That is actually what I prefer, but we 
should clarify the expectation in the documentation. It is super important for 
the integration test to cover the full expected flow.
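
       For reference, the flow I have in mind looks roughly like the sketch 
below. It is a self-contained illustration, not the real client: 
`FakeTxnProducer` is a hypothetical in-memory stand-in for `KafkaProducer`, 
`TxnTimedOutException` stands in for the exception added in this PR, and 
`simulateCoordinatorTimeout` is an invented test hook. It also assumes the 
timeout surfaces on the next `send`, which is a simplification.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the timeout exception proposed in this PR.
class TxnTimedOutException extends RuntimeException {
    TxnTimedOutException(String message) {
        super(message);
    }
}

// Hypothetical in-memory producer; models only the state transitions discussed.
class FakeTxnProducer {
    private enum State { READY, IN_TXN, ABORTABLE_ERROR }

    private State state = State.READY;
    private boolean timedOutByCoordinator = false;
    private final List<String> pending = new ArrayList<>();
    final List<String> committed = new ArrayList<>();

    void beginTransaction() {
        if (state != State.READY) {
            throw new IllegalStateException("cannot begin in state " + state);
        }
        state = State.IN_TXN;
    }

    // Invented hook: pretend the coordinator aborted the open transaction.
    void simulateCoordinatorTimeout() {
        timedOutByCoordinator = true;
    }

    void send(String record) {
        if (state != State.IN_TXN) {
            throw new IllegalStateException("cannot send in state " + state);
        }
        if (timedOutByCoordinator) {
            state = State.ABORTABLE_ERROR;
            throw new TxnTimedOutException("transaction timed out");
        }
        pending.add(record);
    }

    void commitTransaction() {
        if (state != State.IN_TXN) {
            throw new IllegalStateException("cannot commit in state " + state);
        }
        committed.addAll(pending);
        pending.clear();
        state = State.READY;
    }

    // Discards pending records and returns the producer to a usable state.
    void abortTransaction() {
        if (state == State.READY) {
            throw new IllegalStateException("no transaction to abort");
        }
        pending.clear();
        timedOutByCoordinator = false;
        state = State.READY;
    }
}

class TimeoutRecoverySketch {
    static List<String> run() {
        FakeTxnProducer producer = new FakeTxnProducer();
        producer.beginTransaction();
        producer.send("1");
        producer.simulateCoordinatorTimeout();
        try {
            producer.send("2");
            producer.commitTransaction();
        } catch (TxnTimedOutException e) {
            // The explicit abort is required before a new transaction can begin.
            producer.abortTransaction();
        }
        // The same producer instance remains usable after the timeout.
        producer.beginTransaction();
        producer.send("3");
        producer.commitTransaction();
        return producer.committed;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

       In this model, calling `beginTransaction` straight after the timeout 
without `abortTransaction` throws `IllegalStateException`, which is the 
expectation the documentation should spell out.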



