hachikuji commented on code in PR #14489:
URL: https://github.com/apache/kafka/pull/14489#discussion_r1406616682

##########
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala:
##########
@@ -514,8 +520,14 @@ class TransactionStateManager(brokerId: Int,
    * When this broker becomes a leader for a transaction log partition, load this partition and populate the transaction
    * metadata cache with the transactional ids. This operation must be resilient to any partial state left off from
    * the previous loading / unloading operation.
+   *
+   * If the state is already loaded (leader epoch bumps, but we have the same leader), just update the epoch in the
+   * metadata cache and for all the pending markers.
    */
-  def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: Int, sendTxnMarkers: SendTxnMarkersCallback): Unit = {
+  def maybeLoadTransactionsAndBumpEpochForTxnTopicPartition(partitionId: Int,

Review Comment:
   nit: The rename seems borderline overkill. I would consider the epoch bump part of transaction loading.

##########
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala:
##########
@@ -514,8 +520,14 @@ class TransactionStateManager(brokerId: Int,
    * When this broker becomes a leader for a transaction log partition, load this partition and populate the transaction
    * metadata cache with the transactional ids. This operation must be resilient to any partial state left off from
    * the previous loading / unloading operation.
+   *
+   * If the state is already loaded (leader epoch bumps, but we have the same leader), just update the epoch in the
+   * metadata cache and for all the pending markers.
    */
-  def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: Int, sendTxnMarkers: SendTxnMarkersCallback): Unit = {
+  def maybeLoadTransactionsAndBumpEpochForTxnTopicPartition(partitionId: Int,
+                                                            coordinatorEpoch: Int,
+                                                            sendTxnMarkers: SendTxnMarkersCallback,
+                                                            transactionStateLoaded: Boolean): Unit = {

Review Comment:
   As mentioned above, I don't think we should pass this as an argument.

   On a higher level, I'm trying to figure out the safety of this loading process. Suppose we have two epoch bumps in quick succession. Do we get a strong ordering guarantee given that it is done asynchronously? I think I would expect that we would check for the existence of the partition in `loadingPartitions` when we first acquire the write lock below. If it exists, then we need to ensure the monotonicity of the epoch. If the entry has a higher epoch, then we ignore the call.

##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -447,12 +447,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
     info(s"Elected as the txn coordinator for partition $txnTopicPartitionId at epoch $coordinatorEpoch")
     // The operations performed during immigration must be resilient to any previous errors we saw or partial state we
     // left off during the unloading phase. Ensure we remove all associated state for this partition before we continue
-    // loading it.
+    // loading it. In the case where the state partition is already loaded, we want to remove inflight markers with the
+    // old epoch.
     txnMarkerChannelManager.removeMarkersForTxnTopicPartition(txnTopicPartitionId)

     // Now load the partition.
-    txnManager.loadTransactionsForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch,
-      txnMarkerChannelManager.addTxnMarkersToSend)
+    txnManager.maybeLoadTransactionsAndBumpEpochForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch,
+      txnMarkerChannelManager.addTxnMarkersToSend, txnManager.txnStateLoaded(txnTopicPartitionId))

Review Comment:
   It's curious that we need to pass the result of `txnStateLoaded`. Couldn't `txnManager` figure it out on its own?
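
A minimal sketch of what the last two comments suggest, under stated assumptions: the state manager decides for itself whether a partition is already loaded, and it enforces epoch monotonicity in the same write-lock section where the partition is registered as loading. Every name here (`EpochGuardedLoader`, `LoadDecision`, `onBecomeLeader`, `completeLoad`) is hypothetical, and the two maps only loosely mirror the `loadingPartitions` set mentioned in the review; this is not the actual `TransactionStateManager` code. The sketch also chooses to ignore an equal epoch, not just a higher one.

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable

// Hypothetical outcome of a "became leader at epoch N" notification.
sealed trait LoadDecision
case object FullLoad extends LoadDecision      // not known yet: read the transaction log
case object BumpEpochOnly extends LoadDecision // already loaded at a lower epoch: only bump it
case object IgnoreStale extends LoadDecision   // an equal or higher epoch is already registered

// Hypothetical, simplified stand-in for the state manager's bookkeeping.
final class EpochGuardedLoader {
  private val stateLock = new ReentrantReadWriteLock()
  private val loadingPartitions = mutable.Map.empty[Int, Int] // partitionId -> epoch being loaded
  private val loadedPartitions = mutable.Map.empty[Int, Int]  // partitionId -> epoch being served

  private def inWriteLock[T](body: => T): T = {
    stateLock.writeLock().lock()
    try body
    finally stateLock.writeLock().unlock()
  }

  // The caller passes no "already loaded" flag; the decision is made here, under the lock.
  def onBecomeLeader(partitionId: Int, coordinatorEpoch: Int): LoadDecision = inWriteLock {
    val knownEpoch = loadingPartitions.get(partitionId).orElse(loadedPartitions.get(partitionId))
    if (knownEpoch.exists(_ >= coordinatorEpoch)) {
      IgnoreStale // keep epochs monotonic: never go backwards or reprocess the same epoch
    } else if (loadedPartitions.contains(partitionId)) {
      loadedPartitions.update(partitionId, coordinatorEpoch)
      BumpEpochOnly
    } else {
      // Either unknown, or still loading at an older epoch: the newer epoch supersedes it.
      loadingPartitions.update(partitionId, coordinatorEpoch)
      FullLoad
    }
  }

  // Called when an asynchronous load finishes; a load for a superseded epoch is dropped.
  def completeLoad(partitionId: Int, coordinatorEpoch: Int): Boolean = inWriteLock {
    if (loadingPartitions.get(partitionId).contains(coordinatorEpoch)) {
      loadingPartitions.remove(partitionId)
      loadedPartitions.update(partitionId, coordinatorEpoch)
      true
    } else {
      false
    }
  }
}
```

Because the check and the registration happen under a single write-lock acquisition, two quick epoch bumps are serialized, and the asynchronous load re-checks its epoch in `completeLoad` before publishing, so a slower load for an older epoch cannot overwrite the state for a newer one.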

##########
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala:
##########
@@ -524,23 +536,35 @@ class TransactionStateManager(brokerId: Int,
     }

     def loadTransactions(startTimeMs: java.lang.Long): Unit = {
-      val schedulerTimeMs = time.milliseconds() - startTimeMs
-      info(s"Loading transaction metadata from $topicPartition at epoch $coordinatorEpoch")
-      validateTransactionTopicPartitionCountIsStable()
-
-      val loadedTransactions = loadTransactionMetadata(topicPartition, coordinatorEpoch)
-      val endTimeMs = time.milliseconds()
-      val totalLoadingTimeMs = endTimeMs - startTimeMs
-      partitionLoadSensor.record(totalLoadingTimeMs.toDouble, endTimeMs, false)
-      info(s"Finished loading ${loadedTransactions.size} transaction metadata from $topicPartition in " +
-        s"$totalLoadingTimeMs milliseconds, of which $schedulerTimeMs milliseconds was spent in the scheduler.")
+      val maybeLoadedTransactions =
+        if (!transactionStateLoaded) {
+          val schedulerTimeMs = time.milliseconds() - startTimeMs
+          info(s"Loading transaction metadata from $topicPartition at epoch $coordinatorEpoch")
+          validateTransactionTopicPartitionCountIsStable()
+
+          val loadedTransactions = loadTransactionMetadata(topicPartition, coordinatorEpoch)
+          val endTimeMs = time.milliseconds()
+          val totalLoadingTimeMs = endTimeMs - startTimeMs
+          partitionLoadSensor.record(totalLoadingTimeMs.toDouble, endTimeMs, false)
+          info(s"Finished loading ${loadedTransactions.size} transaction metadata from $topicPartition in " +
+            s"$totalLoadingTimeMs milliseconds, of which $schedulerTimeMs milliseconds was spent in the scheduler.")
+          Some(loadedTransactions)
+        } else {
+          None

Review Comment:
   Perhaps we should have a log message in this path?
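
A sketch of what the `else` branch could log. `LoadPath`, `maybeLoad` and the `loadAll` parameter are hypothetical stand-ins for the nested `loadTransactions` function and `loadTransactionMetadata`; `info` writes to an in-memory buffer rather than Kafka's logger, the epoch update itself is omitted, and the message wording is only a suggestion.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the nested loading function; not Kafka's actual code.
final class LoadPath(topicPartition: String, coordinatorEpoch: Int) {
  // Stand-in for the real logger: messages are kept in memory so they can be inspected.
  val logged: ArrayBuffer[String] = ArrayBuffer.empty[String]
  private def info(msg: String): Unit = { logged += msg; () }

  // `loadAll` stands in for reading the partition (loadTransactionMetadata in the diff).
  def maybeLoad(transactionStateLoaded: Boolean, loadAll: () => Seq[String]): Option[Seq[String]] =
    if (!transactionStateLoaded) {
      info(s"Loading transaction metadata from $topicPartition at epoch $coordinatorEpoch")
      val loaded = loadAll()
      info(s"Finished loading ${loaded.size} transaction metadata from $topicPartition")
      Some(loaded)
    } else {
      // The branch the comment asks about: nothing is read from the log, so record why.
      info(s"Transaction metadata from $topicPartition is already loaded; " +
        s"skipping the load and only bumping the coordinator epoch to $coordinatorEpoch")
      None
    }
}
```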

##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -447,12 +447,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
     info(s"Elected as the txn coordinator for partition $txnTopicPartitionId at epoch $coordinatorEpoch")
     // The operations performed during immigration must be resilient to any previous errors we saw or partial state we
     // left off during the unloading phase. Ensure we remove all associated state for this partition before we continue
-    // loading it.
+    // loading it. In the case where the state partition is already loaded, we want to remove inflight markers with the
+    // old epoch.

Review Comment:
   nit: remove inflight markers with the old epoch and replace them with the new epoch?