Murtadha Hubail has submitted this change and it was merged. Change subject: [NO ISSUE][REPL] Ensure Valid Component ID is Initialized On Replica Sync ......................................................................
[NO ISSUE][REPL] Ensure Valid Component ID is Initialized On Replica Sync - user model changes: no - storage format changes: no - interface changes: yes Details: - Currently, the first time a replica is synchronized from master, the valid component id on each replicated index's initial checkpoint will be the initial value of a component id (-1). This value is corrected when the replica receives a flushed component from the index. However, if the master fails before any component is flushed to a replica and that replica is promoted to master, it will start from an invalid component id. This change ensures that the initial checkpoint of replicated indexes is initialized to the maximum component id that appears on master. This will ensure that if the replica is promoted, it will at least start from a component id that wasn't previously used on master. - Replace the component id validation assertion with an IllegalStateException. Change-Id: I85395ad823a630725c4cab4bead1c61546dc61ae Reviewed-on: https://asterix-gerrit.ics.uci.edu/2973 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Integration-Tests: Jenkins <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java 8 files changed, 51 insertions(+), 18 deletions(-) Approvals: Anon. E. Moose #1000171: Jenkins: Verified; No violations found; Verified Michael Blow: Looks good to me, approved diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java index 7b08bad..420585a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java @@ -55,7 +55,8 @@ } @Override - public synchronized void init(long validComponentSequence, long lsn) throws HyracksDataException { + public synchronized void init(long validComponentSequence, long lsn, long validComponentId) + throws HyracksDataException { List<IndexCheckpoint> checkpoints; try { checkpoints = getCheckpoints(); @@ -66,7 +67,7 @@ LOGGER.warn(() -> "Checkpoints found on initializing: " + indexPath); delete(); } - IndexCheckpoint firstCheckpoint = IndexCheckpoint.first(validComponentSequence, lsn); + IndexCheckpoint firstCheckpoint = IndexCheckpoint.first(validComponentSequence, lsn, validComponentId); persist(firstCheckpoint); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java index dd9ede5..2f0eddf 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java @@ -29,9 +29,10 @@ * * @param validComponentSequence * @param lsn + * @param validComponentId * @throws HyracksDataException */ - void init(long 
validComponentSequence, long lsn) throws HyracksDataException; + void init(long validComponentSequence, long lsn, long validComponentId) throws HyracksDataException; /** * Called when a new LSM disk component is flushed. When called, the index checkpoint is updated diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java index 9654473..cb34600 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java @@ -23,7 +23,6 @@ import java.util.Map; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -42,12 +41,12 @@ private long lastComponentId; private Map<Long, Long> masterNodeFlushMap; - public static IndexCheckpoint first(long lastComponentSequence, long lowWatermark) { + public static IndexCheckpoint first(long lastComponentSequence, long lowWatermark, long validComponentId) { IndexCheckpoint firstCheckpoint = new IndexCheckpoint(); firstCheckpoint.id = INITIAL_CHECKPOINT_ID; firstCheckpoint.lowWatermark = lowWatermark; firstCheckpoint.validComponentSequence = lastComponentSequence; - firstCheckpoint.lastComponentId = LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId(); + firstCheckpoint.lastComponentId = validComponentId; firstCheckpoint.masterNodeFlushMap = new HashMap<>(); return firstCheckpoint; } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java index 448613b..e778cce 100644 --- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java @@ -45,9 +45,11 @@ public class CheckpointPartitionIndexesTask implements IReplicaTask { private final int partition; + private final long maxComponentId; - public CheckpointPartitionIndexesTask(int partition) { + public CheckpointPartitionIndexesTask(int partition, long maxComponentId) { this.partition = partition; + this.maxComponentId = maxComponentId; } @Override @@ -75,7 +77,7 @@ maxComponentSequence = Math.max(maxComponentSequence, IndexComponentFileReference.of(file).getSequenceEnd()); } - indexCheckpointManager.init(maxComponentSequence, currentLSN); + indexCheckpointManager.init(maxComponentSequence, currentLSN, maxComponentId); } ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer()); } @@ -90,6 +92,7 @@ try { DataOutputStream dos = new DataOutputStream(out); dos.writeInt(partition); + dos.writeLong(maxComponentId); } catch (IOException e) { throw HyracksDataException.create(e); } @@ -98,7 +101,8 @@ public static CheckpointPartitionIndexesTask create(DataInput input) throws HyracksDataException { try { int partition = input.readInt(); - return new CheckpointPartitionIndexesTask(partition); + long maxComponentId = input.readLong(); + return new CheckpointPartitionIndexesTask(partition, maxComponentId); } catch (IOException e) { throw HyracksDataException.create(e); } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java index f53d448..ae36c13 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java +++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java @@ -40,6 +40,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.IIOManager; +import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -98,7 +99,8 @@ final IIndexCheckpointManager indexCheckpointManager = checkpointManagerProvider.get(indexRef); final long currentLSN = appCtx.getTransactionSubsystem().getLogManager().getAppendLSN(); indexCheckpointManager.delete(); - indexCheckpointManager.init(Long.MIN_VALUE, currentLSN); + indexCheckpointManager.init(Long.MIN_VALUE, currentLSN, + LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId()); LOGGER.info(() -> "Checkpoint index: " + indexRef); } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java index ef85977..09f1205 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java @@ -25,6 +25,8 @@ import org.apache.asterix.replication.api.PartitionReplica; import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask; import org.apache.asterix.replication.messaging.ReplicationProtocol; +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; +import org.apache.hyracks.api.exceptions.HyracksDataException; /** * Performs the steps required to ensure any newly added replica @@ -60,9 +62,17 @@ } private void checkpointReplicaIndexes() throws IOException { + final int partition = replica.getIdentifier().getPartition(); 
CheckpointPartitionIndexesTask task = - new CheckpointPartitionIndexesTask(replica.getIdentifier().getPartition()); + new CheckpointPartitionIndexesTask(partition, getPartitionMaxComponentId(partition)); ReplicationProtocol.sendTo(replica, task); ReplicationProtocol.waitForAck(replica); } + + private long getPartitionMaxComponentId(int partition) throws HyracksDataException { + final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy(); + final PersistentLocalResourceRepository localResourceRepository = + (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository(); + return localResourceRepository.getReplicatedIndexesMaxComponentId(partition, replStrategy); + } } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java index c0da095..8f870c0 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java @@ -66,8 +66,8 @@ import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation; import org.apache.hyracks.api.util.IoUtil; import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame; -import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager; import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference; +import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId; import org.apache.hyracks.storage.common.ILocalResourceRepository; import org.apache.hyracks.storage.common.LocalResource; import org.apache.hyracks.util.ExitUtil; @@ -196,7 +196,8 @@ byte[] bytes = 
OBJECT_MAPPER.writeValueAsBytes(resource.toJson(persistedResourceRegistry)); final Path path = Paths.get(resourceFile.getAbsolutePath()); Files.write(path, bytes); - indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(Long.MIN_VALUE, 0); + indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(Long.MIN_VALUE, 0, + LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId()); deleteResourceFileMask(resourceFile); } catch (Exception e) { cleanup(resourceFile); @@ -393,6 +394,21 @@ return partitionReplicatedFiles; } + public long getReplicatedIndexesMaxComponentId(int partition, IReplicationStrategy strategy) + throws HyracksDataException { + long maxComponentId = LSMComponentId.MIN_VALID_COMPONENT_ID; + final Map<Long, LocalResource> partitionResources = getPartitionResources(partition); + for (LocalResource lr : partitionResources.values()) { + DatasetLocalResource datasetLocalResource = (DatasetLocalResource) lr.getResource(); + if (strategy.isMatch(datasetLocalResource.getDatasetId())) { + final IIndexCheckpointManager indexCheckpointManager = + indexCheckpointManagerProvider.get(DatasetResourceReference.of(lr)); + maxComponentId = Math.max(maxComponentId, indexCheckpointManager.getLatest().getLastComponentId()); + } + } + return maxComponentId; + } + private List<String> getIndexFiles(File indexDir) { final List<String> indexFiles = new ArrayList<>(); if (indexDir.isDirectory()) { diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java index 3d928a4..9199fbb 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java +++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java @@ -577,7 +577,7 @@ if (c != EmptyComponent.INSTANCE) { diskComponents.add(0, c); } - assert checkComponentIds(); + validateComponentIds(); } @Override @@ -588,7 +588,7 @@ if (newComponent != EmptyComponent.INSTANCE) { diskComponents.add(swapIndex, newComponent); } - assert checkComponentIds(); + validateComponentIds(); } /** @@ -597,16 +597,16 @@ * * @throws HyracksDataException */ - private boolean checkComponentIds() throws HyracksDataException { + private void validateComponentIds() throws HyracksDataException { for (int i = 0; i < diskComponents.size() - 1; i++) { ILSMComponentId id1 = diskComponents.get(i).getId(); ILSMComponentId id2 = diskComponents.get(i + 1).getId(); IdCompareResult cmp = id1.compareTo(id2); if (cmp != IdCompareResult.UNKNOWN && cmp != IdCompareResult.GREATER_THAN) { - return false; + throw new IllegalStateException( + "found non-decreasing component ids (" + id1 + " -> " + id2 + ") on index " + this); } } - return true; } @Override -- To view, visit https://asterix-gerrit.ics.uci.edu/2973 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I85395ad823a630725c4cab4bead1c61546dc61ae Gerrit-PatchSet: 3 Gerrit-Project: asterixdb Gerrit-Branch: stabilization-f69489 Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]>
