jolshan commented on code in PR #15524:
URL: https://github.com/apache/kafka/pull/15524#discussion_r1526958117


##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala:
##########
@@ -177,6 +178,86 @@ class TransactionMarkerChannelManagerTest {
       any())
   }
 
+  @Test
+  def shouldNotLoseTxnCompletionAfterLoad(): Unit = {
+    mockCache()
+
+    val expectedTransition = txnMetadata2.prepareComplete(time.milliseconds())
+
+    when(metadataCache.getPartitionLeaderEndpoint(
+      ArgumentMatchers.eq(partition1.topic),
+      ArgumentMatchers.eq(partition1.partition),
+      any())
+    ).thenReturn(Some(broker1))
+
+    // Build a successful client response.
+    val header = new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, 0, "client", 1)
+    val successfulResponse = new WriteTxnMarkersResponse(
+      Collections.singletonMap(producerId2: java.lang.Long, 
Collections.singletonMap(partition1, Errors.NONE)))
+    val successfulClientResponse = new ClientResponse(header, null, null,
+      time.milliseconds(), time.milliseconds(), false, null, null,
+      successfulResponse)
+
+    // Build a disconnected client response.
+    val disconnectedClientResponse = new ClientResponse(header, null, null,
+      time.milliseconds(), time.milliseconds(), true, null, null,
+      null)
+
+    // Test matrix to cover various scenarios:
+    val clientResponses = Seq(successfulClientResponse, 
disconnectedClientResponse)
+    val getTransactionStateResponses = Seq(
+      // NOT_COORDINATOR error case
+      Left(Errors.NOT_COORDINATOR),
+      // COORDINATOR_LOAD_IN_PROGRESS
+      Left(Errors.COORDINATOR_LOAD_IN_PROGRESS),
+      // "Newly loaded" transaction state with the new epoch.
+      Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch2, 
txnMetadata2)))
+    )
+
+    clientResponses.foreach { clientResponse =>
+      getTransactionStateResponses.foreach { getTransactionStateResponse =>
+        // Reset data from previous iteration.
+        txnMetadata2.topicPartitions.add(partition1)
+        clearInvocations(txnStateManager)
+        // Send out markers for a transaction before load.
+        channelManager.addTxnMarkersToSend(coordinatorEpoch, txnResult,
+          txnMetadata2, expectedTransition)
+
+        // Drain the marker to make it "in-flight".
+        val requests1 = channelManager.generateRequests().asScala
+        assertEquals(1, requests1.size)
+
+        // Simulate a partition load:

Review Comment:
   Ok cool. I'm glad we merged https://github.com/apache/kafka/pull/15139 then. 
:) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to