vldpyatkov commented on code in PR #4049:
URL: https://github.com/apache/ignite-3/pull/4049#discussion_r1726828974


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java:
##########
@@ -303,11 +318,50 @@ public void committedGroupConfiguration(byte[] config) {
     }
 
     @Override
-    public void updateLease(long leaseStartTime) {
+    public void updateLease(
+            long leaseStartTime,
+            String primaryReplicaNodeId,
+            String primaryReplicaNodeName
+    ) {
         busy(() -> {
             throwExceptionIfStorageNotInRunnableState();
 
-            updateMeta((lastCheckpointId, meta) -> 
meta.updateLease(lastCheckpointId, leaseStartTime));
+            updateMeta((lastCheckpointId, meta) -> {
+                primaryReplicaMetaReadWriteLock.writeLock().lock();

Review Comment:
   The `volatile` modifier on the `meta` variable looks redundant, because 
access to it is already guarded by the lock introduced in this patch.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -289,38 +295,40 @@ private UpdateCommandResult 
handleUpdateCommand(UpdateCommand cmd, long commandI
             long storageLeaseStartTime = storage.leaseStartTime();
 
             if (leaseStartTime != storageLeaseStartTime) {
-                return new UpdateCommandResult(false, storageLeaseStartTime);
+                return new UpdateCommandResult(
+                        false,
+                        storageLeaseStartTime,
+                        isPrimaryInGroupTopology()
+                );
             }
         }
 
         UUID txId = cmd.txId();
 
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Proper 
storage/raft index handling is required.
-        synchronized (safeTime) {
-            if (cmd.safeTime().compareTo(safeTime.current()) > 0) {
-                storageUpdateHandler.handleUpdate(
-                        txId,
-                        cmd.rowUuid(),
-                        cmd.tablePartitionId().asTablePartitionId(),
-                        cmd.rowToUpdate(),
-                        !cmd.full(),
-                        () -> storage.lastApplied(commandIndex, commandTerm),
-                        cmd.full() ? cmd.safeTime() : null,
-                        cmd.lastCommitTimestamp(),
-                        indexIdsAtRwTxBeginTs(catalogService, txId, 
storage.tableId())
-                );
-
-                updateTrackerIgnoringTrackerClosedException(safeTime, 
cmd.safeTime());
-            } else {
-                // We MUST bump information about last updated index+term.
-                // See a comment in #onWrite() for explanation.
-                advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
-            }
+        assert storage.primaryReplicaNodeId() != null;
+        assert localNodeId != null;
+
+        if (cmd.full() || (!cmd.full() && 
!localNodeId.equals(storage.primaryReplicaNodeId()))) {

Review Comment:
   This condition is equivalent to the simpler form below, since 
`A || (!A && B)` reduces to `A || B`:
   > cmd.full() || !localNodeId.equals(storage.primaryReplicaNodeId())



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -342,36 +350,35 @@ private UpdateCommandResult 
handleUpdateAllCommand(UpdateAllCommand cmd, long co
             long storageLeaseStartTime = storage.leaseStartTime();
 
             if (leaseStartTime != storageLeaseStartTime) {
-                return new UpdateCommandResult(false, storageLeaseStartTime);
+                return new UpdateCommandResult(
+                        false,
+                        storageLeaseStartTime,
+                        isPrimaryInGroupTopology()
+                );
             }
         }
 
         UUID txId = cmd.txId();
 
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Proper 
storage/raft index handling is required.
-        synchronized (safeTime) {
-            if (cmd.safeTime().compareTo(safeTime.current()) > 0) {
-                storageUpdateHandler.handleUpdateAll(
-                        txId,
-                        cmd.rowsToUpdate(),
-                        cmd.tablePartitionId().asTablePartitionId(),
-                        !cmd.full(),
-                        () -> storage.lastApplied(commandIndex, commandTerm),
-                        cmd.full() ? cmd.safeTime() : null,
-                        indexIdsAtRwTxBeginTs(catalogService, txId, 
storage.tableId())
-                );
-
-                updateTrackerIgnoringTrackerClosedException(safeTime, 
cmd.safeTime());
-            } else {
-                // We MUST bump information about last updated index+term.
-                // See a comment in #onWrite() for explanation.
-                advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
-            }
+        if (cmd.full() || (!cmd.full() && 
!localNodeId.equals(storage.primaryReplicaNodeId()))) {

Review Comment:
   This condition can be simplified to:
   > cmd.full() || !localNodeId.equals(storage.primaryReplicaNodeId())



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to