denis-chudov commented on code in PR #2856:
URL: https://github.com/apache/ignite-3/pull/2856#discussion_r1425912261


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -505,46 +506,74 @@ private CompletableFuture<?> 
processRequest(ReplicaRequest request, @Nullable Bo
      * @param request Tx recovery request.
      * @return The future is complete when the transaction state is finalized.
      */
-    private CompletableFuture<Void> processTxRecoveryMessage(TxRecoveryMessage 
request) {
+    private CompletableFuture<Void> processTxRecoveryMessage(TxRecoveryMessage 
request, String senderId) {
         UUID txId = request.txId();
 
         TxMeta txMeta = txStateStorage.get(txId);
 
         // Check whether a transaction has already been finished.
-        boolean transactionAlreadyFinished = txMeta != null && 
isFinalState(txMeta.txState());
+        if (txMeta != null && isFinalState(txMeta.txState())) {
+            return recoverFinishedTx(txId, txMeta)
+                    // If the sender has sent a recovery message, it failed to 
handle it on its own,
+                    // so sending cleanup to the sender for the transaction we 
know is finished.
+                    .whenComplete((v, ex) -> runCleanupOnNode(txId, senderId));
+        }
 
-        if (transactionAlreadyFinished) {
+        LOG.info("Orphan transaction has to be aborted [tx={}].", txId);
+
+        return triggerTxRecovery(txId, senderId);
+    }
+
+    private CompletableFuture<Void> recoverFinishedTx(UUID txId, TxMeta 
txMeta) {
+        if (txMeta.locksReleased() || txMeta.enlistedPartitions().isEmpty()) {
+            // Nothing to do if the locks have been released already or there 
are no enlistedPartitions available.
             return nullCompletedFuture();
         }
 
-        LOG.info("Orphan transaction has to be aborted [tx={}].", txId);
+        // Otherwise run a cleanup on the known set of partitions.
+        return (CompletableFuture<Void>) durableCleanup(txId, txMeta);
+    }
+
+    /**
+     * Run cleanup on a node.
+     *
+     * @param txId Transaction id.
+     * @param nodeId Node id (inconsistent).
+     */
+    private CompletableFuture<Void> runCleanupOnNode(UUID txId, String nodeId) 
{
+        // Get node id of the sender to send back cleanup requests.
+        String nodeConsistentId = 
clusterNodeResolver.getConsistentIdById(nodeId);
+
+        return nodeConsistentId == null ? nullCompletedFuture() : 
txManager.cleanup(nodeConsistentId, txId);
+    }
 
-        return triggerTxRecovery(txId)
-                .thenApply(v -> null);
+    private CompletableFuture<Void> triggerTxRecovery(UUID txId, String 
senderId) {
+        // If the transaction state is pending, then the transaction should be 
rolled back,
+        // meaning that the state is changed to aborted and a corresponding 
cleanup request
+        // is sent in a common durable manner to a partition that have 
initiated recovery.
+        return txManager.finish(
+                        new HybridTimestampTracker(),
+                        replicationGroupId,
+                        false,
+                        Map.of(replicationGroupId, 0L), // term is not 
required for the rollback.
+                        txId
+                )
+                .thenCompose(unused -> runCleanupOnNode(txId, senderId));
     }
 
     /**
      * Starts tx recovery process. Returns the future containing transaction 
meta with its final state that completes
      * when the recovery is completed.
      *
      * @param txId Transaction id.
+     * @param txStateMeta Transaction meta.
+     * @param senderId Sender inconsistent id.
      * @return Tx recovery future, or failed future if the tx recovery is not 
possible.
      */
-    private CompletableFuture<TransactionMeta> triggerTxRecovery(UUID txId) {
-        // TODO: IGNITE-20735 Implement initiate recovery handling logic. This 
part has to be fully rewritten.
-        TxStateMeta txStateMeta = txManager.stateMeta(txId);
-
+    private CompletableFuture<TransactionMeta> triggerTxRecoveryWithState(UUID 
txId, @Nullable TxStateMeta txStateMeta, String senderId) {
         if (txStateMeta == null || txStateMeta.txState() == ABANDONED) {
-            txStateStorage.put(txId, new TxMeta(ABORTED, List.of(), null));
-            return completedFuture(
-                    txManager.updateTxMeta(txId, old -> {
-                        if (old == null) {
-                            return new TxStateMeta(ABORTED, null, 
replicationGroupId, null);
-                        } else {
-                            return new TxStateMeta(ABORTED, 
old.txCoordinatorId(), replicationGroupId, null);
-                        }
-                    })
-            );
+            return triggerTxRecovery(txId, senderId)
+                    .thenApply(v -> txStateStorage.get(txId));

Review Comment:
   Actually, it seems that this happens in a different way. Any concurrent recovery 
may fail with an assertion error in txManager.finish here:
   `assert !tuple0.isTxFinishing() : "Transaction is already finished [id=" + 
uuid + "].";`
   We should return, for example, a failed future with an IllegalStateException 
instead of throwing an assertion error.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to