vldpyatkov commented on code in PR #2751:
URL: https://github.com/apache/ignite-3/pull/2751#discussion_r1379191671


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -1437,6 +1450,47 @@ private CompletableFuture<Void> finishAndCleanup(
             UUID txId,
             String txCoordinatorId
     ) {
+        TxMeta txMeta = txStateStorage.get(txId);
+
+        // Check that a transaction has already been finished.
+        boolean transactionAlreadyFinished = txMeta != null && 
isFinalState(txMeta.txState());
+
+        // Check locksReleased flag. If it is already set, do nothing and 
return a successful result.
+        // Even if the outcome is different (the transaction was aborted, but 
we want to commit it),
+        // we return 'success' to be in alignment with common transaction 
handling.
+        if (transactionAlreadyFinished) {
+            if (txMeta.locksReleased()) {
+                return completedFuture(null);
+            }
+            // If the locks were not released, we are likely to be in a 
recovery mode and retrying the finish request.
+            // In this case we want to check the expected outcome and the 
actual one.
+            if (commit && txMeta.txState() == ABORTED) {
+                LOG.error("Failed to commit a transaction that is already 
aborted [txId={}].", txId);
+
+                throw new TransactionException(TX_WAS_ABORTED_ERR,
+                        "Failed to change the outcome of a finished 
transaction"
+                                + " [txId=" + txId + ", txState=" + 
txMeta.txState() + "].");
+            }
+            // The transaction has already been finished, but the locks are 
not released.
+            // Waiting for the cleanup to do this.
+            // TODO: There is a risk that nobody is cleaning up this 
transaction.

Review Comment:
   This TODO needs to point to a ticket.



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java:
##########
@@ -295,139 +319,252 @@ public void finishFull(HybridTimestampTracker 
timestampTracker, UUID txId, boole
         updateTxMeta(txId, old -> new TxStateMeta(finalState, 
old.txCoordinatorId(), old.commitTimestamp()));
     }
 
+    private @Nullable HybridTimestamp getCommitTimestamp(boolean commit) {
+        return commit ? clock.now() : null;
+    }
+
+    private String coordinatorId() {
+        return localNodeId.get();
+    }
+
+    @Override
+    public CompletableFuture<Void> finishEmpty(boolean commit, UUID txId) {
+        // If there are no enlisted groups, just update local state - we 
already marked the tx as finished.
+        updateTxMeta(txId, old -> coordinatorFinalTxStateMeta(commit, 
getCommitTimestamp(commit)));
+
+        return completedFuture(null);
+    }
+
     @Override
     public CompletableFuture<Void> finish(
             HybridTimestampTracker observableTimestampTracker,
             TablePartitionId commitPartition,
-            ClusterNode recipientNode,
-            Long term,
             boolean commit,
             Map<TablePartitionId, Long> enlistedGroups,
             UUID txId
     ) {
         assert enlistedGroups != null;
+        assert !enlistedGroups.isEmpty() : "No enlisted partitions found";
 
         // Here we put finishing state meta into the local map, so that all 
concurrent operations trying to read tx state
         // with using read timestamp could see that this transaction is 
finishing, see #transactionMetaReadTimestampAware(txId, timestamp).
         // None of them now are able to update node's clock with read 
timestamp and we can create the commit timestamp that is greater
         // than all the read timestamps processed before.
         // Every concurrent operation will now use a finish future from the 
finishing state meta and get only final transaction
         // state after the transaction is finished.
-        TxStateMetaFinishing finishingStateMeta = new 
TxStateMetaFinishing(localNodeId.get());
-        updateTxMeta(txId, old -> finishingStateMeta);
-        HybridTimestamp commitTimestamp = commit ? clock.now() : null;
-
-        // If there are no enlisted groups, just return - we already marked 
the tx as finished.
-        boolean finishRequestNeeded = !enlistedGroups.isEmpty();
-
-        if (!finishRequestNeeded) {
-            updateTxMeta(txId, old -> {
-                TxStateMeta finalStateMeta = 
coordinatorFinalTxStateMeta(commit, commitTimestamp);
-
-                finishingStateMeta.txFinishFuture().complete(finalStateMeta);
-
-                return finalStateMeta;
-            });
+        TxStateMetaFinishing finishingStateMeta = new 
TxStateMetaFinishing(coordinatorId());
 
-            return completedFuture(null);
-        }
+        updateTxMeta(txId, old -> finishingStateMeta);
 
-        Function<Void, CompletableFuture<Void>> clo = ignored -> {
-            // In case of commit it's required to check whether current 
primaries are still the same that were enlisted and whether
-            // given primaries are not expired or, in other words, whether 
commitTimestamp is less or equal to the enlisted primaries
-            // expiration timestamps.
-            CompletableFuture<Void> verificationFuture =
-                    commit ? verifyCommitTimestamp(enlistedGroups, 
commitTimestamp) : completedFuture(null);
-
-            return verificationFuture.handle(
-                    (unused, throwable) -> {
-                        Collection<ReplicationGroupId> replicationGroupIds = 
new HashSet<>(enlistedGroups.keySet());
-
-                        boolean verifiedCommit = throwable == null && commit;
-
-                        TxFinishReplicaRequest req = 
FACTORY.txFinishReplicaRequest()
-                                .txId(txId)
-                                .timestampLong(clock.nowLong())
-                                .groupId(commitPartition)
-                                .groups(replicationGroupIds)
-                                // In case of verification future failure 
transaction will be rolled back.
-                                .commit(verifiedCommit)
-                                
.commitTimestampLong(hybridTimestampToLong(commitTimestamp))
-                                .term(term)
-                                .build();
-
-                        return replicaService.invoke(recipientNode, 
req).thenRun(
-                                () -> {
-                                    updateTxMeta(txId, old -> {
-                                        if (isFinalState(old.txState())) {
-                                            
finishingStateMeta.txFinishFuture().complete(old);
-
-                                            return old;
-                                        }
-
-                                        assert old instanceof 
TxStateMetaFinishing;
-
-                                        TxStateMeta finalTxStateMeta = 
coordinatorFinalTxStateMeta(verifiedCommit, commitTimestamp);
-
-                                        
finishingStateMeta.txFinishFuture().complete(finalTxStateMeta);
-
-                                        return finalTxStateMeta;
-                                    });
-
-                                    if (verifiedCommit) {
-                                        
observableTimestampTracker.update(commitTimestamp);
-                                    }
-                                });
-                    })
-                    .thenCompose(Function.identity())
-                    // verification future is added in order to share proper 
exception with the client
-                    .thenCompose(r -> verificationFuture);
-        };
-
-        AtomicReference<CompletableFuture<Void>> ref = new AtomicReference<>();
+        AtomicBoolean performingFinish = new AtomicBoolean();
         TxContext tuple = txCtxMap.compute(txId, (uuid, tuple0) -> {
             if (tuple0 == null) {
                 tuple0 = new TxContext(); // No writes enlisted.
             }
 
-            if (tuple0.finishFut == null) {
-                tuple0.finishFut = new CompletableFuture<>();
-                ref.set(tuple0.finishFut);
+            if (!tuple0.isTxFinishing()) {

Review Comment:
   Currently, it is impossible to double-finish one transaction. I would 
replace the `if` clause with `assert !tuple0.isTxFinishing()`.



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java:
##########
@@ -295,139 +319,252 @@ public void finishFull(HybridTimestampTracker 
timestampTracker, UUID txId, boole
         updateTxMeta(txId, old -> new TxStateMeta(finalState, 
old.txCoordinatorId(), old.commitTimestamp()));
     }
 
+    private @Nullable HybridTimestamp getCommitTimestamp(boolean commit) {
+        return commit ? clock.now() : null;
+    }
+
+    private String coordinatorId() {
+        return localNodeId.get();
+    }
+
+    @Override
+    public CompletableFuture<Void> finishEmpty(boolean commit, UUID txId) {
+        // If there are no enlisted groups, just update local state - we 
already marked the tx as finished.
+        updateTxMeta(txId, old -> coordinatorFinalTxStateMeta(commit, 
getCommitTimestamp(commit)));
+
+        return completedFuture(null);
+    }
+
     @Override
     public CompletableFuture<Void> finish(
             HybridTimestampTracker observableTimestampTracker,
             TablePartitionId commitPartition,
-            ClusterNode recipientNode,
-            Long term,
             boolean commit,
             Map<TablePartitionId, Long> enlistedGroups,
             UUID txId
     ) {
         assert enlistedGroups != null;
+        assert !enlistedGroups.isEmpty() : "No enlisted partitions found";
 
         // Here we put finishing state meta into the local map, so that all 
concurrent operations trying to read tx state
         // with using read timestamp could see that this transaction is 
finishing, see #transactionMetaReadTimestampAware(txId, timestamp).
         // None of them now are able to update node's clock with read 
timestamp and we can create the commit timestamp that is greater
         // than all the read timestamps processed before.
         // Every concurrent operation will now use a finish future from the 
finishing state meta and get only final transaction
         // state after the transaction is finished.
-        TxStateMetaFinishing finishingStateMeta = new 
TxStateMetaFinishing(localNodeId.get());
-        updateTxMeta(txId, old -> finishingStateMeta);
-        HybridTimestamp commitTimestamp = commit ? clock.now() : null;
-
-        // If there are no enlisted groups, just return - we already marked 
the tx as finished.
-        boolean finishRequestNeeded = !enlistedGroups.isEmpty();
-
-        if (!finishRequestNeeded) {
-            updateTxMeta(txId, old -> {
-                TxStateMeta finalStateMeta = 
coordinatorFinalTxStateMeta(commit, commitTimestamp);
-
-                finishingStateMeta.txFinishFuture().complete(finalStateMeta);
-
-                return finalStateMeta;
-            });
+        TxStateMetaFinishing finishingStateMeta = new 
TxStateMetaFinishing(coordinatorId());
 
-            return completedFuture(null);
-        }
+        updateTxMeta(txId, old -> finishingStateMeta);
 
-        Function<Void, CompletableFuture<Void>> clo = ignored -> {
-            // In case of commit it's required to check whether current 
primaries are still the same that were enlisted and whether
-            // given primaries are not expired or, in other words, whether 
commitTimestamp is less or equal to the enlisted primaries
-            // expiration timestamps.
-            CompletableFuture<Void> verificationFuture =
-                    commit ? verifyCommitTimestamp(enlistedGroups, 
commitTimestamp) : completedFuture(null);
-
-            return verificationFuture.handle(
-                    (unused, throwable) -> {
-                        Collection<ReplicationGroupId> replicationGroupIds = 
new HashSet<>(enlistedGroups.keySet());
-
-                        boolean verifiedCommit = throwable == null && commit;
-
-                        TxFinishReplicaRequest req = 
FACTORY.txFinishReplicaRequest()
-                                .txId(txId)
-                                .timestampLong(clock.nowLong())
-                                .groupId(commitPartition)
-                                .groups(replicationGroupIds)
-                                // In case of verification future failure 
transaction will be rolled back.
-                                .commit(verifiedCommit)
-                                
.commitTimestampLong(hybridTimestampToLong(commitTimestamp))
-                                .term(term)
-                                .build();
-
-                        return replicaService.invoke(recipientNode, 
req).thenRun(
-                                () -> {
-                                    updateTxMeta(txId, old -> {
-                                        if (isFinalState(old.txState())) {
-                                            
finishingStateMeta.txFinishFuture().complete(old);
-
-                                            return old;
-                                        }
-
-                                        assert old instanceof 
TxStateMetaFinishing;
-
-                                        TxStateMeta finalTxStateMeta = 
coordinatorFinalTxStateMeta(verifiedCommit, commitTimestamp);
-
-                                        
finishingStateMeta.txFinishFuture().complete(finalTxStateMeta);
-
-                                        return finalTxStateMeta;
-                                    });
-
-                                    if (verifiedCommit) {
-                                        
observableTimestampTracker.update(commitTimestamp);
-                                    }
-                                });
-                    })
-                    .thenCompose(Function.identity())
-                    // verification future is added in order to share proper 
exception with the client
-                    .thenCompose(r -> verificationFuture);
-        };
-
-        AtomicReference<CompletableFuture<Void>> ref = new AtomicReference<>();
+        AtomicBoolean performingFinish = new AtomicBoolean();
         TxContext tuple = txCtxMap.compute(txId, (uuid, tuple0) -> {
             if (tuple0 == null) {
                 tuple0 = new TxContext(); // No writes enlisted.
             }
 
-            if (tuple0.finishFut == null) {
-                tuple0.finishFut = new CompletableFuture<>();
-                ref.set(tuple0.finishFut);
+            if (!tuple0.isTxFinishing()) {
+                tuple0.finishTx();
+
+                performingFinish.set(true);
             }
 
             return tuple0;
         });
 
-        if (ref.get() != null) { // This is a finishing thread.
-            if (!commit) {
-                clo.apply(null).handle((ignored, err) -> {
+        // This is a finishing thread.
+        if (performingFinish.get()) {
+            Function<Void, CompletableFuture<Void>> finishAction = ignored ->
+                    makeFinishRequest(
+                            observableTimestampTracker,
+                            commitPartition,
+                            commit,
+                            enlistedGroups,
+                            txId,
+                            finishingStateMeta.txFinishFuture()
+                    );
+
+            runFinish(commit, tuple, finishAction);
+        }
+
+        // The method `runFinish` has a side effect on 
`finishInProgressFuture` future, it kicks off another future that will complete 
it.
+        return tuple.finishInProgressFuture;
+    }
+
+    private static void runFinish(boolean commit, TxContext tuple, 
Function<Void, CompletableFuture<Void>> finishAction) {
+        // Wait for commit acks first, then proceed with the finish request.
+        CompletableFuture<Void> finisher = commit ? tuple.waitNoInflights() : 
completedFuture(null);
+
+        finisher
+                .thenCompose(finishAction)
+                .handle((ignored, err) -> {
                     if (err == null) {
-                        tuple.finishFut.complete(null);
+                        tuple.finishInProgressFuture.complete(null);
                     } else {
-                        tuple.finishFut.completeExceptionally(err);
+                        
tuple.finishInProgressFuture.completeExceptionally(err);
                     }
                     return null;
                 });
-            } else {
+    }
 
-                // All inflights have been completed before the finish.
-                if (tuple.inflights == 0) {
-                    tuple.waitRepFut.complete(null);
-                }
+    private CompletableFuture<Void> makeFinishRequest(
+            HybridTimestampTracker observableTimestampTracker,
+            TablePartitionId commitPartition,
+            boolean commit,
+            Map<TablePartitionId, Long> enlistedGroups,
+            UUID txId,
+            CompletableFuture<TransactionMeta> txFinishFuture
+    ) {
+        HybridTimestamp commitTimestamp = getCommitTimestamp(commit);
+        // In case of commit it's required to check whether current primaries 
are still the same that were enlisted and whether
+        // given primaries are not expired or, in other words, whether 
commitTimestamp is less or equal to the enlisted primaries
+        // expiration timestamps.
+        CompletableFuture<Void> verificationFuture =
+                commit ? verifyCommitTimestamp(enlistedGroups, 
commitTimestamp) : completedFuture(null);
+
+        return verificationFuture.handle(
+                        (unused, throwable) -> {
+                            boolean verifiedCommit = throwable == null && 
commit;

Review Comment:
   Here we may get an exception similar to the one you declared above: 
TX_WAS_ABORTED_ERR.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/NodeUtils.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.IntFunction;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.table.Tuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A helper class to manipulate Ignite nodes in tests.
+ */
+public class NodeUtils {

Review Comment:
   I see you took this implementation from ItPrimaryReplicaChoiceTest, but 
this class is not used there.
   Either use NodeUtils in both tests, or there is no need to create it.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/NodeUtils.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.IntFunction;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.table.Tuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A helper class to manipulate Ignite nodes in tests.
+ */
+public class NodeUtils {

Review Comment:
   Besides the above, this method is not universal (because it does a specific 
update). I think it has to be marked with a TODO that points to IGNITE-20365.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to