hachikuji commented on code in PR #14489:
URL: https://github.com/apache/kafka/pull/14489#discussion_r1406616682


##########
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala:
##########
@@ -514,8 +520,14 @@ class TransactionStateManager(brokerId: Int,
    * When this broker becomes a leader for a transaction log partition, load this partition and populate the transaction
    * metadata cache with the transactional ids. This operation must be resilient to any partial state left off from
    * the previous loading / unloading operation.
+   *
+   * If the state is already loaded (leader epoch bumps, but we have the same leader), just update the epoch in the
+   * metadata cache and for all the pending markers.
    */
-  def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: Int, sendTxnMarkers: SendTxnMarkersCallback): Unit = {
+  def maybeLoadTransactionsAndBumpEpochForTxnTopicPartition(partitionId: Int,

Review Comment:
   nit: The rename seems borderline overkill. I would consider the epoch bump 
part of transaction loading.



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala:
##########
@@ -514,8 +520,14 @@ class TransactionStateManager(brokerId: Int,
    * When this broker becomes a leader for a transaction log partition, load this partition and populate the transaction
    * metadata cache with the transactional ids. This operation must be resilient to any partial state left off from
    * the previous loading / unloading operation.
+   *
+   * If the state is already loaded (leader epoch bumps, but we have the same leader), just update the epoch in the
+   * metadata cache and for all the pending markers.
    */
-  def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: Int, sendTxnMarkers: SendTxnMarkersCallback): Unit = {
+  def maybeLoadTransactionsAndBumpEpochForTxnTopicPartition(partitionId: Int,
+                                                            coordinatorEpoch: Int,
+                                                            sendTxnMarkers: SendTxnMarkersCallback,
+                                                            transactionStateLoaded: Boolean): Unit = {

Review Comment:
   As mentioned above, I don't think we should pass this as an argument.
   
   On a higher level, I'm trying to figure out the safety of this loading 
process. Suppose we have two epoch bumps in quick succession. Do we get a 
strong ordering guarantee, given that the loading is done asynchronously? I 
would expect us to check for the existence of the partition in 
`loadingPartitions` when we first acquire the write lock below. If it exists, 
then we need to ensure the monotonicity of the epoch: if the existing entry has 
a higher epoch, we ignore the call.
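
   A minimal sketch of the check described above, not the actual Kafka code.
   It assumes a hypothetical `LoadingTracker` class that keeps one epoch per
   loading partition behind a write lock; the class name, the method name
   `tryStartLoading`, and the map layout are all illustrative assumptions.

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable

// Hypothetical stand-in for the manager's loading state; not the real Kafka type.
class LoadingTracker {
  private val stateLock = new ReentrantReadWriteLock()
  // partitionId -> coordinator epoch of the load currently registered (assumed layout)
  private val loadingPartitions = mutable.Map.empty[Int, Int]

  /** Returns true if the caller should proceed with loading at `epoch`. */
  def tryStartLoading(partitionId: Int, epoch: Int): Boolean = {
    val lock = stateLock.writeLock()
    lock.lock()
    try {
      loadingPartitions.get(partitionId) match {
        // An entry with a higher epoch already exists: this call is stale, ignore it.
        case Some(existing) if existing > epoch => false
        // No entry, or an entry with an epoch that is not higher: register this epoch.
        case _ =>
          loadingPartitions.put(partitionId, epoch)
          true
      }
    } finally {
      lock.unlock()
    }
  }
}
```

   With this shape, two bumps arriving out of order (say epoch 7 registered
   before epoch 5) leave the entry at 7, and the late call for 5 is dropped.
   How an equal epoch should be treated is left open here.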



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -447,12 +447,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
     info(s"Elected as the txn coordinator for partition $txnTopicPartitionId at epoch $coordinatorEpoch")
     // The operations performed during immigration must be resilient to any previous errors we saw or partial state we
     // left off during the unloading phase. Ensure we remove all associated state for this partition before we continue
-    // loading it.
+    // loading it. In the case where the state partition is already loaded, we want to remove inflight markers with the
+    // old epoch.
     txnMarkerChannelManager.removeMarkersForTxnTopicPartition(txnTopicPartitionId)
 
     // Now load the partition.
-    txnManager.loadTransactionsForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch,
-      txnMarkerChannelManager.addTxnMarkersToSend)
+    txnManager.maybeLoadTransactionsAndBumpEpochForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch,
+      txnMarkerChannelManager.addTxnMarkersToSend, txnManager.txnStateLoaded(txnTopicPartitionId))

Review Comment:
   It's curious that we need to pass the result of `txnStateLoaded`. Couldn't 
`txnManager` figure it out on its own?
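
   One way this could look, as a rough sketch only: the manager consults its
   own cache inside the load call, so the coordinator never passes a flag.
   `TxnStateManagerSketch`, `LoadOutcome`, and the `loadedEpochs` map are
   hypothetical names for illustration; the real cache structure differs.

```scala
import scala.collection.mutable

// Hypothetical result type, used only to make the two paths visible.
sealed trait LoadOutcome
case object FullLoad extends LoadOutcome
case object EpochBumpOnly extends LoadOutcome

// Simplified stand-in for the state manager; not the real Kafka class.
class TxnStateManagerSketch {
  // partitionId -> coordinator epoch of the loaded state (assumed layout)
  private val loadedEpochs = mutable.Map.empty[Int, Int]

  def txnStateLoaded(partitionId: Int): Boolean = synchronized {
    loadedEpochs.contains(partitionId)
  }

  // The caller passes only the partition and epoch; the manager decides
  // internally whether a full load or just an epoch bump is needed.
  def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: Int): LoadOutcome = synchronized {
    val outcome: LoadOutcome =
      if (loadedEpochs.contains(partitionId)) EpochBumpOnly else FullLoad
    loadedEpochs.update(partitionId, coordinatorEpoch)
    outcome
  }
}
```

   Deciding inside the same synchronized section also avoids a gap between
   checking the loaded state and acting on it.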



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala:
##########
@@ -524,23 +536,35 @@ class TransactionStateManager(brokerId: Int,
     }
 
     def loadTransactions(startTimeMs: java.lang.Long): Unit = {
-      val schedulerTimeMs = time.milliseconds() - startTimeMs
-      info(s"Loading transaction metadata from $topicPartition at epoch $coordinatorEpoch")
-      validateTransactionTopicPartitionCountIsStable()
-
-      val loadedTransactions = loadTransactionMetadata(topicPartition, coordinatorEpoch)
-      val endTimeMs = time.milliseconds()
-      val totalLoadingTimeMs = endTimeMs - startTimeMs
-      partitionLoadSensor.record(totalLoadingTimeMs.toDouble, endTimeMs, false)
-      info(s"Finished loading ${loadedTransactions.size} transaction metadata from $topicPartition in " +
-        s"$totalLoadingTimeMs milliseconds, of which $schedulerTimeMs milliseconds was spent in the scheduler.")
+      val maybeLoadedTransactions =
+        if (!transactionStateLoaded) {
+          val schedulerTimeMs = time.milliseconds() - startTimeMs
+          info(s"Loading transaction metadata from $topicPartition at epoch $coordinatorEpoch")
+          validateTransactionTopicPartitionCountIsStable()
+
+          val loadedTransactions = loadTransactionMetadata(topicPartition, coordinatorEpoch)
+          val endTimeMs = time.milliseconds()
+          val totalLoadingTimeMs = endTimeMs - startTimeMs
+          partitionLoadSensor.record(totalLoadingTimeMs.toDouble, endTimeMs, false)
+          info(s"Finished loading ${loadedTransactions.size} transaction metadata from $topicPartition in " +
+            s"$totalLoadingTimeMs milliseconds, of which $schedulerTimeMs milliseconds was spent in the scheduler.")
+          Some(loadedTransactions)
+        } else {
+          None

Review Comment:
   Perhaps we should have a log message in this path?



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -447,12 +447,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
     info(s"Elected as the txn coordinator for partition $txnTopicPartitionId at epoch $coordinatorEpoch")
     // The operations performed during immigration must be resilient to any previous errors we saw or partial state we
     // left off during the unloading phase. Ensure we remove all associated state for this partition before we continue
-    // loading it.
+    // loading it. In the case where the state partition is already loaded, we want to remove inflight markers with the
+    // old epoch.

Review Comment:
   nit: remove inflight markers with the old epoch and replace them with the 
new epoch?


