rpuch commented on code in PR #2820:
URL: https://github.com/apache/ignite-3/pull/2820#discussion_r1389031580


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -1587,34 +1594,53 @@ private CompletableFuture<Object> finishTransaction(
         HybridTimestamp tsForCatalogVersion = commit ? commitTimestamp : hybridClock.now();
 
         return reliableCatalogVersionFor(tsForCatalogVersion)
-                .thenCompose(catalogVersion -> {
-                    synchronized (commandProcessingLinearizationMutex) {
-                        FinishTxCommandBuilder finishTxCmdBldr = MSG_FACTORY.finishTxCommand()
-                                .txId(txId)
-                                .commit(commit)
-                                .safeTimeLong(hybridClock.nowLong())
-                                .txCoordinatorId(txCoordinatorId)
-                                .requiredCatalogVersion(catalogVersion)
-                                .tablePartitionIds(
-                                        aggregatedGroupIds.stream()
-                                                .map(PartitionReplicaListener::tablePartitionId)
-                                                .collect(toList())
-                                );
-
-                        if (commit) {
-                            finishTxCmdBldr.commitTimestampLong(commitTimestamp.longValue());
-                        }
-
-                        return raftClient.run(finishTxCmdBldr.build());
-                    }
-                })
+                .thenCompose(catalogVersion -> applyFinishCommand(
+                                txId,
+                                commit,
+                                commitTimestamp,
+                                txCoordinatorId,
+                                catalogVersion,
+                                aggregatedGroupIds.stream()
+                                        .map(PartitionReplicaListener::tablePartitionId)
+                                        .collect(toList())
+                        )
+                )
                 .whenComplete((o, throwable) -> {
                     TxState txState = commit ? COMMITED : ABORTED;
 
                     markFinished(txId, txState, commitTimestamp);
                 });
     }
 
+    private CompletableFuture<Object> applyFinishCommand(
+            UUID transactionId,
+            boolean commit,
+            HybridTimestamp commitTimestamp,
+            String txCoordinatorId,
+            int catalogVersion,
+            List<TablePartitionIdMessage> tablePartitionIds
+    ) {
+        // TODO: synchonized should go here. Not within finishTransaction but with all apply command.

Review Comment:
   Please reference a JIRA issue in the TODO



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -99,6 +101,15 @@ public class PartitionListener implements RaftGroupListener {
     /** Storage index tracker. */
     private final PendingComparableValuesTracker<Long, Void> storageIndexTracker;
 
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-20826 Restore on restart
+    /** Is used in order to detect and retry safe time reordering within onBeforeApply. */
+    private long maxObservableSafeTime = -1;
+
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-20826 Restore on restart
+    /** Is used in order to assert safe time reordering within onWrite. */
+    private long maxObservableSafeTimeVerificator = -1;

Review Comment:
   verificator -> verifier?
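
For context on what such a field does, here is a minimal sketch of a reordering check in the spirit of the Javadoc above. It assumes, without confirmation from the hunk, that a command is accepted only when its safe time is strictly greater than every safe time seen so far; `SafeTimeReorderDetector` and `tryAccept` are hypothetical names.

```java
/**
 * Hypothetical sketch: tracks the largest safe time observed so far and
 * rejects any command whose safe time does not move it forward.
 */
final class SafeTimeReorderDetector {
    /** Mirrors the -1 initial value of maxObservableSafeTime in the hunk. */
    private long maxObservableSafeTime = -1;

    /**
     * Records the safe time and returns true if it is strictly greater than the
     * maximum seen so far; returns false (reordering detected) otherwise.
     * Callers are assumed to serialize calls, as onBeforeApply is serialized.
     */
    boolean tryAccept(long safeTime) {
        if (safeTime <= maxObservableSafeTime) {
            return false;
        }

        maxObservableSafeTime = safeTime;

        return true;
    }

    long maxObservableSafeTime() {
        return maxObservableSafeTime;
    }
}
```

Under this assumption, a command retried with a fresh, larger timestamp (as the retry path does with `hybridClock.now()`) passes the check, while a duplicate or older safe time does not.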



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -2383,6 +2424,35 @@ private CompletableFuture<Object> applyCmdWithExceptionHandling(Command cmd) {
         });
     }
 
+    private void applyCmdWithRetryOnSafeTimeReorderException(Command cmd, CompletableFuture<Object> resultFuture) {
+        raftClient.run(cmd).whenComplete((res, ex) -> {
+            if (ex != null) {
+                if (ex instanceof SafeTimeReorderException || ex.getCause() instanceof SafeTimeReorderException) {
+                    assert cmd instanceof SafeTimePropagatingCommand;
+
+                    SafeTimePropagatingCommand safeTimePropagatingCommand = (SafeTimePropagatingCommand) cmd;
+
+                    HybridTimestamp safeTimeForRetry = hybridClock.now();
+
+                    if ((cmd instanceof UpdateCommand && !((UpdateCommand) cmd).full())
+                            || (cmd instanceof UpdateAllCommand && !((UpdateAllCommand) cmd).full())) {
+                        synchronized (safeTime) {
+                            updateTrackerIgnoringTrackerClosedException(safeTime, safeTimeForRetry);
+                        }
+                    }
+
+                    safeTimePropagatingCommand.safeTimeLong(safeTimeForRetry.longValue());
+
+                    applyCmdWithRetryOnSafeTimeReorderException(safeTimePropagatingCommand, resultFuture);

Review Comment:
   Is it possible to add some kind of safeguard to check or assert that we are not retrying infinitely due to a bug?
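
One possible shape for such a safeguard is an attempt counter that fails the result future once a limit is reached. This is a sketch only: `BoundedReorderRetry`, `ReorderException` (standing in for `SafeTimeReorderException`), the `LongFunction` standing in for `raftClient.run(cmd)` with a given safe time, and the limit itself are all assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;

/** Hypothetical sketch of a reorder retry loop with an upper bound on attempts. */
final class BoundedReorderRetry {
    /** Stand-in for SafeTimeReorderException. */
    static final class ReorderException extends RuntimeException {
        ReorderException(String message) {
            super(message);
        }
    }

    /**
     * Runs the command with the given safe time. On a reorder failure it retries with a
     * fresh safe time from the clock; once maxAttempts runs have failed, it completes
     * resultFuture exceptionally instead of retrying forever.
     */
    static void run(
            LongFunction<CompletableFuture<Object>> runWithSafeTime,
            LongSupplier clock,
            long safeTime,
            int attempt,
            int maxAttempts,
            CompletableFuture<Object> resultFuture
    ) {
        runWithSafeTime.apply(safeTime).whenComplete((res, ex) -> {
            if (ex == null) {
                resultFuture.complete(res);
                return;
            }

            // Dependent stages wrap failures in CompletionException; unwrap one level.
            Throwable cause = ex instanceof CompletionException && ex.getCause() != null ? ex.getCause() : ex;

            if (!(cause instanceof ReorderException)) {
                resultFuture.completeExceptionally(ex);
            } else if (attempt + 1 >= maxAttempts) {
                resultFuture.completeExceptionally(new IllegalStateException(
                        "Gave up after " + maxAttempts + " attempts due to safe time reordering", cause));
            } else {
                run(runWithSafeTime, clock, clock.getAsLong(), attempt + 1, maxAttempts, resultFuture);
            }
        });
    }
}
```

An `assert` alone would only fire when assertions are enabled with `-ea`; completing the future exceptionally also surfaces the problem in production.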



##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RaftError.java:
##########
@@ -238,7 +238,12 @@ public enum RaftError {
     /**
      * Permission denied
      */
-    EACCES(1016);
+    EACCES(1016),
+
+    /**
+     * Command reordering detected
+     */
+    EREORDER(1017);

Review Comment:
   If we merge JRaft improvements from sofa-jraft, it might turn out that they have added a new code, which would conflict with our addition. Would it be possible to leave a gap, for example, by starting our own codes at 2000?
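
A minimal sketch of the suggested layout, with fork-specific codes kept in their own range. `RaftErrorSketch`, `FORK_SPECIFIC_BASE`, and the lookup method are hypothetical; only `EACCES(1016)` and the idea of a reorder code come from the hunk, and 2000 is the value proposed above.

```java
/** Hypothetical sketch: fork-specific error codes live in a separate numeric range. */
enum RaftErrorSketch {
    /** Upstream code, unchanged. */
    EACCES(1016),

    /** Fork-specific code, placed at 2000 so new upstream codes (1017, 1018, ...) cannot clash. */
    EREORDER(2000);

    /** First code reserved for codes that exist only in this fork (an assumed convention). */
    static final int FORK_SPECIFIC_BASE = 2000;

    private final int number;

    RaftErrorSketch(int number) {
        this.number = number;
    }

    int getNumber() {
        return number;
    }

    boolean isForkSpecific() {
        return number >= FORK_SPECIFIC_BASE;
    }

    /** Returns the constant with the given code, or null if there is none. */
    static RaftErrorSketch forNumber(int number) {
        for (RaftErrorSketch e : values()) {
            if (e.number == number) {
                return e;
            }
        }

        return null;
    }
}
```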



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -2420,34 +2490,40 @@ private CompletableFuture<CompletableFuture<?>> applyUpdateCommand(
             );
 
             if (!cmd.full()) {
-                CompletableFuture<UUID> fut = applyCmdWithExceptionHandling(cmd).thenApply(res -> {
+                // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Temporary code below
+                synchronized (safeTime) {
+                    storageUpdateHandler.handleUpdate(

Review Comment:
   Why is it that for non-full commands we apply the command on the Primary first and only then send the Raft command, but for full commands the order is the opposite?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -2371,8 +2410,10 @@ private static <T> boolean allElementsAreNull(List<T> list) {
      * @param cmd Raft command.
      * @return Raft future.
      */
-    private CompletableFuture<Object> applyCmdWithExceptionHandling(Command cmd) {
-        return raftClient.run(cmd).exceptionally(throwable -> {
+    private void applyCmdWithExceptionHandling(Command cmd, CompletableFuture<Object> resultFuture) {
+        applyCmdWithRetryOnSafeTimeReorderException(cmd, resultFuture);
+
+        resultFuture.exceptionally(throwable -> {

Review Comment:
   Also, this probably means that there are no tests making sure that timeouts are wrapped in `ReplicationTimeoutException`s (otherwise, they would have failed).



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -2371,8 +2410,10 @@ private static <T> boolean allElementsAreNull(List<T> list) {
      * @param cmd Raft command.
      * @return Raft future.
      */
-    private CompletableFuture<Object> applyCmdWithExceptionHandling(Command cmd) {
-        return raftClient.run(cmd).exceptionally(throwable -> {
+    private void applyCmdWithExceptionHandling(Command cmd, CompletableFuture<Object> resultFuture) {
+        applyCmdWithRetryOnSafeTimeReorderException(cmd, resultFuture);
+
+        resultFuture.exceptionally(throwable -> {

Review Comment:
   Here, a new stage is added *on top* of `resultFuture`, but this new stage is never returned. It can only 'do something' through side effects, but it has none. This doesn't look right.
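
To illustrate the concern: `exceptionally` returns a new stage and leaves `resultFuture` itself untouched, so a caller that only holds `resultFuture` still sees the unwrapped exception. In this sketch `WrappedTimeoutException` is a hypothetical stand-in for `ReplicationTimeoutException`, and the method names are illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;

final class ExceptionallyStageSketch {
    /** Hypothetical stand-in for ReplicationTimeoutException. */
    static final class WrappedTimeoutException extends RuntimeException {
        WrappedTimeoutException(Throwable cause) {
            super(cause);
        }
    }

    /** Maps a timeout to the wrapper exception and rethrows anything else unchanged. */
    static Object wrapTimeout(Throwable throwable) {
        Throwable cause = throwable instanceof CompletionException && throwable.getCause() != null
                ? throwable.getCause()
                : throwable;

        if (cause instanceof TimeoutException) {
            throw new WrappedTimeoutException(cause);
        }

        throw throwable instanceof CompletionException
                ? (CompletionException) throwable
                : new CompletionException(throwable);
    }

    /** Pattern from the hunk: the derived stage is created and then dropped. */
    static CompletableFuture<Object> droppedStage(CompletableFuture<Object> resultFuture) {
        resultFuture.exceptionally(ExceptionallyStageSketch::wrapTimeout); // derived stage is discarded
        return resultFuture;
    }

    /** Returning the derived stage lets callers observe the wrapped exception. */
    static CompletableFuture<Object> returnedStage(CompletableFuture<Object> resultFuture) {
        return resultFuture.exceptionally(ExceptionallyStageSketch::wrapTimeout);
    }
}
```

With `droppedStage`, a `TimeoutException` reaches the caller unchanged; with `returnedStage`, the caller sees the wrapper. A test asserting on the exception type the caller observes would catch this pattern.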



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -1677,32 +1703,45 @@ private CompletableFuture<Void> processTxCleanupAction(TxCleanupReplicaRequest r
 
             return reliableCatalogVersionFor(commandTimestamp)
                     .thenCompose(catalogVersion -> {
-                        synchronized (commandProcessingLinearizationMutex) {
-                            TxCleanupCommand txCleanupCmd = MSG_FACTORY.txCleanupCommand()
-                                    .txId(request.txId())
-                                    .commit(request.commit())
-                                    .commitTimestampLong(request.commitTimestampLong())
-                                    .safeTimeLong(hybridClock.nowLong())
-                                    .txCoordinatorId(getTxCoordinatorId(request.txId()))
-                                    .requiredCatalogVersion(catalogVersion)
-                                    .build();
-
-                            storageUpdateHandler.handleTransactionCleanup(request.txId(), request.commit(), request.commitTimestamp());
-
-                            raftClient.run(txCleanupCmd)
-                                    .exceptionally(e -> {
-                                        LOG.warn("Failed to complete transaction cleanup command [txId=" + request.txId() + ']', e);
-
-                                        return completedFuture(null);
-                                    });
-                        }
+                        applyCleanupCommand(
+                                request.txId(),
+                                request.commit(),
+                                request.commitTimestamp(),
+                                request.commitTimestampLong(),
+                                catalogVersion
+                        );
 
                         return allOffFuturesExceptionIgnored(txReadFutures, request)
                                 .thenRun(() -> releaseTxLocks(request.txId()));
                     });
         });
     }
 
+    private CompletableFuture<Void> applyCleanupCommand(
+            UUID transactionId,
+            boolean commit,
+            HybridTimestamp commitTimestamp,
+            long commitTimestampLong,
+            int catalogVersion
+    ) {
+        TxCleanupCommand txCleanupCmd = MSG_FACTORY.txCleanupCommand()
+                .txId(transactionId)
+                .commit(commit)
+                .commitTimestampLong(commitTimestampLong)
+                .safeTimeLong(hybridClock.nowLong())
+                .txCoordinatorId(getTxCoordinatorId(transactionId))
+                .requiredCatalogVersion(catalogVersion)
+                .build();
+
+        storageUpdateHandler.handleTransactionCleanup(transactionId, commit, commitTimestamp);
+
+        CompletableFuture<Object> resultFuture = new CompletableFuture<>();
+
+        applyCmdWithRetryOnSafeTimeReorderException(txCleanupCmd, resultFuture);
+
+        return resultFuture.thenApply(res -> null);

Review Comment:
   The code before this change had `.exceptionally()` to mute exceptions happening during cleanup application; now there is no such muting. Is this intentional?
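
If the muting is meant to stay, one way to restore it on the new path is to map the result through `handle`, log the failure, and complete normally. A sketch under that assumption; `CleanupMutingSketch` is hypothetical and the `Consumer<Throwable>` stands in for `LOG.warn(...)`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class CleanupMutingSketch {
    /**
     * Converts the command result into a CompletableFuture<Void> that always completes
     * normally: a failure is passed to the logger and then swallowed, as the removed
     * .exceptionally() block did.
     */
    static CompletableFuture<Void> muteCleanupFailure(
            CompletableFuture<Object> resultFuture,
            Consumer<Throwable> logWarn
    ) {
        return resultFuture.handle((res, ex) -> {
            if (ex != null) {
                logWarn.accept(ex);
            }

            return null;
        });
    }
}
```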



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -2420,34 +2490,40 @@ private CompletableFuture<CompletableFuture<?>> applyUpdateCommand(
             );
 
             if (!cmd.full()) {
-                CompletableFuture<UUID> fut = applyCmdWithExceptionHandling(cmd).thenApply(res -> {
+                // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Temporary code below
+                synchronized (safeTime) {
+                    storageUpdateHandler.handleUpdate(
+                            cmd.txId(),
+                            cmd.rowUuid(),
+                            cmd.tablePartitionId().asTablePartitionId(),
+                            cmd.rowToUpdate(),
+                            true,
+                            null,
+                            null,
+                            null);
+
+                    updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime());
+                }
+
+                CompletableFuture<Object> resultFuture = new CompletableFuture<>();
+
+                applyCmdWithExceptionHandling(cmd, resultFuture);

Review Comment:
   So we have just applied the command on the Primary, and then we send it to Raft with the same SafeTime? Won't this mean that the command will be rejected with 100% probability?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -2383,6 +2424,35 @@ private CompletableFuture<Object> applyCmdWithExceptionHandling(Command cmd) {
         });
     }
 
+    private void applyCmdWithRetryOnSafeTimeReorderException(Command cmd, CompletableFuture<Object> resultFuture) {
+        raftClient.run(cmd).whenComplete((res, ex) -> {
+            if (ex != null) {
+                if (ex instanceof SafeTimeReorderException || ex.getCause() instanceof SafeTimeReorderException) {
+                    assert cmd instanceof SafeTimePropagatingCommand;
+
+                    SafeTimePropagatingCommand safeTimePropagatingCommand = (SafeTimePropagatingCommand) cmd;
+
+                    HybridTimestamp safeTimeForRetry = hybridClock.now();
+
+                    if ((cmd instanceof UpdateCommand && !((UpdateCommand) cmd).full())

Review Comment:
   Also, this check seems fragile. What if we add an analogous request in the future but forget to mention it here?
   
   How about adding a marker interface common to both update commands? That would make it harder to forget about this check when adding a third 'update' command in the future.
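
A minimal sketch of the marker-interface idea. `PrimaryAppliedCommand`, the `*Sketch` command classes, and `MarkerCheck` are hypothetical stand-ins for `UpdateCommand` and `UpdateAllCommand`, whose real definitions are not shown here; only the `full()` flag comes from the hunk.

```java
/** Hypothetical marker for commands whose non-full variant is applied on the primary first. */
interface PrimaryAppliedCommand {
    /** Mirrors the full() flag checked in the hunk. */
    boolean full();
}

/** Simplified stand-in for UpdateCommand. */
final class UpdateCommandSketch implements PrimaryAppliedCommand {
    private final boolean full;

    UpdateCommandSketch(boolean full) {
        this.full = full;
    }

    @Override
    public boolean full() {
        return full;
    }
}

/** Simplified stand-in for UpdateAllCommand. */
final class UpdateAllCommandSketch implements PrimaryAppliedCommand {
    private final boolean full;

    UpdateAllCommandSketch(boolean full) {
        this.full = full;
    }

    @Override
    public boolean full() {
        return full;
    }
}

/** Any other command that does not opt in. */
final class OtherCommandSketch {
}

final class MarkerCheck {
    /** One type check replaces the per-class instanceof chain in the retry path. */
    static boolean needsSafeTimeTrackerUpdate(Object cmd) {
        return cmd instanceof PrimaryAppliedCommand && !((PrimaryAppliedCommand) cmd).full();
    }
}
```

A third update command would then opt in by implementing the interface instead of relying on someone remembering to extend the `instanceof` chain.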



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -2383,6 +2424,35 @@ private CompletableFuture<Object> applyCmdWithExceptionHandling(Command cmd) {
         });
     }
 
+    private void applyCmdWithRetryOnSafeTimeReorderException(Command cmd, CompletableFuture<Object> resultFuture) {
+        raftClient.run(cmd).whenComplete((res, ex) -> {
+            if (ex != null) {
+                if (ex instanceof SafeTimeReorderException || ex.getCause() instanceof SafeTimeReorderException) {
+                    assert cmd instanceof SafeTimePropagatingCommand;

Review Comment:
   This line probably means that the type of the parameter can be safely changed to `SafeTimePropagatingCommand`, making both the assertion and the following cast obsolete.
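
A minimal sketch of the narrowed signature. `SafeTimeCommand` is a hypothetical stand-in for `SafeTimePropagatingCommand`, reduced to the `safeTimeLong(long)` setter used in the hunk plus an assumed getter; `SimpleSafeTimeCommand` and `NarrowedSignatureSketch.prepareRetry` are likewise illustrative.

```java
/** Hypothetical stand-in for SafeTimePropagatingCommand. */
interface SafeTimeCommand {
    long safeTimeLong();

    void safeTimeLong(long safeTime);
}

/** Trivial mutable implementation, for illustration only. */
final class SimpleSafeTimeCommand implements SafeTimeCommand {
    private long safeTime;

    @Override
    public long safeTimeLong() {
        return safeTime;
    }

    @Override
    public void safeTimeLong(long safeTime) {
        this.safeTime = safeTime;
    }
}

final class NarrowedSignatureSketch {
    /** The parameter type already guarantees a safe time, so no assert or cast is needed. */
    static void prepareRetry(SafeTimeCommand cmd, long safeTimeForRetry) {
        cmd.safeTimeLong(safeTimeForRetry);
    }
}
```

Passing a command that does not carry a safe time then fails at compile time instead of tripping an assertion at run time.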



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -455,6 +478,21 @@ public void onShutdown() {
         storage.close();
     }
 
+    @Override
+    public void onBeforeApply(Command command) {
+        // Given method is is synchronized by replication group specific monitor, see ActionRequestProcessor#handleRequest.

Review Comment:
   ```suggestion
           // This method is synchronized by replication group specific monitor, see ActionRequestProcessor#handleRequest.
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -2383,6 +2424,35 @@ private CompletableFuture<Object> applyCmdWithExceptionHandling(Command cmd) {
         });
     }
 
+    private void applyCmdWithRetryOnSafeTimeReorderException(Command cmd, CompletableFuture<Object> resultFuture) {
+        raftClient.run(cmd).whenComplete((res, ex) -> {
+            if (ex != null) {
+                if (ex instanceof SafeTimeReorderException || ex.getCause() instanceof SafeTimeReorderException) {
+                    assert cmd instanceof SafeTimePropagatingCommand;
+
+                    SafeTimePropagatingCommand safeTimePropagatingCommand = (SafeTimePropagatingCommand) cmd;
+
+                    HybridTimestamp safeTimeForRetry = hybridClock.now();
+
+                    if ((cmd instanceof UpdateCommand && !((UpdateCommand) cmd).full())

Review Comment:
   Why is this condition needed? Could you please add a comment explaining this?



-- 
This is an automated message from the Apache Git Service.