artemlivshits commented on code in PR #15524: URL: https://github.com/apache/kafka/pull/15524#discussion_r1526939166
########## core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala:
##########
@@ -177,6 +178,86 @@ class TransactionMarkerChannelManagerTest {
       any())
   }

+  @Test
+  def shouldNotLoseTxnCompletionAfterLoad(): Unit = {
+    mockCache()
+
+    val expectedTransition = txnMetadata2.prepareComplete(time.milliseconds())
+
+    when(metadataCache.getPartitionLeaderEndpoint(
+      ArgumentMatchers.eq(partition1.topic),
+      ArgumentMatchers.eq(partition1.partition),
+      any())
+    ).thenReturn(Some(broker1))
+
+    // Build a successful client response.
+    val header = new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, 0, "client", 1)
+    val successfulResponse = new WriteTxnMarkersResponse(
+      Collections.singletonMap(producerId2: java.lang.Long, Collections.singletonMap(partition1, Errors.NONE)))
+    val successfulClientResponse = new ClientResponse(header, null, null,
+      time.milliseconds(), time.milliseconds(), false, null, null,
+      successfulResponse)
+
+    // Build a disconnected client response.
+    val disconnectedClientResponse = new ClientResponse(header, null, null,
+      time.milliseconds(), time.milliseconds(), true, null, null,
+      null)
+
+    // Test matrix to cover various scenarios:
+    val clientResponses = Seq(successfulClientResponse, disconnectedClientResponse)
+    val getTransactionStateResponses = Seq(
+      // NOT_COORDINATOR error case
+      Left(Errors.NOT_COORDINATOR),
+      // COORDINATOR_LOAD_IN_PROGRESS
+      Left(Errors.COORDINATOR_LOAD_IN_PROGRESS),
+      // "Newly loaded" transaction state with the new epoch.
+      Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch2, txnMetadata2)))
+    )
+
+    clientResponses.foreach { clientResponse =>
+      getTransactionStateResponses.foreach { getTransactionStateResponse =>
+        // Reset data from previous iteration.
+        txnMetadata2.topicPartitions.add(partition1)
+        clearInvocations(txnStateManager)
+
+        // Send out markers for a transaction before load.
+        channelManager.addTxnMarkersToSend(coordinatorEpoch, txnResult,
+          txnMetadata2, expectedTransition)
+
+        // Drain the marker to make it "in-flight".
+        val requests1 = channelManager.generateRequests().asScala
+        assertEquals(1, requests1.size)
+
+        // Simulate a partition load:

Review Comment:
   It covers the cases in the transaction marker request completion handler where we call txnMarkerChannelManager.removeMarkersForTxn. (I literally put a breakpoint on every line that calls txnMarkerChannelManager.removeMarkersForTxn, ran the test, and removed each breakpoint as it was hit; by the end of the test no breakpoints remained.) The idea is that previously there were cases (some more likely, some very unlikely) in which a "zombie" reply issued before a load could unconditionally delete state created by newly sent markers, which would leave the transaction "stuck". Now all of those cases leave the state alone, so the new transaction completes.

   > Is there ever a case of getTransactionStateResponse where we have the old epoch?

   In that case we won't unconditionally call txnMarkerChannelManager.removeMarkersForTxn, so we won't hit this particular anomaly. The "zombie" reply will complete the transaction, and the reply for the send that happened after the load will be a no-op. We may hit a different anomaly, but assuming we prevent loading a transaction partition with the same coordinator epoch, we should never encounter that case.
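To illustrate the invariant the comment describes — a reply carrying a stale coordinator epoch must not delete state re-created after a partition load — here is a minimal, self-contained Scala sketch. It is not Kafka's actual implementation; the names (`MarkerChannel`, `PendingTxn`, `onMarkerResponse`, `hasPending`) are hypothetical stand-ins for the real `TransactionMarkerChannelManager` machinery:

```scala
import scala.collection.mutable

// Hypothetical, simplified model of per-transaction in-flight marker state.
final case class PendingTxn(coordinatorEpoch: Int)

final class MarkerChannel {
  private val pending = mutable.Map.empty[Long, PendingTxn]

  // Register markers sent under a given coordinator epoch.
  def addMarkers(producerId: Long, coordinatorEpoch: Int): Unit =
    pending(producerId) = PendingTxn(coordinatorEpoch)

  // A reply may only remove state created under its own coordinator epoch.
  // A "zombie" reply from before a partition load carries the old epoch,
  // so it leaves the freshly reloaded state alone instead of deleting it.
  def onMarkerResponse(producerId: Long, replyEpoch: Int): Unit =
    pending.get(producerId) match {
      case Some(txn) if txn.coordinatorEpoch == replyEpoch =>
        pending.remove(producerId) // normal completion under matching epoch
      case _ =>
        () // stale (or unknown) reply: no-op, state is preserved
    }

  def hasPending(producerId: Long): Boolean = pending.contains(producerId)
}
```

Under this guard, a zombie reply with the pre-load epoch becomes a no-op, and only the reply for the markers sent after the load completes the transaction.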