rpuch commented on code in PR #2689:
URL: https://github.com/apache/ignite-3/pull/2689#discussion_r1360447796


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -246,6 +246,13 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     /** Placement driver. */
     private final PlacementDriver placementDriver;
 
+    /**
+     * Mutex for command processing linearization.
+     * Some actions like update or updateAll requires strict ordering within 
their application to storage on all nodes in replication group.

Review Comment:
   ```suggestion
        * Some actions like update or updateAll require strict ordering within 
their application to storage on all nodes in replication group.
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -2038,133 +2094,195 @@ private CompletableFuture<Object> 
applyCmdWithExceptionHandling(Command cmd) {
     /**
      * Executes an Update command.
      *
-     * @param cmd Update command.
+     * @param tablePartId {@link TablePartitionId} object.
+     * @param rowUuid Row UUID.
+     * @param row Row.
+     * @param lastCommitTimestamp The timestamp of the last committed entry 
for the row.
+     * @param txId Transaction ID.
+     * @param full {@code True} if this is a full transaction.
+     * @param txCoordinatorId Transaction coordinator id.
+     * @param catalogVersion Validated catalog version associated with given 
operation.
      * @return A local update ready future, possibly having a nested 
replication future as a result for delayed ack purpose.
      */
-    private CompletableFuture<CompletableFuture<?>> 
applyUpdateCommand(UpdateCommand cmd) {
-        if (!cmd.full()) {
-            CompletableFuture<UUID> fut = 
applyCmdWithExceptionHandling(cmd).thenApply(res -> {
-                // This check guaranties the result will never be lost. 
Currently always null.
-                assert res == null : "Replication result is lost";
-
-                // Set context for delayed response.
-                return cmd.txId();
-            });
-
-            storageUpdateHandler.handleUpdate(
-                    cmd.txId(),
-                    cmd.rowUuid(),
-                    cmd.tablePartitionId().asTablePartitionId(),
-                    cmd.row(),
-                    true,
-                    null,
-                    null,
-                    cmd.lastCommitTimestamp());
-
-            // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 tmp
-            synchronized (safeTime) {
-                updateTrackerIgnoringTrackerClosedException(safeTime, 
cmd.safeTime());
-            }
+    private CompletableFuture<CompletableFuture<?>> applyUpdateCommand(
+            TablePartitionId tablePartId,
+            UUID rowUuid,
+            @Nullable BinaryRow row,
+            @Nullable HybridTimestamp lastCommitTimestamp,
+            UUID txId,
+            boolean full,
+            String txCoordinatorId,
+            int catalogVersion
+    ) {
+        synchronized (commandProcessingLinearizationMutex) {
+            UpdateCommand cmd = updateCommand(
+                    tablePartId,
+                    rowUuid,
+                    row,
+                    lastCommitTimestamp,
+                    txId,
+                    full,
+                    txCoordinatorId,
+                    hybridClock.now(),
+                    catalogVersion
+            );
+
+            if (!cmd.full()) {
+                CompletableFuture<UUID> fut = 
applyCmdWithExceptionHandling(cmd).thenApply(res -> {
+                    // This check guaranties the result will never be lost. 
Currently always null.
+                    assert res == null : "Replication result is lost";
 
-            return completedFuture(fut);
-        } else {
-            return applyCmdWithExceptionHandling(cmd).thenApply(res -> {
-                // This check guaranties the result will never be lost. 
Currently always null.
-                assert res == null : "Replication result is lost";
+                    // Set context for delayed response.
+                    return cmd.txId();
+                });
 
-                // Try to avoid double write if an entry is already replicated.
+                // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 tmp

Review Comment:
   `tmp`?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -1776,8 +1788,19 @@ private CompletableFuture<ReplicaResult> 
processMultiEntryAction(ReadWriteMultiR
                         return completedFuture(new ReplicaResult(result, 
null));
                     }
 
-                    return 
validateAtTimestampAndBuildUpdateAllCommand(request, rowIdsToDelete, 
lastCommitTimes, txCoordinatorId)
-                            .thenCompose(cmd -> applyUpdateAllCommand(cmd, 
request.skipDelayedAck()))
+                    return 
validateOperationAgainstSchemaAtTimestamp(request.transactionId())
+                            .thenCompose(
+                                    catalogVersion -> applyUpdateAllCommand(
+                                            rowIdsToDelete,
+                                            lastCommitTimes,
+                                            request.commitPartitionId(),

Review Comment:
   Would it make sense to create an overload of `applyUpdateAllCommand` that 
would accept a `request` and take out commitPartitionId, transactionId, full 
and skipDelayedAck from it? As this seems to always repeat. This would reduce 
duplication and also make it easier to handle new message fields in the future 
(they seem to be added again and again).



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -2213,14 +2331,19 @@ private CompletableFuture<ReplicaResult> 
processSingleEntryAction(ReadWriteSingl
                                     return completedFuture(new 
ReplicaResult(false, request.full() ? null : completedFuture(null)));
                                 }
 
-                                return 
validateAtTimestampAndBuildUpdateCommand(
-                                        request,
-                                        validatedRowId.uuid(),
-                                        null,
-                                        lastCommitTime,
-                                        txCoordinatorId
-                                )
-                                        .thenCompose(this::applyUpdateCommand)
+                                return 
validateOperationAgainstSchemaAtTimestamp(request.transactionId())
+                                        .thenCompose(
+                                                catalogVersion -> 
applyUpdateCommand(
+                                                        
request.commitPartitionId().asTablePartitionId(),

Review Comment:
   Same suggestion about an overload



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -2860,96 +3046,19 @@ private CompletableFuture<Void> 
validateAtTimestamp(UUID txId) {
     }
 
     /**
-     * Chooses operation timestamp, makes validations that require it and 
constructs an {@link UpdateCommand} object.
-     *
-     * @param request Request that is being processed.
-     * @param rowUuid Row UUID.
-     * @param row Row.
-     * @param txCoordinatorId Transaction coordinator id.
-     * @return Future that will complete with the constructed {@link 
UpdateCommand} object.
-     */
-    private CompletableFuture<UpdateCommand> 
validateAtTimestampAndBuildUpdateCommand(
-            ReadWriteSingleRowReplicaRequest request,
-            UUID rowUuid,
-            @Nullable BinaryRow row,
-            @Nullable HybridTimestamp lastCommitTimestamp,
-            String txCoordinatorId
-    ) {
-        return validateAtTimestampAndBuildUpdateCommand(
-                request.commitPartitionId().asTablePartitionId(),
-                rowUuid,
-                row,
-                lastCommitTimestamp,
-                request.transactionId(),
-                request.full(),
-                txCoordinatorId
-        );
-    }
-
-    /**
-     * Chooses operation timestamp, makes validations that require it and 
constructs an {@link UpdateCommand} object.
+     * Chooses operation timestamp and makes schema related validations.

Review Comment:
   ```suggestion
        * Chooses operation timestamp and makes schema related validations. The 
operation timestamp is only used for validation, it is NOT sent as safeTime 
timestamp.
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -2038,133 +2094,195 @@ private CompletableFuture<Object> 
applyCmdWithExceptionHandling(Command cmd) {
     /**
      * Executes an Update command.
      *
-     * @param cmd Update command.
+     * @param tablePartId {@link TablePartitionId} object.
+     * @param rowUuid Row UUID.
+     * @param row Row.
+     * @param lastCommitTimestamp The timestamp of the last committed entry 
for the row.
+     * @param txId Transaction ID.
+     * @param full {@code True} if this is a full transaction.
+     * @param txCoordinatorId Transaction coordinator id.
+     * @param catalogVersion Validated catalog version associated with given 
operation.
      * @return A local update ready future, possibly having a nested 
replication future as a result for delayed ack purpose.
      */
-    private CompletableFuture<CompletableFuture<?>> 
applyUpdateCommand(UpdateCommand cmd) {
-        if (!cmd.full()) {
-            CompletableFuture<UUID> fut = 
applyCmdWithExceptionHandling(cmd).thenApply(res -> {
-                // This check guaranties the result will never be lost. 
Currently always null.
-                assert res == null : "Replication result is lost";
-
-                // Set context for delayed response.
-                return cmd.txId();
-            });
-
-            storageUpdateHandler.handleUpdate(
-                    cmd.txId(),
-                    cmd.rowUuid(),
-                    cmd.tablePartitionId().asTablePartitionId(),
-                    cmd.row(),
-                    true,
-                    null,
-                    null,
-                    cmd.lastCommitTimestamp());
-
-            // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 tmp
-            synchronized (safeTime) {
-                updateTrackerIgnoringTrackerClosedException(safeTime, 
cmd.safeTime());
-            }
+    private CompletableFuture<CompletableFuture<?>> applyUpdateCommand(
+            TablePartitionId tablePartId,
+            UUID rowUuid,
+            @Nullable BinaryRow row,
+            @Nullable HybridTimestamp lastCommitTimestamp,
+            UUID txId,
+            boolean full,
+            String txCoordinatorId,
+            int catalogVersion
+    ) {
+        synchronized (commandProcessingLinearizationMutex) {
+            UpdateCommand cmd = updateCommand(
+                    tablePartId,
+                    rowUuid,
+                    row,
+                    lastCommitTimestamp,
+                    txId,
+                    full,
+                    txCoordinatorId,
+                    hybridClock.now(),
+                    catalogVersion
+            );
+
+            if (!cmd.full()) {
+                CompletableFuture<UUID> fut = 
applyCmdWithExceptionHandling(cmd).thenApply(res -> {
+                    // This check guaranties the result will never be lost. 
Currently always null.
+                    assert res == null : "Replication result is lost";
 
-            return completedFuture(fut);
-        } else {
-            return applyCmdWithExceptionHandling(cmd).thenApply(res -> {
-                // This check guaranties the result will never be lost. 
Currently always null.
-                assert res == null : "Replication result is lost";
+                    // Set context for delayed response.
+                    return cmd.txId();
+                });
 
-                // Try to avoid double write if an entry is already replicated.
+                // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 tmp
                 synchronized (safeTime) {
                     if (cmd.safeTime().compareTo(safeTime.current()) > 0) {
                         storageUpdateHandler.handleUpdate(
                                 cmd.txId(),
                                 cmd.rowUuid(),
                                 cmd.tablePartitionId().asTablePartitionId(),
                                 cmd.row(),
-                                false,
+                                true,
+                                null,
                                 null,
-                                cmd.safeTime(),
                                 cmd.lastCommitTimestamp());
 
                         updateTrackerIgnoringTrackerClosedException(safeTime, 
cmd.safeTime());
                     }
                 }
 
-                return null;
-            });
+                return completedFuture(fut);
+            } else {
+                return applyCmdWithExceptionHandling(cmd).thenApply(res -> {
+                    // This check guaranties the result will never be lost. 
Currently always null.
+                    assert res == null : "Replication result is lost";
+
+                    // TODO: 
https://issues.apache.org/jira/browse/IGNITE-20124 tmp

Review Comment:
   tmp?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -2860,96 +3046,19 @@ private CompletableFuture<Void> 
validateAtTimestamp(UUID txId) {
     }
 
     /**
-     * Chooses operation timestamp, makes validations that require it and 
constructs an {@link UpdateCommand} object.
-     *
-     * @param request Request that is being processed.
-     * @param rowUuid Row UUID.
-     * @param row Row.
-     * @param txCoordinatorId Transaction coordinator id.
-     * @return Future that will complete with the constructed {@link 
UpdateCommand} object.
-     */
-    private CompletableFuture<UpdateCommand> 
validateAtTimestampAndBuildUpdateCommand(
-            ReadWriteSingleRowReplicaRequest request,
-            UUID rowUuid,
-            @Nullable BinaryRow row,
-            @Nullable HybridTimestamp lastCommitTimestamp,
-            String txCoordinatorId
-    ) {
-        return validateAtTimestampAndBuildUpdateCommand(
-                request.commitPartitionId().asTablePartitionId(),
-                rowUuid,
-                row,
-                lastCommitTimestamp,
-                request.transactionId(),
-                request.full(),
-                txCoordinatorId
-        );
-    }
-
-    /**
-     * Chooses operation timestamp, makes validations that require it and 
constructs an {@link UpdateCommand} object.
+     * Chooses operation timestamp and makes schema related validations.
      *
-     * @param request Request that is being processed.
-     * @param rowUuid Row UUID.
-     * @param row Row.
-     * @param txCoordinatorId Transaction coordinator id.
-     * @return Future that will complete with the constructed {@link 
UpdateCommand} object.
-     */
-    private CompletableFuture<UpdateCommand> 
validateAtTimestampAndBuildUpdateCommand(
-            ReadWriteSingleRowPkReplicaRequest request,
-            UUID rowUuid,
-            @Nullable BinaryRow row,
-            @Nullable HybridTimestamp lastCommitTimestamp,
-            String txCoordinatorId
-    ) {
-        return validateAtTimestampAndBuildUpdateCommand(
-                request.commitPartitionId().asTablePartitionId(),
-                rowUuid,
-                row,
-                lastCommitTimestamp,
-                request.transactionId(),
-                request.full(),
-                txCoordinatorId
-        );
-    }
-
-    /**
-     * Chooses operation timestamp, makes validations that require it and 
constructs an {@link UpdateCommand} object.
-     *
-     * @param tablePartId {@link TablePartitionId} object.
-     * @param rowUuid Row UUID.
-     * @param row Row.
      * @param txId Transaction ID.
-     * @param full {@code True} if this is a full transaction.
-     * @param txCoordinatorId Transaction coordinator id.
-     * @return Future that will complete with the constructed {@link 
UpdateCommand} object.
+     * @return Future that will complete with catalog version associated with 
given operation though the operation timestamp.
      */
-    private CompletableFuture<UpdateCommand> 
validateAtTimestampAndBuildUpdateCommand(
-            TablePartitionId tablePartId,
-            UUID rowUuid,
-            @Nullable BinaryRow row,
-            @Nullable HybridTimestamp lastCommitTimestamp,
-            UUID txId,
-            boolean full,
-            String txCoordinatorId
-    ) {
+    private CompletableFuture<Integer> 
validateOperationAgainstSchemaAtTimestamp(UUID txId) {

Review Comment:
   It seems that now it makes no sense to have `AtTimestamp` in the name as we 
don't pass any timestamp as a parameter. How about 
`validateOperationAgainstSchemaNow()` (or just 
`validateOperationAgainstSchema()`)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to