artemlivshits commented on code in PR #15524:
URL: https://github.com/apache/kafka/pull/15524#discussion_r1526939166


##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala:
##########
@@ -177,6 +178,86 @@ class TransactionMarkerChannelManagerTest {
       any())
   }
 
+  @Test
+  def shouldNotLoseTxnCompletionAfterLoad(): Unit = {
+    mockCache()
+
+    val expectedTransition = txnMetadata2.prepareComplete(time.milliseconds())
+
+    when(metadataCache.getPartitionLeaderEndpoint(
+      ArgumentMatchers.eq(partition1.topic),
+      ArgumentMatchers.eq(partition1.partition),
+      any())
+    ).thenReturn(Some(broker1))
+
+    // Build a successful client response.
+    val header = new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, 0, "client", 1)
+    val successfulResponse = new WriteTxnMarkersResponse(
+      Collections.singletonMap(producerId2: java.lang.Long, Collections.singletonMap(partition1, Errors.NONE)))
+    val successfulClientResponse = new ClientResponse(header, null, null,
+      time.milliseconds(), time.milliseconds(), false, null, null,
+      successfulResponse)
+
+    // Build a disconnected client response.
+    val disconnectedClientResponse = new ClientResponse(header, null, null,
+      time.milliseconds(), time.milliseconds(), true, null, null,
+      null)
+
+    // Test matrix to cover various scenarios:
+    val clientResponses = Seq(successfulClientResponse, disconnectedClientResponse)
+    val getTransactionStateResponses = Seq(
+      // NOT_COORDINATOR error case
+      Left(Errors.NOT_COORDINATOR),
+      // COORDINATOR_LOAD_IN_PROGRESS
+      Left(Errors.COORDINATOR_LOAD_IN_PROGRESS),
+      // "Newly loaded" transaction state with the new epoch.
+      Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch2, txnMetadata2)))
+    )
+
+    clientResponses.foreach { clientResponse =>
+      getTransactionStateResponses.foreach { getTransactionStateResponse =>
+        // Reset data from previous iteration.
+        txnMetadata2.topicPartitions.add(partition1)
+        clearInvocations(txnStateManager)
+        // Send out markers for a transaction before load.
+        channelManager.addTxnMarkersToSend(coordinatorEpoch, txnResult,
+          txnMetadata2, expectedTransition)
+
+        // Drain the marker to make it "in-flight".
+        val requests1 = channelManager.generateRequests().asScala
+        assertEquals(1, requests1.size)
+
+        // Simulate a partition load:

Review Comment:
   This covers the cases in the transaction marker request completion handler
where we call txnMarkerChannelManager.removeMarkersForTxn. (I literally put a
breakpoint on every line that calls txnMarkerChannelManager.removeMarkersForTxn
and ran the test, removing each breakpoint as it was hit; by the end of the
test no breakpoints were left.)
   
   The idea is that previously there were cases (some more likely, some very
unlikely) where a "zombie" reply for markers sent before the load could
unconditionally delete state created by newly sent markers, which would leave
the transaction "stuck". Now all those cases leave the state alone, so the new
transaction completes.
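
To make the failure mode concrete, here is a minimal sketch of the race. It is a toy model, not Kafka code: `PendingMarkers`, `MarkerRegistry`, and both removal methods are hypothetical names, and guarding the removal by coordinator epoch is an assumption made for illustration rather than a description of how this PR implements the fix.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of per-transaction marker state.
// None of these names are Kafka's actual classes.
final case class PendingMarkers(coordinatorEpoch: Int, partitions: Set[String])

final class MarkerRegistry {
  private val pending = mutable.Map.empty[String, PendingMarkers]

  def add(txnId: String, markers: PendingMarkers): Unit =
    pending.update(txnId, markers)

  def get(txnId: String): Option[PendingMarkers] = pending.get(txnId)

  // Problematic pattern: drop whatever is registered, regardless of who sent it.
  def removeUnconditionally(txnId: String): Unit = {
    pending.remove(txnId)
    ()
  }

  // Guarded pattern (an assumption for this sketch): only drop state that
  // belongs to the same coordinator epoch as the reply being handled.
  def removeIfEpochMatches(txnId: String, replyEpoch: Int): Boolean =
    pending.get(txnId) match {
      case Some(m) if m.coordinatorEpoch == replyEpoch =>
        pending.remove(txnId)
        true
      case _ => false
    }
}

object ZombieReplyDemo {
  // Returns the state left for the transaction after the zombie reply is handled.
  def run(guarded: Boolean): Option[PendingMarkers] = {
    val registry = new MarkerRegistry
    // Markers sent before the load, under coordinator epoch 0.
    registry.add("txn-2", PendingMarkers(0, Set("topic1-0")))
    // Partition load: markers are re-sent under the new coordinator epoch 1.
    registry.add("txn-2", PendingMarkers(1, Set("topic1-0")))
    // The "zombie" reply for the epoch-0 request arrives only now.
    if (guarded) registry.removeIfEpochMatches("txn-2", replyEpoch = 0)
    else registry.removeUnconditionally("txn-2")
    registry.get("txn-2")
  }

  def main(args: Array[String]): Unit = {
    println(s"unconditional removal leaves: ${run(guarded = false)}")
    println(s"guarded removal leaves:       ${run(guarded = true)}")
  }
}
```

With the unconditional removal, the epoch-1 state is gone and nothing is left to complete the transaction; with the guarded removal, the zombie reply leaves the newer state in place.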
   
   > Is there ever a case of getTransactionStateResponse where we have the old 
epoch?
   
   In that case we won't unconditionally call
txnMarkerChannelManager.removeMarkersForTxn, so we won't hit this particular
anomaly. The "zombie" reply will complete the transaction, and the reply for
the send that happened after the load will be a no-op.
   
   We may hit a different anomaly, but assuming we prevent loading a
transaction partition with the same coordinator epoch, we should never
encounter that case.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.