>From Ritik Raj <[email protected]>: Ritik Raj has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19443 )
Change subject: [ASTERIXDB-3563][STO] Delay activation of dataset until accessed ...................................................................... [ASTERIXDB-3563][STO] Delay activation of dataset until accessed - user model changes: no - storage format changes: no - interface changes: yes Details: activate the dataset when it is being accessed in cloud mode. Ext-ref: MB-63037 Change-Id: If64a7fbd9701772ffa42761d1557223a3eea95be Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19443 Integration-Tests: Jenkins <[email protected]> Reviewed-by: Ritik Raj <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Tested-by: Ritik Raj <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/TransientLocalResourceRepository.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/ILocalResourceRepository.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java 9 files changed, 224 insertions(+), 18 deletions(-) Approvals: Murtadha Hubail: Looks good to me, approved Ritik Raj: Looks good to me, but someone else must approve; Verified Jenkins: Verified diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index de4a066..007826b 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -286,8 +286,9 @@ // Must start vbc now instead of by life cycle component manager (lccm) because lccm happens after // the metadata bootstrap task ((ILifeCycleComponent) virtualBufferCache).start(); - datasetLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository, - txnSubsystem.getLogManager(), virtualBufferCache, indexCheckpointManagerProvider, lockNotifier); + datasetLifecycleManager = + new DatasetLifecycleManager(ncServiceContext, storageProperties, localResourceRepository, recoveryMgr, + txnSubsystem.getLogManager(), virtualBufferCache, indexCheckpointManagerProvider, lockNotifier); localResourceRepository.setDatasetLifecycleManager(datasetLifecycleManager); final String nodeId = getServiceContext().getNodeId(); final Set<Integer> nodePartitions = metadataProperties.getNodePartitions(nodeId); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java index 179b996..269f6f7 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java @@ -901,6 +901,16 @@ return maxDiskLastLsn; } + @Override + public boolean isLazyRecoveryEnabled() { + return false; + } + + @Override + public void recoverIndexes(List<ILSMIndex> datasetPartitionIndexes) throws HyracksDataException { + // do-nothing + } + private class JobEntityCommits { private static final String PARTITION_FILE_NAME_SEPARATOR = "_"; private final long txnId; diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index 43b5d1b..85916b0 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -36,6 +36,7 @@ import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.asterix.common.dataflow.LSMIndexUtil; import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback; +import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.common.storage.DatasetResourceReference; import org.apache.asterix.common.storage.IIndexCheckpointManager; @@ -43,12 +44,16 @@ import org.apache.asterix.common.storage.ResourceReference; import org.apache.asterix.common.storage.StorageIOStats; import org.apache.asterix.common.transactions.ILogManager; +import org.apache.asterix.common.transactions.IRecoveryManager; import org.apache.asterix.common.transactions.LogRecord; import org.apache.asterix.common.transactions.LogType; import org.apache.asterix.common.utils.StoragePathUtil; +import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; +import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeLocalResource; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; @@ -67,22 +72,26 @@ public class 
DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeCycleComponent { private static final Logger LOGGER = LogManager.getLogger(); - private final Map<Integer, DatasetResource> datasets = new ConcurrentHashMap<>(); + protected final Map<Integer, DatasetResource> datasets = new ConcurrentHashMap<>(); private final StorageProperties storageProperties; - private final ILocalResourceRepository resourceRepository; + protected final ILocalResourceRepository resourceRepository; private final IVirtualBufferCache vbc; + protected final INCServiceContext serviceCtx; + protected final IRecoveryManager recoveryMgr; private final ILogManager logManager; private final LogRecord waitLog; - private final IDiskResourceCacheLockNotifier lockNotifier; + protected final IDiskResourceCacheLockNotifier lockNotifier; private volatile boolean stopped = false; private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider; // all LSM-trees share the same virtual buffer cache list private final List<IVirtualBufferCache> vbcs; - public DatasetLifecycleManager(StorageProperties storageProperties, ILocalResourceRepository resourceRepository, - ILogManager logManager, IVirtualBufferCache vbc, - IIndexCheckpointManagerProvider indexCheckpointManagerProvider, + public DatasetLifecycleManager(INCServiceContext serviceCtx, StorageProperties storageProperties, + ILocalResourceRepository resourceRepository, IRecoveryManager recoveryMgr, ILogManager logManager, + IVirtualBufferCache vbc, IIndexCheckpointManagerProvider indexCheckpointManagerProvider, IDiskResourceCacheLockNotifier lockNotifier) { + this.serviceCtx = serviceCtx; + this.recoveryMgr = recoveryMgr; this.logManager = logManager; this.storageProperties = storageProperties; this.resourceRepository = resourceRepository; @@ -130,7 +139,7 @@ datasetResource.register(resource, (ILSMIndex) index); } - private int getDIDfromResourcePath(String resourcePath) throws HyracksDataException { + protected int 
getDIDfromResourcePath(String resourcePath) throws HyracksDataException { LocalResource lr = resourceRepository.get(resourcePath); if (lr == null) { return -1; @@ -138,7 +147,7 @@ return ((DatasetLocalResource) lr.getResource()).getDatasetId(); } - private long getResourceIDfromResourcePath(String resourcePath) throws HyracksDataException { + protected long getResourceIDfromResourcePath(String resourcePath) throws HyracksDataException { LocalResource lr = resourceRepository.get(resourcePath); if (lr == null) { return -1; @@ -146,6 +155,14 @@ return lr.getId(); } + private DatasetLocalResource getDatasetLocalResource(String resourcePath) throws HyracksDataException { + LocalResource lr = resourceRepository.get(resourcePath); + if (lr == null) { + return null; + } + return (DatasetLocalResource) lr.getResource(); + } + @Override public synchronized void unregister(String resourcePath) throws HyracksDataException { validateDatasetLifecycleManagerState(); @@ -193,6 +210,72 @@ @Override public synchronized void open(String resourcePath) throws HyracksDataException { validateDatasetLifecycleManagerState(); + DatasetLocalResource localResource = getDatasetLocalResource(resourcePath); + if (localResource == null) { + throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath); + } + int did = getDIDfromResourcePath(resourcePath); + long resourceID = getResourceIDfromResourcePath(resourcePath); + + lockNotifier.onOpen(resourceID); + try { + DatasetResource datasetResource = datasets.get(did); + int partition = localResource.getPartition(); + if (shouldRecoverLazily(datasetResource, partition)) { + performLocalRecovery(resourcePath, datasetResource, partition); + } else { + openResource(resourcePath, false); + } + } finally { + lockNotifier.onClose(resourceID); + } + } + + private void performLocalRecovery(String resourcePath, DatasetResource datasetResource, int partition) + throws HyracksDataException { + LOGGER.debug("performing local recovery for 
dataset {} partition {}", datasetResource.getDatasetInfo(), + partition); + FileReference indexRootRef = StoragePathUtil.getIndexRootPath(serviceCtx.getIoManager(), resourcePath); + Map<Long, LocalResource> resources = resourceRepository.getResources(r -> true, List.of(indexRootRef)); + + List<ILSMIndex> indexes = new ArrayList<>(); + for (LocalResource resource : resources.values()) { + if (shouldSkipResource(resource)) { + continue; + } + + ILSMIndex index = getOrCreateIndex(resource); + boolean undoTouch = !resourcePath.equals(resource.getPath()); + openResource(resource.getPath(), undoTouch); + indexes.add(index); + } + + if (!indexes.isEmpty()) { + recoveryMgr.recoverIndexes(indexes); + } + + datasetResource.markRecovered(partition); + } + + private boolean shouldSkipResource(LocalResource resource) { + DatasetLocalResource lr = (DatasetLocalResource) resource.getResource(); + return MetadataIndexImmutableProperties.isMetadataDataset(lr.getDatasetId()) + || (lr.getResource() instanceof LSMBTreeLocalResource + && ((LSMBTreeLocalResource) lr.getResource()).isSecondaryNoIncrementalMaintenance()); + } + + private ILSMIndex getOrCreateIndex(LocalResource resource) throws HyracksDataException { + ILSMIndex index = get(resource.getPath()); + if (index == null) { + DatasetLocalResource lr = (DatasetLocalResource) resource.getResource(); + index = (ILSMIndex) lr.createInstance(serviceCtx); + register(resource.getPath(), index); + } + return index; + } + + private void openResource(String resourcePath, boolean undoTouch) throws HyracksDataException { + validateDatasetLifecycleManagerState(); int did = getDIDfromResourcePath(resourcePath); long resourceID = getResourceIDfromResourcePath(resourcePath); @@ -214,15 +297,36 @@ dsr.open(true); dsr.touch(); - - if (!iInfo.isOpen()) { - ILSMOperationTracker opTracker = iInfo.getIndex().getOperationTracker(); - synchronized (opTracker) { - iInfo.getIndex().activate(); + boolean indexTouched = false; + try { + if 
(!iInfo.isOpen()) { + ILSMOperationTracker opTracker = iInfo.getIndex().getOperationTracker(); + synchronized (opTracker) { + iInfo.getIndex().activate(); + } + iInfo.setOpen(true); } - iInfo.setOpen(true); + iInfo.touch(); + indexTouched = true; + } finally { + if (undoTouch) { + dsr.untouch(); + if (indexTouched) { + iInfo.untouch(); + } + lockNotifier.onClose(resourceID); + } } - iInfo.touch(); + } + + private boolean shouldRecoverLazily(DatasetResource resource, int partition) { + // Perform lazy recovery only if the following conditions are met: + // 1. Lazy recovery is enabled. + // 2. The resource does not belong to the Metadata dataverse. + // 3. The partition is being accessed for the first time. + return recoveryMgr.isLazyRecoveryEnabled() + && !MetadataIndexImmutableProperties.isMetadataDataset(resource.getDatasetID()) + && !resource.isRecovered(partition); } public DatasetResource getDatasetLifecycle(int did) { diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java index db9eabb..8e3081d 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java @@ -20,7 +20,9 @@ import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; @@ -48,12 +50,14 @@ private final Map<Integer, PrimaryIndexOperationTracker> datasetPrimaryOpTrackers; private final Map<Integer, ILSMComponentIdGenerator> datasetComponentIdGenerators; private final Map<Integer, IRateLimiter> datasetRateLimiters; + private final Set<Integer> recoveredPartitions; public DatasetResource(DatasetInfo 
datasetInfo) { this.datasetInfo = datasetInfo; this.datasetPrimaryOpTrackers = new HashMap<>(); this.datasetComponentIdGenerators = new HashMap<>(); this.datasetRateLimiters = new HashMap<>(); + this.recoveredPartitions = new HashSet<>(); } public boolean isRegistered() { @@ -127,6 +131,10 @@ return datasetComponentIdGenerators.get(partition); } + public boolean isRecovered(int partitionId) { + return recoveredPartitions.contains(partitionId); + } + public IRateLimiter getRateLimiter(int partition) { return datasetRateLimiters.get(partition); } @@ -139,6 +147,14 @@ datasetPrimaryOpTrackers.put(partition, opTracker); } + public void markRecovered(int partition) { + if (recoveredPartitions.contains(partition)) { + throw new IllegalStateException( + "Index has already been recovered for dataset" + getDatasetID() + "partition " + partition); + } + recoveredPartitions.add(partition); + } + public void setIdGenerator(int partition, ILSMComponentIdGenerator idGenerator) { if (datasetComponentIdGenerators.containsKey(partition)) { throw new IllegalStateException("LSMComponentIdGenerator has already been set for partition " + partition); @@ -187,5 +203,6 @@ datasetPrimaryOpTrackers.remove(partitionId); datasetComponentIdGenerators.remove(partitionId); datasetRateLimiters.remove(partitionId); + recoveredPartitions.remove(partitionId); } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java index a5f79ac..3437d42 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java @@ -20,10 +20,12 @@ import java.io.File; import java.io.IOException; +import java.util.List; import java.util.Set; import org.apache.asterix.common.exceptions.ACIDException; import 
org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; /** * Provides API for failure recovery. Failure could be at application level and @@ -128,4 +130,19 @@ */ void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush) throws HyracksDataException; + /** + * Ensures that {@code datasetPartitionIndexes} are consistent by performing component id level recovery + * + * @param datasetPartitionIndexes A list of indexes associated with a specific + * dataset partition that require recovery. + * @throws HyracksDataException If an error occurs during the recovery or rollback + * process, indicating a failure to achieve consistency. + */ + void recoverIndexes(List<ILSMIndex> datasetPartitionIndexes) throws HyracksDataException; + + /** + * determines if the indexes need to be recovered lazily at the time of their first access + * @return + */ + boolean isLazyRecoveryEnabled(); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java index 28fd27e..2fdfba4 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java @@ -139,6 +139,13 @@ return ResourceReference.of(fileAbsolutePath).getFileRelativePath().toString(); } + public static FileReference getIndexRootPath(IIOManager ioManager, String relativePath) + throws HyracksDataException { + int separatorIndex = relativePath.lastIndexOf(File.separatorChar); + String parentDirectory = relativePath.substring(0, separatorIndex); + return ioManager.resolve(parentDirectory); + } + /** * Create a file * Note: this method is not thread safe. 
It is the responsibility of the caller to ensure no path conflict when @@ -229,7 +236,17 @@ } public static boolean isRelativeParent(FileReference parent, FileReference child) { - return child.getRelativePath().startsWith(parent.getRelativePath()); + String childPath = child.getRelativePath(); + String parentPath = parent.getRelativePath(); + boolean isMatch = childPath.startsWith(parentPath); + if (isMatch) { + int parentPathLength = parentPath.length(); + if (childPath.length() == parentPathLength) { + return true; + } + return childPath.charAt(parentPathLength) == File.separatorChar; + } + return false; } public static String getNamespacePath(INamespacePathResolver nsPathResolver, Namespace namespace, int partition) { diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java index 5c7d5ac..cb4e068 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java @@ -251,6 +251,7 @@ return ioManager.resolve(fileName); } + @Override public Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, List<FileReference> roots) throws HyracksDataException { beforeReadAccess(); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/ILocalResourceRepository.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/ILocalResourceRepository.java index 491d476..46eb1d5 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/ILocalResourceRepository.java +++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/ILocalResourceRepository.java @@ -18,7 +18,12 @@ */ package org.apache.hyracks.storage.common; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; + import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; public interface ILocalResourceRepository { @@ -28,5 +33,8 @@ void delete(String name) throws HyracksDataException; + Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, List<FileReference> resourceFolderRoot) + throws HyracksDataException; + long maxId() throws HyracksDataException; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/TransientLocalResourceRepository.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/TransientLocalResourceRepository.java index 2e756ea..5f2ccc6 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/TransientLocalResourceRepository.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/TransientLocalResourceRepository.java @@ -19,9 +19,12 @@ package org.apache.hyracks.storage.common; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.function.Predicate; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; public class TransientLocalResourceRepository implements ILocalResourceRepository { @@ -55,6 +58,12 @@ } @Override + public Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, List<FileReference> roots) + throws HyracksDataException { + return Map.of(); + } + + @Override public long maxId() throws HyracksDataException { long maxResourceId = 0; -- To view, visit 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19443 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: ionic Gerrit-Change-Id: If64a7fbd9701772ffa42761d1557223a3eea95be Gerrit-Change-Number: 19443 Gerrit-PatchSet: 8 Gerrit-Owner: Ritik Raj <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Ritik Raj <[email protected]> Gerrit-MessageType: merged
