vldpyatkov commented on code in PR #2918:
URL: https://github.com/apache/ignite-3/pull/2918#discussion_r1422393980


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1670,11 +1668,6 @@ private CompletableFuture<Void> 
handleChangePendingAssignmentEvent(
 
         Set<Assignment> stableAssignments = 
ByteUtils.fromBytes(stableAssignmentsEntry.value());
 
-        transactionStateResolver.updateAssignment(
-                replicaGrpId,
-                
stableAssignments.stream().filter(Assignment::isPeer).map(Assignment::consistentId).collect(toList())

Review Comment:
   What was it?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -409,15 +411,16 @@ private CompletableFuture<?> durableCleanup(UUID txId, 
TxMeta txMeta) {
                 .thenCompose(f -> f);
     }
 
-    private void markLocksReleased(
-            UUID txId,
-            Collection<TablePartitionId> enlistedPartitions,
-            TxState txState,
-            @Nullable HybridTimestamp commitTimestamp
-    ) {
-        TxMeta newTxMeta = new TxMeta(txState, enlistedPartitions, 
commitTimestamp, true);
+    private void markLocksReleased(UUID txId) {
+        reliableCatalogVersionFor(hybridClock.now()).thenAccept(catalogVersion 
-> {

Review Comment:
   I believe we can avoid waiting for the current timestamp.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -521,8 +524,35 @@ private CompletableFuture<Void> 
processTxRecoveryAction(TxRecoveryMessage reques
 
         LOG.info("Orphan transaction has to be aborted [tx={}].", txId);
 
-        // TODO: IGNITE-20735 Implement initiate recovery handling logic.
-        return nullCompletedFuture();
+        return triggerTxRecovery(txId)
+                .thenApply(v -> null);
+    }
+
+    /**
+     * Starts tx recovery process. Returns the future containing transaction 
meta with its final state that completes
+     * when the recovery is completed.
+     *
+     * @param txId Transaction id.
+     * @return Tx recovery future, or failed future if the tx recovery is not 
possible.
+     */
+    private CompletableFuture<TransactionMeta> triggerTxRecovery(UUID txId) {

Review Comment:
   I would have expected for call txStateMeta.finish() here.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -543,6 +532,56 @@ void handleBuildIndexCommand(BuildIndexCommand cmd, long 
commandIndex, long comm
         }
     }
 
+    /**
+     * Handles the {@link MarkLocksReleasedCommand}.
+     *
+     * @param cmd Command.
+     * @param commandIndex Command index.
+     * @param commandTerm Command term.
+     */
+    private void handleMarkLocksReleasedCommand(MarkLocksReleasedCommand cmd, 
long commandIndex, long commandTerm) {
+        UUID txId = cmd.txId();
+
+        TxMeta txMetaBeforeCas = txStateStorage.get(txId);
+
+        TxMeta txMetaToSet = new TxMeta(
+                txMetaBeforeCas.txState(),
+                txMetaBeforeCas.enlistedPartitions(),
+                txMetaBeforeCas.commitTimestamp(),
+                true
+        );
+
+        boolean txStateChangeRes = txStateStorage.compareAndSet(
+                txId,
+                null,
+                txMetaToSet,
+                commandIndex,
+                commandTerm
+        );
+
+        if (!txStateChangeRes) {
+            onTxStateStorageCasFail(txId, txMetaBeforeCas, txMetaToSet);
+        }
+    }
+
+    private static void onTxStateStorageCasFail(UUID txId, TxMeta 
txMetaBeforeCas, TxMeta txMetaToSet) {
+        UUID traceId = UUID.randomUUID();

Review Comment:
   Use a constructor that does not require the trace id 
`IgniteInternalException#IgniteInternalException(int, java.lang.String)`



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -731,27 +761,70 @@ private CompletableFuture<?> processOperationRequest(
      * @param request Transaction state request.
      * @return Result future.
      */
-    private CompletableFuture<LeaderOrTxState> 
processTxStateCommitPartitionRequest(TxStateCommitPartitionRequest request) {
+    private CompletableFuture<TransactionMeta> 
processTxStateCommitPartitionRequest(TxStateCommitPartitionRequest request) {
         return placementDriver.getPrimaryReplica(replicationGroupId, 
hybridClock.now())
-                .thenCompose(primaryReplicaMeta -> {
-                    if (primaryReplicaMeta == null) {
+                .thenCompose(replicaMeta -> {

Review Comment:
   We should handle TxStateCommitPartitionRequest as it does for any other 
PrimaryReplicaRequest.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -205,6 +207,8 @@ public void onWrite(Iterator<CommandClosure<WriteCommand>> 
iterator) {
                     handleSafeTimeSyncCommand((SafeTimeSyncCommand) command, 
commandIndex, commandTerm);
                 } else if (command instanceof BuildIndexCommand) {
                     handleBuildIndexCommand((BuildIndexCommand) command, 
commandIndex, commandTerm);
+                } else if (command instanceof MarkLocksReleasedCommand) {

Review Comment:
   We have to have one round-trip transaction committed. If we replicate the 
lock release flag, it will double the transaction latency.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -731,27 +760,70 @@ private CompletableFuture<?> processOperationRequest(
      * @param request Transaction state request.
      * @return Result future.
      */
-    private CompletableFuture<LeaderOrTxState> 
processTxStateCommitPartitionRequest(TxStateCommitPartitionRequest request) {
+    private CompletableFuture<TransactionMeta> 
processTxStateCommitPartitionRequest(TxStateCommitPartitionRequest request) {
         return placementDriver.getPrimaryReplica(replicationGroupId, 
hybridClock.now())
-                .thenCompose(primaryReplicaMeta -> {
-                    if (primaryReplicaMeta == null) {
+                .thenCompose(replicaMeta -> {
+                    if (replicaMeta == null || replicaMeta.getLeaseholder() == 
null) {
                         return failedFuture(new 
PrimaryReplicaMissException(localNode.name(), null, null, null, null));
                     }
 
-                    if (!isLocalPeer(primaryReplicaMeta.getLeaseholder())) {
-                        return completedFuture(new 
LeaderOrTxState(primaryReplicaMeta.getLeaseholder(), null));
+                    if (!isLocalPeer(replicaMeta.getLeaseholder())) {
+                        return failedFuture(
+                                new 
PrimaryReplicaMissException(localNode.name(), replicaMeta.getLeaseholder(), 
null, null, null)
+                        );
                     }
 
-                    TransactionMeta txMeta = 
txManager.stateMeta(request.txId());
+                    UUID txId = request.txId();
 
-                    if (txMeta == null) {
-                        txMeta = txStateStorage.get(request.txId());
-                    }
+                    TxStateMeta txMeta = txManager.stateMeta(txId);
 
-                    return completedFuture(new LeaderOrTxState(null, txMeta));
+                    if (txMeta != null && txMeta.txState() == FINISHING) {
+                        assert txMeta instanceof TxStateMetaFinishing : txMeta;
+                        return ((TxStateMetaFinishing) 
txMeta).txFinishFuture();
+                    } else if (txMeta == null || 
!isFinalState(txMeta.txState())) {
+                        // Try to trigger recovery, if needed. If the 
transaction will be aborted, the proper ABORTED state will be sent
+                        // in response.
+                        return 
triggerTxRecoveryOnTxStateResolutionIfNeeded(txId, txMeta);
+                    } else {
+                        assert txMeta != null && 
isFinalState(txMeta.txState()) : txMeta;

Review Comment:
   It is a senseless assertion because it has already been written in the 
condition above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to