denis-chudov commented on code in PR #4049:
URL: https://github.com/apache/ignite-3/pull/4049#discussion_r1724588128
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaIo.java:
##########
@@ -50,6 +50,10 @@ public class StoragePartitionMetaIo extends PartitionMetaIo {
private static final int ESTIMATED_SIZE_OFF = LEASE_START_TIME_OFF + Long.BYTES;
+ private static final int PRIMARY_REPLICA_NODE_ID_FIRST_PAGE_ID_OFF = ESTIMATED_SIZE_OFF + Long.BYTES;
Review Comment:
As I understand it, ESTIMATED_SIZE_OFF is the total estimated size of the meta,
so it should be based on the offset of the last value in the page.
In that case, PRIMARY_REPLICA_NODE_ID_FIRST_PAGE_ID_OFF should be calculated
from LEASE_START_TIME_OFF.
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java:
##########
@@ -303,11 +318,50 @@ public void committedGroupConfiguration(byte[] config) {
}
@Override
- public void updateLease(long leaseStartTime) {
+ public void updateLease(
+ long leaseStartTime,
+ String primaryReplicaNodeId,
+ String primaryReplicaNodeName
+ ) {
busy(() -> {
throwExceptionIfStorageNotInRunnableState();
- updateMeta((lastCheckpointId, meta) ->
meta.updateLease(lastCheckpointId, leaseStartTime));
+ updateMeta((lastCheckpointId, meta) -> {
+ primaryReplicaMetaReadWriteLock.writeLock().lock();
+ try {
+ if (leaseStartTime <= meta.leaseStartTime()) {
+ return;
+ }
+
+ if (meta.primaryReplicaNodeIdFirstPageId() ==
BlobStorage.NO_PAGE_ID) {
+ long primaryReplicaNodeIdFirstPageId =
blobStorage.addBlob(stringToBytes(primaryReplicaNodeId));
+
+ meta.primaryReplicaNodeIdFirstPageId(lastCheckpointId,
primaryReplicaNodeIdFirstPageId);
+ } else {
+
blobStorage.updateBlob(meta.primaryReplicaNodeIdFirstPageId(),
stringToBytes(primaryReplicaNodeId));
+ }
Review Comment:
Maybe it would be better to add a method like `upsertBlob`.
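A minimal sketch of what such a helper could look like. `InMemoryBlobStore` is a hypothetical in-memory stand-in written only for this illustration; it is not the Ignite `BlobStorage` API. The `addBlob`/`updateBlob` shapes are inferred from the calls in the diff above, and the value of `NO_PAGE_ID` and the assumption that an update keeps the same first page ID are made up for the example.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for a blob store; NOT the Ignite BlobStorage API. */
final class InMemoryBlobStore {
    /** Sentinel meaning "no blob written yet" (value assumed for this sketch). */
    static final long NO_PAGE_ID = 0L;

    private final Map<Long, byte[]> blobs = new HashMap<>();
    private long nextPageId = 1L;

    /** Stores a new blob and returns the ID of its first page. */
    long addBlob(byte[] bytes) {
        long pageId = nextPageId++;
        blobs.put(pageId, bytes.clone());
        return pageId;
    }

    /** Overwrites an existing blob; the first page ID is assumed to stay the same. */
    void updateBlob(long firstPageId, byte[] bytes) {
        if (!blobs.containsKey(firstPageId)) {
            throw new IllegalArgumentException("Unknown page id: " + firstPageId);
        }
        blobs.put(firstPageId, bytes.clone());
    }

    /** Returns a copy of the stored blob. */
    byte[] readBlob(long firstPageId) {
        byte[] bytes = blobs.get(firstPageId);
        if (bytes == null) {
            throw new IllegalArgumentException("Unknown page id: " + firstPageId);
        }
        return bytes.clone();
    }

    /**
     * Adds the blob when firstPageId is NO_PAGE_ID, otherwise updates it in place.
     * Returns the first page ID that the caller should keep in the meta.
     */
    long upsertBlob(long firstPageId, byte[] bytes) {
        if (firstPageId == NO_PAGE_ID) {
            return addBlob(bytes);
        }
        updateBlob(firstPageId, bytes);
        return firstPageId;
    }
}
```

With a helper like this, the `if`/`else` in `updateLease` could shrink to a single call whose result is written back to the meta, and the same call could be reused for the node name.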
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1032,14 +1053,29 @@ public void updateLease(long leaseStartTime) {
AbstractWriteBatch writeBatch =
PartitionDataHelper.requireWriteBatch();
try {
- byte[] leaseBytes = new byte[Long.BYTES];
+ ByteArrayOutputStream outputStream = new
ByteArrayOutputStream();
+ outputStream.write(longToBytes(leaseStartTime));
+
+ byte[] primaryReplicaNodeIdBytes =
stringToBytes(primaryReplicaNodeId);
+ assert primaryReplicaNodeIdBytes.length < Byte.MAX_VALUE :
+ format("Primary replica node id bytes length exceeds the limit [length={}].",
+ primaryReplicaNodeIdBytes.length);
+ outputStream.write((byte) primaryReplicaNodeIdBytes.length);
+ outputStream.write(primaryReplicaNodeIdBytes);
- putLongToBytes(leaseStartTime, leaseBytes, 0);
+ byte[] primaryReplicaNodeNameBytes =
stringToBytes(primaryReplicaNodeName);
+ assert primaryReplicaNodeNameBytes.length < Byte.MAX_VALUE;
Review Comment:
This looks like a newly introduced limitation; should it be documented?
Right now I can successfully create a cluster whose nodes have 127 characters
in their names.
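To make the limit concrete, here is a small sketch of a one-byte length prefix, assuming UTF-8 encoding. `LengthPrefixedNames` and its methods are hypothetical names for this illustration, not code from the PR. It uses an explicit check instead of `assert` because Java assertions are disabled unless the JVM is started with `-ea`; with the check skipped, a length above 127 would wrap to a negative value when cast to `byte`.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper illustrating a one-byte length prefix; not code from the PR. */
final class LengthPrefixedNames {
    /** Largest byte length that passes a check of the form {@code length < Byte.MAX_VALUE}. */
    static final int MAX_NAME_BYTES = Byte.MAX_VALUE - 1;

    /** Encodes the name as [length byte][UTF-8 bytes], rejecting names that are too long. */
    static byte[] encode(String name) {
        byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);

        // Explicit check rather than `assert`, so it also runs when assertions are disabled.
        if (nameBytes.length > MAX_NAME_BYTES) {
            throw new IllegalArgumentException(
                    "Name is too long [bytes=" + nameBytes.length + ", max=" + MAX_NAME_BYTES + "]");
        }

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(nameBytes.length);               // write(int) stores the low-order byte
        out.write(nameBytes, 0, nameBytes.length);
        return out.toByteArray();
    }

    /** Decodes a value produced by {@link #encode(String)}. */
    static String decode(byte[] encoded) {
        int length = encoded[0];
        return new String(encoded, 1, length, StandardCharsets.UTF_8);
    }
}
```

Under these assumptions a 126-character ASCII name is accepted and a 127-character one is rejected. The limit is on encoded bytes, not characters, so names with non-ASCII characters reach it sooner.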
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java:
##########
@@ -322,6 +376,72 @@ public long leaseStartTime() {
});
}
+ @Override
+ public @Nullable String primaryReplicaNodeId() {
+ return busy(() -> {
+ throwExceptionIfStorageNotInRunnableState();
+
+ try {
+ primaryReplicaMetaReadWriteLock.readLock().lock();
+
+ try {
+ if (primaryReplicaNodeId == null) {
+ long primaryReplicaNodeIdFirstPageId =
meta.primaryReplicaNodeIdFirstPageId();
+
+ // It's possible to face BlobStorage.NO_PAGE_ID if a
lease information has not yet been recorded in storage,
+ // for example, if the lease itself has not yet been
elected.
+ if (primaryReplicaNodeIdFirstPageId !=
BlobStorage.NO_PAGE_ID) {
+ primaryReplicaNodeId =
ByteUtils.stringFromBytes(blobStorage.readBlob(primaryReplicaNodeIdFirstPageId));
+ }
+ }
+
+ return primaryReplicaNodeId;
+ } finally {
+ primaryReplicaMetaReadWriteLock.readLock().unlock();
+ }
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException(
+ "Failed to read primary replica node id: [tableId={},
partitionId={}]",
+ e,
+ tableStorage.getTableId(), partitionId
+ );
+ }
+ });
+ }
+
+ @Override
+ public @Nullable String primaryReplicaNodeName() {
+ return busy(() -> {
+ throwExceptionIfStorageNotInRunnableState();
+
+ try {
+ primaryReplicaMetaReadWriteLock.readLock().lock();
+
+ try {
+ if (primaryReplicaNodeName == null) {
+ long primaryReplicaNodeNameFirstPageId =
meta.primaryReplicaNodeNameFirstPageId();
+
+ // It's possible to face BlobStorage.NO_PAGE_ID if a
lease information has not yet been recorded in storage,
+ // for example, if the lease itself has not yet been
elected.
+ if (primaryReplicaNodeNameFirstPageId !=
BlobStorage.NO_PAGE_ID) {
+ primaryReplicaNodeName =
ByteUtils.stringFromBytes(blobStorage.readBlob(primaryReplicaNodeNameFirstPageId));
+ }
+ }
+
+ return primaryReplicaNodeName;
+ } finally {
+ primaryReplicaMetaReadWriteLock.readLock().unlock();
+ }
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException(
+ "Failed to read primary replica node id: [tableId={},
partitionId={}]",
Review Comment:
The message should say "node name" here.
Did you consider writing a wrapper method for this logic of reading a string
from storage? These two methods are almost identical.
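A rough sketch of such a wrapper, assuming the two getters differ only in the first page ID they read and in the wording of the error message. `StoredStringReader`, the `LongFunction` blob reader, and `IllegalStateException` are stand-ins chosen to keep the example self-contained; the real code would use `BlobStorage`, `IgniteInternalCheckedException` and `StorageException`. The caching of the decoded value from the diff is left out for brevity.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.LongFunction;

/** Hypothetical wrapper around "read a string blob under the read lock". */
final class StoredStringReader {
    /** Sentinel meaning "nothing stored yet" (value assumed for this sketch). */
    static final long NO_PAGE_ID = 0L;

    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    /** Stand-in for blobStorage.readBlob(firstPageId). */
    private final LongFunction<byte[]> blobReader;

    StoredStringReader(LongFunction<byte[]> blobReader) {
        this.blobReader = blobReader;
    }

    /**
     * Reads a string by its first page ID, or returns null if nothing is stored yet.
     * The description parameter goes into the error message, so each caller reports
     * the field it was actually reading.
     */
    String readString(long firstPageId, String description) {
        lock.readLock().lock();
        try {
            if (firstPageId == NO_PAGE_ID) {
                return null;
            }
            return new String(blobReader.apply(firstPageId), StandardCharsets.UTF_8);
        } catch (RuntimeException e) {
            throw new IllegalStateException(
                    "Failed to read " + description + ": [firstPageId=" + firstPageId + "]", e);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Each getter would then call `readString` with its own page ID and a description such as "primary replica node name", which also removes the copy-pasted error message.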
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -289,38 +295,40 @@ private UpdateCommandResult
handleUpdateCommand(UpdateCommand cmd, long commandI
long storageLeaseStartTime = storage.leaseStartTime();
if (leaseStartTime != storageLeaseStartTime) {
- return new UpdateCommandResult(false, storageLeaseStartTime);
+ return new UpdateCommandResult(
+ false,
+ storageLeaseStartTime,
+ isPrimaryInGroupTopology()
+ );
}
}
UUID txId = cmd.txId();
- // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Proper
storage/raft index handling is required.
- synchronized (safeTime) {
- if (cmd.safeTime().compareTo(safeTime.current()) > 0) {
- storageUpdateHandler.handleUpdate(
- txId,
- cmd.rowUuid(),
- cmd.tablePartitionId().asTablePartitionId(),
- cmd.rowToUpdate(),
- !cmd.full(),
- () -> storage.lastApplied(commandIndex, commandTerm),
- cmd.full() ? cmd.safeTime() : null,
- cmd.lastCommitTimestamp(),
- indexIdsAtRwTxBeginTs(catalogService, txId,
storage.tableId())
- );
-
- updateTrackerIgnoringTrackerClosedException(safeTime,
cmd.safeTime());
- } else {
- // We MUST bump information about last updated index+term.
- // See a comment in #onWrite() for explanation.
- advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
- }
+ assert storage.primaryReplicaNodeId() != null;
+ assert localNodeId != null;
+
+ if (cmd.full() || (!cmd.full() &&
!localNodeId.equals(storage.primaryReplicaNodeId()))) {
+ storageUpdateHandler.handleUpdate(
+ txId,
+ cmd.rowUuid(),
+ cmd.tablePartitionId().asTablePartitionId(),
+ cmd.rowToUpdate(),
+ !cmd.full(),
+ () -> storage.lastApplied(commandIndex, commandTerm),
+ cmd.full() ? cmd.safeTime() : null,
+ cmd.lastCommitTimestamp(),
+ indexIdsAtRwTxBeginTs(catalogService, txId,
storage.tableId())
+ );
+ } else {
+ // We MUST bump information about last updated index+term.
+ // See a comment in #onWrite() for explanation.
Review Comment:
Please also add the explanation here: if we are in this branch, then the
local replica is the primary and its storage was already updated before the
replication command.
As it stands, this can look extremely tricky to anyone who is not familiar
with the tx protocol.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -2770,37 +2748,33 @@ private CompletableFuture<CompletableFuture<?>>
applyUpdateCommand(
applyCmdWithExceptionHandling(cmd, resultFuture);
- return resultFuture.thenApply(res -> {
+ return resultFuture.thenCompose(res -> {
UpdateCommandResult updateCommandResult =
(UpdateCommandResult) res;
if (full && updateCommandResult != null &&
!updateCommandResult.isPrimaryReplicaMatch()) {
throw new PrimaryReplicaMissException(txId,
cmd.leaseStartTime(), updateCommandResult.currentLeaseStartTime());
}
- // TODO:
https://issues.apache.org/jira/browse/IGNITE-20124 Temporary code below
- // Try to avoid double write if an entry is already
replicated.
- synchronized (safeTime) {
- if (cmd.safeTime().compareTo(safeTime.current()) > 0) {
- if
(!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK))
{
- // We don't need to take the partition
snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
- storageUpdateHandler.handleUpdate(
- cmd.txId(),
- cmd.rowUuid(),
-
cmd.tablePartitionId().asTablePartitionId(),
- cmd.rowToUpdate(),
- false,
- null,
- cmd.safeTime(),
- null,
- indexIdsAtRwTxBeginTs(txId)
- );
- }
-
-
updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime());
+ if (updateCommandResult.isPrimaryInPeersAndLearners()) {
+ return
safeTime.waitFor(cmd.safeTime()).thenApply(ignored -> null);
+ } else {
+ if
(!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK))
{
+ // We don't need to take the partition snapshots
read lock, see #INTERNAL_DOC_PLACEHOLDER why.
+ storageUpdateHandler.handleUpdate(
+ cmd.txId(),
+ cmd.rowUuid(),
+
cmd.tablePartitionId().asTablePartitionId(),
+ cmd.rowToUpdate(),
+ false,
+ null,
+ cmd.safeTime(),
+ null,
+ indexIdsAtRwTxBeginTs(txId)
+ );
}
- }
- return null;
+ return null;
Review Comment:
It seems we now return `null` from the `thenCompose` lambda; is that okay?
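For reference, a small standalone demonstration of what the JDK does in this situation. `ThenComposeNullDemo` is a hypothetical class for illustration and uses only `java.util.concurrent.CompletableFuture`: when the function passed to `thenCompose` returns `null`, the resulting future completes exceptionally with a `NullPointerException` as the cause, whereas returning an already completed future yields a normal `null` result.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical demo class; shows JDK behavior only, not Ignite code. */
final class ThenComposeNullDemo {
    /** Returns null from the thenCompose lambda, like the else-branch in the diff. */
    static CompletableFuture<Void> composeReturningNull() {
        return CompletableFuture.completedFuture("result")
                .thenCompose(res -> (CompletableFuture<Void>) null);
    }

    /** Returns a completed future carrying null instead of a null reference. */
    static CompletableFuture<Void> composeReturningCompletedFuture() {
        return CompletableFuture.completedFuture("result")
                .thenCompose(res -> CompletableFuture.<Void>completedFuture(null));
    }

    /** True if joining the future fails with a NullPointerException as the cause. */
    static boolean failsWithNpe(CompletableFuture<?> future) {
        try {
            future.join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof NullPointerException;
        }
    }
}
```

If the intent of that branch is "nothing to wait for", returning a completed future (for example `CompletableFuture.completedFuture(null)`) keeps the chain from failing.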
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -2917,34 +2881,32 @@ private CompletableFuture<CompletableFuture<?>>
applyUpdateAllCommand(
}
} else {
return applyCmdWithExceptionHandling(cmd, new
CompletableFuture<>())
- .thenApply(res -> {
+ .thenCompose(res -> {
UpdateCommandResult updateCommandResult =
(UpdateCommandResult) res;
- if (full &&
!updateCommandResult.isPrimaryReplicaMatch()) {
- throw new
PrimaryReplicaMissException(cmd.txId(), cmd.leaseStartTime(),
-
updateCommandResult.currentLeaseStartTime());
+ if (!updateCommandResult.isPrimaryReplicaMatch()) {
Review Comment:
I have read the javadoc for `UpdateCommandResult#isPrimaryReplicaMatch`; it
looks weird and inaccurate. Could you please fix it? I know it's my fault, but
it would be better to have it fixed.
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/UpdateCommandResult.java:
##########
@@ -32,26 +33,35 @@ public class UpdateCommandResult implements Serializable {
@Nullable
private final Long currentLeaseStartTime;
+ /** {@code true} if primary replica belongs to the raft group topology,
(@code false) otherwise. */
Review Comment:
I would add that it also returns true if the primary replica is among the
learners.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -2770,37 +2748,33 @@ private CompletableFuture<CompletableFuture<?>>
applyUpdateCommand(
applyCmdWithExceptionHandling(cmd, resultFuture);
- return resultFuture.thenApply(res -> {
+ return resultFuture.thenCompose(res -> {
UpdateCommandResult updateCommandResult =
(UpdateCommandResult) res;
if (full && updateCommandResult != null &&
!updateCommandResult.isPrimaryReplicaMatch()) {
Review Comment:
Do we still need the `updateCommandResult != null` check?
Also, you removed the `full` condition from `applyUpdateAllCommand` but left
it here.