denis-chudov commented on code in PR #2856:
URL: https://github.com/apache/ignite-3/pull/2856#discussion_r1425101034


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -505,46 +506,74 @@ private CompletableFuture<?> 
processRequest(ReplicaRequest request, @Nullable Bo
      * @param request Tx recovery request.
      * @return The future is complete when the transaction state is finalized.
      */
-    private CompletableFuture<Void> processTxRecoveryMessage(TxRecoveryMessage 
request) {
+    private CompletableFuture<Void> processTxRecoveryMessage(TxRecoveryMessage 
request, String senderId) {
         UUID txId = request.txId();
 
         TxMeta txMeta = txStateStorage.get(txId);
 
         // Check whether a transaction has already been finished.
-        boolean transactionAlreadyFinished = txMeta != null && 
isFinalState(txMeta.txState());
+        if (txMeta != null && isFinalState(txMeta.txState())) {
+            return recoverFinishedTx(txId, txMeta)
+                    // If the sender has sent a recovery message, it failed to 
handle it on its own,
+                    // so sending cleanup to the sender for the transaction we 
know is finished.
+                    .whenComplete((v, ex) -> runCleanupOnNode(txId, senderId));
+        }
 
-        if (transactionAlreadyFinished) {
+        LOG.info("Orphan transaction has to be aborted [tx={}].", txId);
+
+        return triggerTxRecovery(txId, senderId);
+    }
+
+    private CompletableFuture<Void> recoverFinishedTx(UUID txId, TxMeta 
txMeta) {
+        if (txMeta.locksReleased() || txMeta.enlistedPartitions().isEmpty()) {
+            // Nothing to do if the locks have been released already or there 
are no enlistedPartitions available.
             return nullCompletedFuture();
         }
 
-        LOG.info("Orphan transaction has to be aborted [tx={}].", txId);
+        // Otherwise run a cleanup on the known set of partitions.
+        return (CompletableFuture<Void>) durableCleanup(txId, txMeta);
+    }
+
+    /**
+     * Run cleanup on a node.
+     *
+     * @param txId Transaction id.
+     * @param nodeId Node id (inconsistent).
+     */
+    private CompletableFuture<Void> runCleanupOnNode(UUID txId, String nodeId) 
{
+        // Get node id of the sender to send back cleanup requests.
+        String nodeConsistentId = 
clusterNodeResolver.getConsistentIdById(nodeId);
+
+        return nodeConsistentId == null ? nullCompletedFuture() : 
txManager.cleanup(nodeConsistentId, txId);
+    }
 
-        return triggerTxRecovery(txId)
-                .thenApply(v -> null);
+    private CompletableFuture<Void> triggerTxRecovery(UUID txId, String 
senderId) {
+        // If the transaction state is pending, then the transaction should be 
rolled back,
+        // meaning that the state is changed to aborted and a corresponding 
cleanup request
+        // is sent in a common durable manner to a partition that have 
initiated recovery.
+        return txManager.finish(
+                        new HybridTimestampTracker(),
+                        replicationGroupId,
+                        false,
+                        Map.of(replicationGroupId, 0L), // term is not 
required for the rollback.
+                        txId
+                )
+                .thenCompose(unused -> runCleanupOnNode(txId, senderId));
     }
 
     /**
      * Starts tx recovery process. Returns the future containing transaction 
meta with its final state that completes
      * when the recovery is completed.
      *
      * @param txId Transaction id.
+     * @param txStateMeta Transaction meta.
+     * @param senderId Sender inconsistent id.
      * @return Tx recovery future, or failed future if the tx recovery is not 
possible.
      */
-    private CompletableFuture<TransactionMeta> triggerTxRecovery(UUID txId) {
-        // TODO: IGNITE-20735 Implement initiate recovery handling logic. This 
part has to be fully rewritten.
-        TxStateMeta txStateMeta = txManager.stateMeta(txId);
-
+    private CompletableFuture<TransactionMeta> triggerTxRecoveryWithState(UUID 
txId, @Nullable TxStateMeta txStateMeta, String senderId) {
         if (txStateMeta == null || txStateMeta.txState() == ABANDONED) {
-            txStateStorage.put(txId, new TxMeta(ABORTED, List.of(), null));
-            return completedFuture(
-                    txManager.updateTxMeta(txId, old -> {
-                        if (old == null) {
-                            return new TxStateMeta(ABORTED, null, 
replicationGroupId, null);
-                        } else {
-                            return new TxStateMeta(ABORTED, 
old.txCoordinatorId(), replicationGroupId, null);
-                        }
-                    })
-            );
+            return triggerTxRecovery(txId, senderId)
+                    .thenApply(v -> txStateStorage.get(txId));

Review Comment:
   There are cases when we are on the commit partition primary which is not a 
leader, so if this state in tx state storage had been modified by the 
concurrent recovery process, the finish future from our recovery could fail but 
tx state could be not replicated yet:
   - recovery A (current): start txManager.finish()
   - recovery B (concurrent): start txManager.finish(), finishes successfully, 
but the final tx state is applied only on leader (which is some remote node)
   - recovery A: txManager.finish() is failed due to recovery B, but local 
instance of tx state storage still doesn't have an actual tx meta.
   
   In this case we can check the local tx state map (because the concurrent 
recovery happens on the same commit partition replica) and it will be either 
FINISHING (if the concurrent process still wasn't able to put the final state) 
or some final state, and in case of FINISHING we can return the finish future. 
What do you think?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -505,46 +506,74 @@ private CompletableFuture<?> 
processRequest(ReplicaRequest request, @Nullable Bo
      * @param request Tx recovery request.
      * @return The future is complete when the transaction state is finalized.
      */
-    private CompletableFuture<Void> processTxRecoveryMessage(TxRecoveryMessage 
request) {
+    private CompletableFuture<Void> processTxRecoveryMessage(TxRecoveryMessage 
request, String senderId) {
         UUID txId = request.txId();
 
         TxMeta txMeta = txStateStorage.get(txId);
 
         // Check whether a transaction has already been finished.
-        boolean transactionAlreadyFinished = txMeta != null && 
isFinalState(txMeta.txState());
+        if (txMeta != null && isFinalState(txMeta.txState())) {
+            return recoverFinishedTx(txId, txMeta)
+                    // If the sender has sent a recovery message, it failed to 
handle it on its own,
+                    // so sending cleanup to the sender for the transaction we 
know is finished.
+                    .whenComplete((v, ex) -> runCleanupOnNode(txId, senderId));
+        }
 
-        if (transactionAlreadyFinished) {
+        LOG.info("Orphan transaction has to be aborted [tx={}].", txId);
+
+        return triggerTxRecovery(txId, senderId);
+    }
+
+    private CompletableFuture<Void> recoverFinishedTx(UUID txId, TxMeta 
txMeta) {
+        if (txMeta.locksReleased() || txMeta.enlistedPartitions().isEmpty()) {
+            // Nothing to do if the locks have been released already or there 
are no enlistedPartitions available.
             return nullCompletedFuture();
         }
 
-        LOG.info("Orphan transaction has to be aborted [tx={}].", txId);
+        // Otherwise run a cleanup on the known set of partitions.
+        return (CompletableFuture<Void>) durableCleanup(txId, txMeta);
+    }
+
+    /**
+     * Run cleanup on a node.
+     *
+     * @param txId Transaction id.
+     * @param nodeId Node id (inconsistent).
+     */
+    private CompletableFuture<Void> runCleanupOnNode(UUID txId, String nodeId) 
{
+        // Get node id of the sender to send back cleanup requests.
+        String nodeConsistentId = 
clusterNodeResolver.getConsistentIdById(nodeId);
+
+        return nodeConsistentId == null ? nullCompletedFuture() : 
txManager.cleanup(nodeConsistentId, txId);
+    }
 
-        return triggerTxRecovery(txId)
-                .thenApply(v -> null);
+    private CompletableFuture<Void> triggerTxRecovery(UUID txId, String 
senderId) {
+        // If the transaction state is pending, then the transaction should be 
rolled back,
+        // meaning that the state is changed to aborted and a corresponding 
cleanup request
+        // is sent in a common durable manner to a partition that have 
initiated recovery.
+        return txManager.finish(
+                        new HybridTimestampTracker(),
+                        replicationGroupId,
+                        false,
+                        Map.of(replicationGroupId, 0L), // term is not 
required for the rollback.
+                        txId
+                )
+                .thenCompose(unused -> runCleanupOnNode(txId, senderId));
     }
 
     /**
      * Starts tx recovery process. Returns the future containing transaction 
meta with its final state that completes
      * when the recovery is completed.
      *
      * @param txId Transaction id.
+     * @param txStateMeta Transaction meta.
+     * @param senderId Sender inconsistent id.
      * @return Tx recovery future, or failed future if the tx recovery is not 
possible.
      */
-    private CompletableFuture<TransactionMeta> triggerTxRecovery(UUID txId) {
-        // TODO: IGNITE-20735 Implement initiate recovery handling logic. This 
part has to be fully rewritten.
-        TxStateMeta txStateMeta = txManager.stateMeta(txId);
-
+    private CompletableFuture<TransactionMeta> triggerTxRecoveryWithState(UUID 
txId, @Nullable TxStateMeta txStateMeta, String senderId) {
         if (txStateMeta == null || txStateMeta.txState() == ABANDONED) {

Review Comment:
   Why do you need this condition? this method is called when `txStateMeta == 
null || txStateMeta.txState() == ABANDONED || txCoordinatorIsDead()` so here 
you just eliminate the last condition and return the completed future when tx 
state is pending but coordinator is missing.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -507,22 +514,59 @@ private CompletableFuture<?> 
processRequest(ReplicaRequest request, @Nullable Bo
      * @param request Tx recovery request.
      * @return The future is complete when the transaction state is finalized.
      */
-    private CompletableFuture<Void> processTxRecoveryAction(TxRecoveryMessage 
request) {
+    private CompletableFuture<Void> processTxRecoveryAction(TxRecoveryMessage 
request, String senderId) {
         UUID txId = request.txId();
 
         TxMeta txMeta = txStateStorage.get(txId);
 
         // Check whether a transaction has already been finished.
-        boolean transactionAlreadyFinished = txMeta != null && 
isFinalState(txMeta.txState());
+        if (txMeta != null && isFinalState(txMeta.txState())) {
+            return recoverFinishedTx(txId, txMeta)
+                    // If the sender has sent a recovery message, it failed to 
handle it on its own,
+                    // so sending cleanup to the sender for the transaction we 
know is finished.
+                    .thenCompose(v -> runCleanupOnNode(txId, senderId));
+        }
 
-        if (transactionAlreadyFinished) {
+        LOG.info("Orphan transaction has to be aborted [tx={}].", txId);
+
+        return triggerTxRecovery(txId, senderId);
+    }
+
+    private CompletableFuture<Void> recoverFinishedTx(UUID txId, TxMeta 
txMeta) {
+        if (txMeta.locksReleased() || txMeta.enlistedPartitions().isEmpty()) {
+            // Nothing to do if the locks have been released already or there 
are no enlistedPartitions available.
             return nullCompletedFuture();
         }
 
-        LOG.info("Orphan transaction has to be aborted [tx={}].", txId);
+        // Otherwise run a cleanup on the known set of partitions.
+        return (CompletableFuture<Void>) durableCleanup(txId, txMeta);
+    }
 
-        // TODO: IGNITE-20735 Implement initiate recovery handling logic.
-        return nullCompletedFuture();
+    /**
+     * Run cleanup on a node.
+     *
+     * @param txId Transaction id.
+     * @param nodeId Node id (inconsistent).
+     */
+    private CompletableFuture<Void> runCleanupOnNode(UUID txId, String nodeId) 
{
+        // Get node id of the sender to send back cleanup requests.
+        String nodeConsistentId = 
clusterNodeResolver.getConsistentIdById(nodeId);
+
+        return nodeConsistentId == null ? nullCompletedFuture() : 
txManager.cleanup(nodeConsistentId, txId);
+    }
+
+    private CompletableFuture<Void> triggerTxRecovery(UUID txId, String 
senderId) {
+        // If the transaction state is pending, then the transaction should be 
rolled back,
+        // meaning that the state is changed to aborted and a corresponding 
cleanup request
+        // is sent in a common durable manner to a partition that have 
initiated recovery.
+        return txManager.finish(
+                        new HybridTimestampTracker(),
+                        replicationGroupId,
+                        false,
+                        Map.of(replicationGroupId, 0L), // term is not 
required for the rollback.
+                        txId
+                )
+                .thenCompose(unused -> runCleanupOnNode(txId, senderId));

Review Comment:
   maybe I don't get something but there is still thenCompose



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to