denis-chudov commented on code in PR #4049:
URL: https://github.com/apache/ignite-3/pull/4049#discussion_r1724588128


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaIo.java:
##########
@@ -50,6 +50,10 @@ public class StoragePartitionMetaIo extends PartitionMetaIo {
 
     private static final int ESTIMATED_SIZE_OFF = LEASE_START_TIME_OFF + Long.BYTES;
 
+    private static final int PRIMARY_REPLICA_NODE_ID_FIRST_PAGE_ID_OFF = ESTIMATED_SIZE_OFF + Long.BYTES;

Review Comment:
   As I understand it, ESTIMATED_SIZE_OFF is the total estimated size of the meta, so it should be based on the offset of the last value in the page. In that case, PRIMARY_REPLICA_NODE_ID_FIRST_PAGE_ID_OFF should be calculated from LEASE_START_TIME_OFF.
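   A minimal sketch of the layout I have in mind (the base value and the node-name constant are only illustrative, not the actual constants of this class):
   ```java
   class OffsetLayoutSketch {
       // Hypothetical base value; in StoragePartitionMetaIo the chain starts from earlier fields.
       private static final int LEASE_START_TIME_OFF = 64;

       // The new blob page-id fields are chained directly from the lease start time.
       private static final int PRIMARY_REPLICA_NODE_ID_FIRST_PAGE_ID_OFF = LEASE_START_TIME_OFF + Long.BYTES;
       private static final int PRIMARY_REPLICA_NODE_NAME_FIRST_PAGE_ID_OFF = PRIMARY_REPLICA_NODE_ID_FIRST_PAGE_ID_OFF + Long.BYTES;

       // ESTIMATED_SIZE_OFF is then derived from the last stored field on the page.
       private static final int ESTIMATED_SIZE_OFF = PRIMARY_REPLICA_NODE_NAME_FIRST_PAGE_ID_OFF + Long.BYTES;
   }
   ```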



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java:
##########
@@ -303,11 +318,50 @@ public void committedGroupConfiguration(byte[] config) {
     }
 
     @Override
-    public void updateLease(long leaseStartTime) {
+    public void updateLease(
+            long leaseStartTime,
+            String primaryReplicaNodeId,
+            String primaryReplicaNodeName
+    ) {
         busy(() -> {
             throwExceptionIfStorageNotInRunnableState();
 
-            updateMeta((lastCheckpointId, meta) -> meta.updateLease(lastCheckpointId, leaseStartTime));
+            updateMeta((lastCheckpointId, meta) -> {
+                primaryReplicaMetaReadWriteLock.writeLock().lock();
+                try {
+                    if (leaseStartTime <= meta.leaseStartTime()) {
+                        return;
+                    }
+
+                    if (meta.primaryReplicaNodeIdFirstPageId() == BlobStorage.NO_PAGE_ID) {
+                        long primaryReplicaNodeIdFirstPageId = blobStorage.addBlob(stringToBytes(primaryReplicaNodeId));
+
+                        meta.primaryReplicaNodeIdFirstPageId(lastCheckpointId, primaryReplicaNodeIdFirstPageId);
+                    } else {
+                        blobStorage.updateBlob(meta.primaryReplicaNodeIdFirstPageId(), stringToBytes(primaryReplicaNodeId));
+                    }

Review Comment:
   Maybe it would be better to add a method like `upsertBlob`.
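   Roughly something like this, just as a sketch (the exact signature and the checked exception are up to you):
   ```java
   /** Adds a new blob if there is no page chain yet, otherwise rewrites the existing one; returns the first page id. */
   public long upsertBlob(long maybeFirstPageId, byte[] bytes) throws IgniteInternalCheckedException {
       if (maybeFirstPageId == NO_PAGE_ID) {
           return addBlob(bytes);
       }

       updateBlob(maybeFirstPageId, bytes);

       return maybeFirstPageId;
   }
   ```
   Then the caller would only need one line per field, e.g. `meta.primaryReplicaNodeIdFirstPageId(lastCheckpointId, blobStorage.upsertBlob(meta.primaryReplicaNodeIdFirstPageId(), stringToBytes(primaryReplicaNodeId)));`.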



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1032,14 +1053,29 @@ public void updateLease(long leaseStartTime) {
             AbstractWriteBatch writeBatch = PartitionDataHelper.requireWriteBatch();
 
             try {
-                byte[] leaseBytes = new byte[Long.BYTES];
+                ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+                outputStream.write(longToBytes(leaseStartTime));
+
+                byte[] primaryReplicaNodeIdBytes = stringToBytes(primaryReplicaNodeId);
+                assert primaryReplicaNodeIdBytes.length < Byte.MAX_VALUE :
+                        format("Primary replica node id bytes length exceeds the limit [length={}].",
+                                primaryReplicaNodeIdBytes.length);
+                outputStream.write((byte) primaryReplicaNodeIdBytes.length);
+                outputStream.write(primaryReplicaNodeIdBytes);
 
-                putLongToBytes(leaseStartTime, leaseBytes, 0);
+                byte[] primaryReplicaNodeNameBytes = stringToBytes(primaryReplicaNodeName);
+                assert primaryReplicaNodeNameBytes.length < Byte.MAX_VALUE;

Review Comment:
   This looks like a newly introduced limitation; should it be documented?
   Right now I can successfully create a cluster whose nodes have 127 characters in their names.
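   For reference, here is roughly the value layout this code writes, as I read it (a sketch only, helper names are illustrative): the lease start time as 8 bytes, then each string prefixed with a single length byte, which is exactly where the `< Byte.MAX_VALUE` limit comes from.
   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.nio.ByteBuffer;
   import java.nio.charset.StandardCharsets;

   class LeaseValueLayoutSketch {
       /** Encodes: 8-byte lease start time, then node id and node name, each with a 1-byte length prefix. */
       static byte[] encode(long leaseStartTime, String nodeId, String nodeName) throws IOException {
           ByteArrayOutputStream out = new ByteArrayOutputStream();

           out.write(ByteBuffer.allocate(Long.BYTES).putLong(leaseStartTime).array());

           writeLengthPrefixed(out, nodeId);
           writeLengthPrefixed(out, nodeName);

           return out.toByteArray();
       }

       private static void writeLengthPrefixed(ByteArrayOutputStream out, String s) throws IOException {
           byte[] bytes = s.getBytes(StandardCharsets.UTF_8);

           // A single length byte limits the string to at most 126 bytes.
           if (bytes.length >= Byte.MAX_VALUE) {
               throw new IllegalArgumentException("String is too long for a 1-byte length prefix: " + bytes.length);
           }

           out.write(bytes.length);
           out.write(bytes);
       }
   }
   ```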



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java:
##########
@@ -322,6 +376,72 @@ public long leaseStartTime() {
         });
     }
 
+    @Override
+    public @Nullable String primaryReplicaNodeId() {
+        return busy(() -> {
+            throwExceptionIfStorageNotInRunnableState();
+
+            try {
+                primaryReplicaMetaReadWriteLock.readLock().lock();
+
+                try {
+                    if (primaryReplicaNodeId == null) {
+                        long primaryReplicaNodeIdFirstPageId = meta.primaryReplicaNodeIdFirstPageId();
+
+                        // It's possible to face BlobStorage.NO_PAGE_ID if the lease information has not yet been recorded in storage,
+                        // for example, if the lease itself has not yet been elected.
+                        if (primaryReplicaNodeIdFirstPageId != BlobStorage.NO_PAGE_ID) {
+                            primaryReplicaNodeId = ByteUtils.stringFromBytes(blobStorage.readBlob(primaryReplicaNodeIdFirstPageId));
+                        }
+                    }
+
+                    return primaryReplicaNodeId;
+                } finally {
+                    primaryReplicaMetaReadWriteLock.readLock().unlock();
+                }
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException(
+                        "Failed to read primary replica node id: [tableId={}, partitionId={}]",
+                        e,
+                        tableStorage.getTableId(), partitionId
+                );
+            }
+        });
+    }
+
+    @Override
+    public @Nullable String primaryReplicaNodeName() {
+        return busy(() -> {
+            throwExceptionIfStorageNotInRunnableState();
+
+            try {
+                primaryReplicaMetaReadWriteLock.readLock().lock();
+
+                try {
+                    if (primaryReplicaNodeName == null) {
+                        long primaryReplicaNodeNameFirstPageId = meta.primaryReplicaNodeNameFirstPageId();
+
+                        // It's possible to face BlobStorage.NO_PAGE_ID if the lease information has not yet been recorded in storage,
+                        // for example, if the lease itself has not yet been elected.
+                        if (primaryReplicaNodeNameFirstPageId != BlobStorage.NO_PAGE_ID) {
+                            primaryReplicaNodeName = ByteUtils.stringFromBytes(blobStorage.readBlob(primaryReplicaNodeNameFirstPageId));
+                        }
+                    }
+
+                    return primaryReplicaNodeName;
+                } finally {
+                    primaryReplicaMetaReadWriteLock.readLock().unlock();
+                }
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException(
+                        "Failed to read primary replica node id: [tableId={}, partitionId={}]",

Review Comment:
   Node name should be here.
   Did you consider writing a wrapper method for this logic of reading a string from storage? These two methods are almost identical.
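   A rough sketch of such a wrapper (the method and parameter names are illustrative; the per-field caching would stay in the getters):
   ```java
   private @Nullable String readPrimaryReplicaMetaString(long firstPageId, String what) {
       primaryReplicaMetaReadWriteLock.readLock().lock();

       try {
           // NO_PAGE_ID means the lease information has not been recorded in the storage yet.
           if (firstPageId == BlobStorage.NO_PAGE_ID) {
               return null;
           }

           return ByteUtils.stringFromBytes(blobStorage.readBlob(firstPageId));
       } catch (IgniteInternalCheckedException e) {
           throw new StorageException(
                   "Failed to read " + what + ": [tableId={}, partitionId={}]",
                   e,
                   tableStorage.getTableId(), partitionId
           );
       } finally {
           primaryReplicaMetaReadWriteLock.readLock().unlock();
       }
   }
   ```
   With it, `primaryReplicaNodeName()` would boil down to `primaryReplicaNodeName = readPrimaryReplicaMetaString(meta.primaryReplicaNodeNameFirstPageId(), "primary replica node name");` inside the existing `busy(...)` block.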



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -289,38 +295,40 @@ private UpdateCommandResult handleUpdateCommand(UpdateCommand cmd, long commandI
             long storageLeaseStartTime = storage.leaseStartTime();
 
             if (leaseStartTime != storageLeaseStartTime) {
-                return new UpdateCommandResult(false, storageLeaseStartTime);
+                return new UpdateCommandResult(
+                        false,
+                        storageLeaseStartTime,
+                        isPrimaryInGroupTopology()
+                );
             }
         }
 
         UUID txId = cmd.txId();
 
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Proper storage/raft index handling is required.
-        synchronized (safeTime) {
-            if (cmd.safeTime().compareTo(safeTime.current()) > 0) {
-                storageUpdateHandler.handleUpdate(
-                        txId,
-                        cmd.rowUuid(),
-                        cmd.tablePartitionId().asTablePartitionId(),
-                        cmd.rowToUpdate(),
-                        !cmd.full(),
-                        () -> storage.lastApplied(commandIndex, commandTerm),
-                        cmd.full() ? cmd.safeTime() : null,
-                        cmd.lastCommitTimestamp(),
-                        indexIdsAtRwTxBeginTs(catalogService, txId, storage.tableId())
-                );
-
-                updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime());
-            } else {
-                // We MUST bump information about last updated index+term.
-                // See a comment in #onWrite() for explanation.
-                advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
-            }
+        assert storage.primaryReplicaNodeId() != null;
+        assert localNodeId != null;
+
+        if (cmd.full() || (!cmd.full() && !localNodeId.equals(storage.primaryReplicaNodeId()))) {
+            storageUpdateHandler.handleUpdate(
+                    txId,
+                    cmd.rowUuid(),
+                    cmd.tablePartitionId().asTablePartitionId(),
+                    cmd.rowToUpdate(),
+                    !cmd.full(),
+                    () -> storage.lastApplied(commandIndex, commandTerm),
+                    cmd.full() ? cmd.safeTime() : null,
+                    cmd.lastCommitTimestamp(),
+                    indexIdsAtRwTxBeginTs(catalogService, txId, storage.tableId())
+            );
+        } else {
+            // We MUST bump information about last updated index+term.
+            // See a comment in #onWrite() for explanation.

Review Comment:
   Please also add the explanation here: if we are in this branch, then the local replica is the primary and its storage was already updated before the replication command.
   Right now this can look extremely tricky to anyone who is not in the context of the tx protocol.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -2770,37 +2748,33 @@ private CompletableFuture<CompletableFuture<?>> applyUpdateCommand(
 
                 applyCmdWithExceptionHandling(cmd, resultFuture);
 
-                return resultFuture.thenApply(res -> {
+                return resultFuture.thenCompose(res -> {
                     UpdateCommandResult updateCommandResult = (UpdateCommandResult) res;
 
                     if (full && updateCommandResult != null && !updateCommandResult.isPrimaryReplicaMatch()) {
                         throw new PrimaryReplicaMissException(txId, cmd.leaseStartTime(), updateCommandResult.currentLeaseStartTime());
                     }
 
-                    // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Temporary code below
-                    // Try to avoid double write if an entry is already replicated.
-                    synchronized (safeTime) {
-                        if (cmd.safeTime().compareTo(safeTime.current()) > 0) {
-                            if (!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK)) {
-                                // We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
-                                storageUpdateHandler.handleUpdate(
-                                        cmd.txId(),
-                                        cmd.rowUuid(),
-                                        cmd.tablePartitionId().asTablePartitionId(),
-                                        cmd.rowToUpdate(),
-                                        false,
-                                        null,
-                                        cmd.safeTime(),
-                                        null,
-                                        indexIdsAtRwTxBeginTs(txId)
-                                );
-                            }
-
-                            updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime());
+                    if (updateCommandResult.isPrimaryInPeersAndLearners()) {
+                        return safeTime.waitFor(cmd.safeTime()).thenApply(ignored -> null);
+                    } else {
+                        if (!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK)) {
+                            // We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
+                            storageUpdateHandler.handleUpdate(
+                                    cmd.txId(),
+                                    cmd.rowUuid(),
+                                    cmd.tablePartitionId().asTablePartitionId(),
+                                    cmd.rowToUpdate(),
+                                    false,
+                                    null,
+                                    cmd.safeTime(),
+                                    null,
+                                    indexIdsAtRwTxBeginTs(txId)
+                            );
                         }
-                    }
 
-                    return null;
+                        return null;

Review Comment:
   It seems we now return `null` from the `thenCompose` lambda; is that okay?
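   For reference, a function passed to `thenCompose` has to produce a `CompletionStage`; if it returns `null`, the resulting future completes exceptionally with a `NullPointerException`. A minimal illustration of the usual pattern (not the exact code of this method):
   ```java
   import java.util.concurrent.CompletableFuture;

   class ThenComposeNullSketch {
       static CompletableFuture<Void> example(CompletableFuture<Object> resultFuture) {
           return resultFuture.thenCompose(res -> {
               // ... handle the result ...

               // A "no result" outcome is expressed as an already completed future, not a bare null.
               return CompletableFuture.<Void>completedFuture(null);
           });
       }
   }
   ```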



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -2917,34 +2881,32 @@ private CompletableFuture<CompletableFuture<?>> applyUpdateAllCommand(
                 }
             } else {
                 return applyCmdWithExceptionHandling(cmd, new CompletableFuture<>())
-                        .thenApply(res -> {
+                        .thenCompose(res -> {
                             UpdateCommandResult updateCommandResult = (UpdateCommandResult) res;
 
-                            if (full && !updateCommandResult.isPrimaryReplicaMatch()) {
-                                throw new PrimaryReplicaMissException(cmd.txId(), cmd.leaseStartTime(),
-                                        updateCommandResult.currentLeaseStartTime());
+                            if (!updateCommandResult.isPrimaryReplicaMatch()) {

Review Comment:
   I have read the javadoc for `UpdateCommandResult#isPrimaryReplicaMatch`, and it looks weird and inaccurate; could you please fix it? I know it's my fault originally, but it would be better to have it fixed.
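   As far as I can tell from `handleUpdateCommand`, the intended meaning is roughly this (wording is only a suggestion):
   ```java
   /**
    * Returns {@code true} if the lease start time passed with the command matches the lease start time
    * currently recorded in the partition storage, i.e. the command was sent by the current primary
    * replica; {@code false} otherwise.
    */
   ```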



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/UpdateCommandResult.java:
##########
@@ -32,26 +33,35 @@ public class UpdateCommandResult implements Serializable {
     @Nullable
     private final Long currentLeaseStartTime;
 
+    /** {@code true} if primary replica belongs to the raft group topology, {@code false} otherwise. */

Review Comment:
   I would add that it also returns true if the primary replica is included among the learners.
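   For example (the field name is illustrative):
   ```java
   /** {@code true} if the primary replica is part of the raft group topology, either as a peer or as a learner, {@code false} otherwise. */
   private final boolean primaryInPeersAndLearners;
   ```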



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -2770,37 +2748,33 @@ private CompletableFuture<CompletableFuture<?>> applyUpdateCommand(
 
                 applyCmdWithExceptionHandling(cmd, resultFuture);
 
-                return resultFuture.thenApply(res -> {
+                return resultFuture.thenCompose(res -> {
                     UpdateCommandResult updateCommandResult = (UpdateCommandResult) res;
 
                     if (full && updateCommandResult != null && !updateCommandResult.isPrimaryReplicaMatch()) {

Review Comment:
   Do we still need the `updateCommandResult != null` check?
   Also, you removed the `full` condition from `applyUpdateAllCommand` but left it here.


