This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push: new 8e51145806 [NO ISSUE][STO] Allow concurrent modification on persisted resources 8e51145806 is described below commit 8e51145806fb54ea8536487b484dd05049462f8d Author: Murtadha Hubail <mhub...@apache.org> AuthorDate: Fri Sep 29 22:09:59 2023 +0300 [NO ISSUE][STO] Allow concurrent modification on persisted resources - user model changes: no - storage format changes: no - interface changes: no Details: - To allow an index's resources to be created on different partitions concurrently, replace the synchronization with a read/write lock in PersistentLocalResourceRepository. - Any operation that might modify the persisted files will acquire the read (shared) lock, so that modifications can proceed concurrently with each other. - Any operation that attempts to read the persisted files will acquire the write (exclusive) lock to wait for any on-going modifications. Change-Id: Id435bfc113a0b8e3e2a1f75712f0ded74ae0ee6f Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17824 Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com> --- .../PersistentLocalResourceRepository.java | 502 +++++++++++++-------- .../storage/am/common/build/IndexBuilder.java | 82 ++-- 2 files changed, 355 insertions(+), 229 deletions(-) diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java index 1eef182c06..33cbf2dc04 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java @@ -39,6 +39,7 @@ import java.util.List; import java.util.Map; import 
java.util.Optional; import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -83,7 +84,6 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito private static final Logger LOGGER = LogManager.getLogger(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final String METADATA_FILE_MASK_NAME = StorageConstants.MASK_FILE_PREFIX + StorageConstants.METADATA_FILE_NAME; private static final FilenameFilter LSM_INDEX_FILES_FILTER = @@ -94,7 +94,6 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito (dir, name) -> name.equals(StorageConstants.METADATA_FILE_NAME); private static final FilenameFilter METADATA_MASK_FILES_FILTER = (dir, name) -> name.equals(METADATA_FILE_MASK_NAME); - private static final int MAX_CACHED_RESOURCES = 1000; // Finals @@ -102,11 +101,11 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito private final Cache<String, LocalResource> resourceCache; // Mutables private boolean isReplicationEnabled = false; - private Set<String> filesToBeReplicated; private IReplicationManager replicationManager; private final List<FileReference> storageRoots; private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider; private final IPersistedResourceRegistry persistedResourceRegistry; + private final ReentrantReadWriteLock resourcesAccessLock = new ReentrantReadWriteLock(true); public PersistentLocalResourceRepository(IIOManager ioManager, IIndexCheckpointManagerProvider indexCheckpointManagerProvider, @@ -135,23 +134,29 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito } @Override - public synchronized LocalResource get(String relativePath) throws HyracksDataException { - LocalResource resource = resourceCache.getIfPresent(relativePath); - if (resource == null) { - FileReference resourceFile = 
getLocalResourceFileByName(ioManager, relativePath); - resource = readLocalResource(resourceFile); - if (resource != null) { - resourceCache.put(relativePath, resource); + public LocalResource get(String relativePath) throws HyracksDataException { + beforeReadAccess(); + try { + LocalResource resource = resourceCache.getIfPresent(relativePath); + if (resource == null) { + FileReference resourceFile = getLocalResourceFileByName(ioManager, relativePath); + resource = readLocalResource(resourceFile); + if (resource != null) { + resourceCache.put(relativePath, resource); + } } + return resource; + } finally { + afterReadAccess(); } - return resource; } @SuppressWarnings("squid:S1181") @Override public void insert(LocalResource resource) throws HyracksDataException { FileReference resourceFile; - synchronized (this) { + beforeWriteAccess(); + try { String relativePath = getFileName(resource.getPath()); resourceFile = ioManager.resolve(relativePath); if (resourceFile.getFile().exists()) { @@ -178,6 +183,8 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito ExitUtil.halt(ExitUtil.EC_ERROR_CREATING_RESOURCES); } resourceCache.put(resource.getPath(), resource); + } finally { + afterWriteAccess(); } // do not do the replication operation on the synchronized to avoid blocking other threads // on network operations @@ -216,7 +223,8 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito LOGGER.error("failed to delete resource file {} from replicas", resourceFile); } } - synchronized (this) { + beforeWriteAccess(); + try { try { if (resourceExists) { ioManager.delete(resourceFile); @@ -229,6 +237,8 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito } finally { invalidateResource(relativePath); } + } finally { + afterWriteAccess(); } } @@ -238,72 +248,101 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito return ioManager.resolve(fileName); } - public 
synchronized Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, - List<FileReference> roots) throws HyracksDataException { - Map<Long, LocalResource> resourcesMap = new HashMap<>(); - for (FileReference root : roots) { - final Collection<FileReference> files = ioManager.list(root, METADATA_FILES_FILTER); - try { - for (FileReference file : files) { - final LocalResource localResource = readLocalResource(file); - if (localResource != null && filter.test(localResource)) { - LocalResource duplicate = resourcesMap.putIfAbsent(localResource.getId(), localResource); - if (duplicate != null) { - LOGGER.warn("found duplicate resource ids {} and {}", localResource, duplicate); + public Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, List<FileReference> roots) + throws HyracksDataException { + beforeReadAccess(); + try { + Map<Long, LocalResource> resourcesMap = new HashMap<>(); + for (FileReference root : roots) { + final Collection<FileReference> files = ioManager.list(root, METADATA_FILES_FILTER); + try { + for (FileReference file : files) { + final LocalResource localResource = readLocalResource(file); + if (localResource != null && filter.test(localResource)) { + LocalResource duplicate = resourcesMap.putIfAbsent(localResource.getId(), localResource); + if (duplicate != null) { + LOGGER.warn("found duplicate resource ids {} and {}", localResource, duplicate); + } } } + } catch (IOException e) { + throw HyracksDataException.create(e); } - } catch (IOException e) { - throw HyracksDataException.create(e); } + return resourcesMap; + } finally { + afterReadAccess(); } - return resourcesMap; } - public synchronized Map<Long, LocalResource> getResources(Predicate<LocalResource> filter) - throws HyracksDataException { - return getResources(filter, storageRoots); + public Map<Long, LocalResource> getResources(Predicate<LocalResource> filter) throws HyracksDataException { + beforeReadAccess(); + try { + return getResources(filter, 
storageRoots); + } finally { + afterReadAccess(); + } } - public synchronized Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, Set<Integer> partitions) + public Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, Set<Integer> partitions) throws HyracksDataException { - List<FileReference> partitionsRoots = new ArrayList<>(); - for (Integer partition : partitions) { - partitionsRoots.add(getPartitionRoot(partition)); + beforeReadAccess(); + try { + List<FileReference> partitionsRoots = new ArrayList<>(); + for (Integer partition : partitions) { + partitionsRoots.add(getPartitionRoot(partition)); + } + return getResources(filter, partitionsRoots); + } finally { + afterReadAccess(); } - return getResources(filter, partitionsRoots); } - public synchronized void deleteInvalidIndexes(Predicate<LocalResource> filter) throws HyracksDataException { - IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation(); - for (FileReference root : storageRoots) { - final Collection<FileReference> files = ioManager.list(root, METADATA_FILES_FILTER); - try { - for (FileReference file : files) { - final LocalResource localResource = readLocalResource(file); - if (localResource != null && filter.test(localResource)) { - FileReference parent = file.getParent(); - LOGGER.warn("deleting invalid metadata index {}", parent); - bulkDelete.add(parent); + public void deleteInvalidIndexes(Predicate<LocalResource> filter) throws HyracksDataException { + beforeReadAccess(); + try { + IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation(); + for (FileReference root : storageRoots) { + final Collection<FileReference> files = ioManager.list(root, METADATA_FILES_FILTER); + try { + for (FileReference file : files) { + final LocalResource localResource = readLocalResource(file); + if (localResource != null && filter.test(localResource)) { + FileReference parent = file.getParent(); + LOGGER.warn("deleting invalid metadata index {}", parent); + 
bulkDelete.add(parent); + } } + } catch (IOException e) { + throw HyracksDataException.create(e); } - } catch (IOException e) { - throw HyracksDataException.create(e); } + ioManager.performBulkOperation(bulkDelete); + resourceCache.invalidateAll(); + } finally { + afterReadAccess(); } - ioManager.performBulkOperation(bulkDelete); - resourceCache.invalidateAll(); } public Map<Long, LocalResource> loadAndGetAllResources() throws HyracksDataException { - return getResources(p -> true); + beforeReadAccess(); + try { + return getResources(p -> true); + } finally { + afterReadAccess(); + } } @Override - public synchronized long maxId() throws HyracksDataException { - final Map<Long, LocalResource> allResources = loadAndGetAllResources(); - final Optional<Long> max = allResources.keySet().stream().max(Long::compare); - return max.isPresent() ? max.get() : 0; + public long maxId() throws HyracksDataException { + beforeReadAccess(); + try { + final Map<Long, LocalResource> allResources = loadAndGetAllResources(); + final Optional<Long> max = allResources.keySet().stream().max(Long::compare); + return max.isPresent() ? 
max.get() : 0; + } finally { + afterReadAccess(); + } } public void invalidateResource(String relativePath) { @@ -320,39 +359,42 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito } private LocalResource readLocalResource(FileReference fileRef) throws HyracksDataException { - byte[] bytes = ioManager.readAllBytes(fileRef); - if (bytes == null) { - return null; - } - + beforeReadAccess(); try { - final JsonNode jsonNode = OBJECT_MAPPER.readValue(bytes, JsonNode.class); - LocalResource resource = (LocalResource) persistedResourceRegistry.deserialize(jsonNode); - if (resource.getVersion() == ITreeIndexFrame.Constants.VERSION) { - return resource; - } else { - throw new AsterixException("Storage version mismatch."); + byte[] bytes = ioManager.readAllBytes(fileRef); + if (bytes == null) { + return null; } - } catch (Exception e) { - throw HyracksDataException.create(e); + try { + final JsonNode jsonNode = OBJECT_MAPPER.readValue(bytes, JsonNode.class); + LocalResource resource = (LocalResource) persistedResourceRegistry.deserialize(jsonNode); + if (resource.getVersion() == ITreeIndexFrame.Constants.VERSION) { + return resource; + } else { + throw new AsterixException("Storage version mismatch."); + } + } catch (Exception e) { + throw HyracksDataException.create(e); + } + } finally { + afterReadAccess(); } } - public synchronized void setReplicationManager(IReplicationManager replicationManager) { - this.replicationManager = replicationManager; - isReplicationEnabled = replicationManager.isReplicationEnabled(); - - if (isReplicationEnabled) { - filesToBeReplicated = new HashSet<>(); + public void setReplicationManager(IReplicationManager replicationManager) { + beforeWriteAccess(); + try { + this.replicationManager = replicationManager; + isReplicationEnabled = replicationManager.isReplicationEnabled(); + } finally { + afterWriteAccess(); } } private void createReplicationJob(ReplicationOperation operation, FileReference fileRef) throws 
HyracksDataException { - filesToBeReplicated.clear(); - filesToBeReplicated.add(fileRef.getAbsolutePath()); ReplicationJob job = new ReplicationJob(ReplicationJobType.METADATA, operation, ReplicationExecutionType.SYNC, - filesToBeReplicated); + Set.of(fileRef.getAbsolutePath())); try { replicationManager.submitJob(job); } catch (IOException e) { @@ -363,26 +405,41 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito /** * Deletes physical files of all data verses. */ - public synchronized void deleteStorageData() throws HyracksDataException { - IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation(); - for (FileReference root : storageRoots) { - bulkDelete.add(root); + public void deleteStorageData() throws HyracksDataException { + beforeWriteAccess(); + try { + IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation(); + for (FileReference root : storageRoots) { + bulkDelete.add(root); + } + ioManager.performBulkOperation(bulkDelete); + createStorageRoots(); + } finally { + afterWriteAccess(); } - ioManager.performBulkOperation(bulkDelete); - createStorageRoots(); } - public synchronized Set<Integer> getAllPartitions() throws HyracksDataException { - return loadAndGetAllResources().values().stream().map(LocalResource::getResource) - .map(DatasetLocalResource.class::cast).map(DatasetLocalResource::getPartition) - .collect(Collectors.toSet()); + public Set<Integer> getAllPartitions() throws HyracksDataException { + beforeReadAccess(); + try { + return loadAndGetAllResources().values().stream().map(LocalResource::getResource) + .map(DatasetLocalResource.class::cast).map(DatasetLocalResource::getPartition) + .collect(Collectors.toSet()); + } finally { + afterReadAccess(); + } } - public synchronized Optional<DatasetResourceReference> getLocalResourceReference(String absoluteFilePath) + public Optional<DatasetResourceReference> getLocalResourceReference(String absoluteFilePath) throws HyracksDataException { - final 
String localResourcePath = StoragePathUtil.getIndexFileRelativePath(absoluteFilePath); - final LocalResource lr = get(localResourcePath); - return lr != null ? Optional.of(DatasetResourceReference.of(lr)) : Optional.empty(); + beforeReadAccess(); + try { + final String localResourcePath = StoragePathUtil.getIndexFileRelativePath(absoluteFilePath); + final LocalResource lr = get(localResourcePath); + return lr != null ? Optional.of(DatasetResourceReference.of(lr)) : Optional.empty(); + } finally { + afterReadAccess(); + } } /** @@ -393,67 +450,92 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito * @return The set of indexes files * @throws HyracksDataException */ - public synchronized Set<FileReference> getPartitionIndexes(int partition) throws HyracksDataException { - FileReference partitionRoot = getPartitionRoot(partition); - final Map<Long, LocalResource> partitionResourcesMap = getResources(resource -> { - DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource(); - return dsResource.getPartition() == partition; - }, Collections.singletonList(partitionRoot)); - Set<FileReference> indexes = new HashSet<>(); - for (LocalResource localResource : partitionResourcesMap.values()) { - indexes.add(ioManager.resolve(localResource.getPath())); + public Set<FileReference> getPartitionIndexes(int partition) throws HyracksDataException { + beforeReadAccess(); + try { + FileReference partitionRoot = getPartitionRoot(partition); + final Map<Long, LocalResource> partitionResourcesMap = getResources(resource -> { + DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource(); + return dsResource.getPartition() == partition; + }, Collections.singletonList(partitionRoot)); + Set<FileReference> indexes = new HashSet<>(); + for (LocalResource localResource : partitionResourcesMap.values()) { + indexes.add(ioManager.resolve(localResource.getPath())); + } + return indexes; + } finally { + afterReadAccess(); } 
- return indexes; } - public synchronized Map<Long, LocalResource> getPartitionResources(int partition) throws HyracksDataException { - return getResources(r -> true, Collections.singleton(partition)); + public Map<Long, LocalResource> getPartitionResources(int partition) throws HyracksDataException { + beforeReadAccess(); + try { + return getResources(r -> true, Collections.singleton(partition)); + } finally { + afterReadAccess(); + } } - public synchronized Map<String, Long> getPartitionReplicatedResources(int partition, IReplicationStrategy strategy) + public Map<String, Long> getPartitionReplicatedResources(int partition, IReplicationStrategy strategy) throws HyracksDataException { - final Map<String, Long> partitionReplicatedResources = new HashMap<>(); - final Map<Long, LocalResource> partitionResources = getPartitionResources(partition); - for (LocalResource lr : partitionResources.values()) { - DatasetLocalResource datasetLocalResource = (DatasetLocalResource) lr.getResource(); - if (strategy.isMatch(datasetLocalResource.getDatasetId())) { - DatasetResourceReference drr = DatasetResourceReference.of(lr); - partitionReplicatedResources.put(drr.getFileRelativePath().toString(), lr.getId()); + beforeReadAccess(); + try { + final Map<String, Long> partitionReplicatedResources = new HashMap<>(); + final Map<Long, LocalResource> partitionResources = getPartitionResources(partition); + for (LocalResource lr : partitionResources.values()) { + DatasetLocalResource datasetLocalResource = (DatasetLocalResource) lr.getResource(); + if (strategy.isMatch(datasetLocalResource.getDatasetId())) { + DatasetResourceReference drr = DatasetResourceReference.of(lr); + partitionReplicatedResources.put(drr.getFileRelativePath().toString(), lr.getId()); + } } + return partitionReplicatedResources; + } finally { + afterReadAccess(); } - return partitionReplicatedResources; } - public synchronized List<String> getPartitionReplicatedFiles(int partition, IReplicationStrategy strategy) 
+ public List<String> getPartitionReplicatedFiles(int partition, IReplicationStrategy strategy) throws HyracksDataException { - final List<String> partitionReplicatedFiles = new ArrayList<>(); - final Set<FileReference> replicatedIndexes = new HashSet<>(); - final Map<Long, LocalResource> partitionResources = getPartitionResources(partition); - for (LocalResource lr : partitionResources.values()) { - DatasetLocalResource datasetLocalResource = (DatasetLocalResource) lr.getResource(); - if (strategy.isMatch(datasetLocalResource.getDatasetId())) { - replicatedIndexes.add(ioManager.resolve(lr.getPath())); + beforeReadAccess(); + try { + final List<String> partitionReplicatedFiles = new ArrayList<>(); + final Set<FileReference> replicatedIndexes = new HashSet<>(); + final Map<Long, LocalResource> partitionResources = getPartitionResources(partition); + for (LocalResource lr : partitionResources.values()) { + DatasetLocalResource datasetLocalResource = (DatasetLocalResource) lr.getResource(); + if (strategy.isMatch(datasetLocalResource.getDatasetId())) { + replicatedIndexes.add(ioManager.resolve(lr.getPath())); + } } + for (FileReference indexDir : replicatedIndexes) { + partitionReplicatedFiles.addAll(getIndexFiles(indexDir)); + } + return partitionReplicatedFiles; + } finally { + afterReadAccess(); } - for (FileReference indexDir : replicatedIndexes) { - partitionReplicatedFiles.addAll(getIndexFiles(indexDir)); - } - return partitionReplicatedFiles; } - public synchronized long getReplicatedIndexesMaxComponentId(int partition, IReplicationStrategy strategy) + public long getReplicatedIndexesMaxComponentId(int partition, IReplicationStrategy strategy) throws HyracksDataException { - long maxComponentId = LSMComponentId.MIN_VALID_COMPONENT_ID; - final Map<Long, LocalResource> partitionResources = getPartitionResources(partition); - for (LocalResource lr : partitionResources.values()) { - DatasetLocalResource datasetLocalResource = (DatasetLocalResource) 
lr.getResource(); - if (strategy.isMatch(datasetLocalResource.getDatasetId())) { - final IIndexCheckpointManager indexCheckpointManager = - indexCheckpointManagerProvider.get(DatasetResourceReference.of(lr)); - maxComponentId = Math.max(maxComponentId, indexCheckpointManager.getLatest().getLastComponentId()); + beforeReadAccess(); + try { + long maxComponentId = LSMComponentId.MIN_VALID_COMPONENT_ID; + final Map<Long, LocalResource> partitionResources = getPartitionResources(partition); + for (LocalResource lr : partitionResources.values()) { + DatasetLocalResource datasetLocalResource = (DatasetLocalResource) lr.getResource(); + if (strategy.isMatch(datasetLocalResource.getDatasetId())) { + final IIndexCheckpointManager indexCheckpointManager = + indexCheckpointManagerProvider.get(DatasetResourceReference.of(lr)); + maxComponentId = Math.max(maxComponentId, indexCheckpointManager.getLatest().getLastComponentId()); + } } + return maxComponentId; + } finally { + afterReadAccess(); } - return maxComponentId; } private List<String> getIndexFiles(FileReference indexDir) throws HyracksDataException { @@ -469,44 +551,59 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito } } - public synchronized void cleanup(int partition) throws HyracksDataException { - final Set<FileReference> partitionIndexes = getPartitionIndexes(partition); + public void cleanup(int partition) throws HyracksDataException { + beforeReadAccess(); try { - for (FileReference index : partitionIndexes) { - deleteIndexMaskedFiles(index); - if (isValidIndex(index)) { - deleteIndexInvalidComponents(index); + final Set<FileReference> partitionIndexes = getPartitionIndexes(partition); + try { + for (FileReference index : partitionIndexes) { + deleteIndexMaskedFiles(index); + if (isValidIndex(index)) { + deleteIndexInvalidComponents(index); + } } + } catch (IOException | ParseException e) { + throw HyracksDataException.create(e); } - } catch (IOException | ParseException e) { - 
throw HyracksDataException.create(e); + } finally { + afterReadAccess(); } } public List<ResourceStorageStats> getStorageStats() throws HyracksDataException { - final List<DatasetResourceReference> allResources = loadAndGetAllResources().values().stream() - .map(DatasetResourceReference::of).collect(Collectors.toList()); - final List<ResourceStorageStats> resourcesStats = new ArrayList<>(); - for (DatasetResourceReference res : allResources) { - final ResourceStorageStats resourceStats = getResourceStats(res); - if (resourceStats != null) { - resourcesStats.add(resourceStats); + beforeReadAccess(); + try { + final List<DatasetResourceReference> allResources = loadAndGetAllResources().values().stream() + .map(DatasetResourceReference::of).collect(Collectors.toList()); + final List<ResourceStorageStats> resourcesStats = new ArrayList<>(); + for (DatasetResourceReference res : allResources) { + final ResourceStorageStats resourceStats = getResourceStats(res); + if (resourceStats != null) { + resourcesStats.add(resourceStats); + } } + return resourcesStats; + } finally { + afterReadAccess(); } - return resourcesStats; } - public synchronized void deleteCorruptedResources() throws HyracksDataException { - IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation(); - for (FileReference root : storageRoots) { - final Collection<FileReference> metadataMaskFiles = ioManager.list(root, METADATA_MASK_FILES_FILTER); - for (FileReference metadataMaskFile : metadataMaskFiles) { - final FileReference resourceFile = metadataMaskFile.getParent().getChild(METADATA_FILE_NAME); - bulkDelete.add(resourceFile); - bulkDelete.add(metadataMaskFile); + public void deleteCorruptedResources() throws HyracksDataException { + beforeWriteAccess(); + try { + IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation(); + for (FileReference root : storageRoots) { + final Collection<FileReference> metadataMaskFiles = ioManager.list(root, METADATA_MASK_FILES_FILTER); + for 
(FileReference metadataMaskFile : metadataMaskFiles) { + final FileReference resourceFile = metadataMaskFile.getParent().getChild(METADATA_FILE_NAME); + bulkDelete.add(resourceFile); + bulkDelete.add(metadataMaskFile); + } } + ioManager.performBulkOperation(bulkDelete); + } finally { + afterWriteAccess(); } - ioManager.performBulkOperation(bulkDelete); } private void deleteIndexMaskedFiles(FileReference index) throws IOException { @@ -601,20 +698,25 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito public long getDatasetSize(DatasetCopyIdentifier datasetIdentifier, Set<Integer> nodePartitions) throws HyracksDataException { - long totalSize = 0; - final Map<Long, LocalResource> dataverse = getResources(lr -> { - final ResourceReference resourceReference = ResourceReference.ofIndex(lr.getPath()); - return datasetIdentifier.isMatch(resourceReference); - }, nodePartitions); - final List<DatasetResourceReference> allResources = - dataverse.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList()); - for (DatasetResourceReference res : allResources) { - final ResourceStorageStats resourceStats = getResourceStats(res); - if (resourceStats != null) { - totalSize += resourceStats.getTotalSize(); + beforeReadAccess(); + try { + long totalSize = 0; + final Map<Long, LocalResource> dataverse = getResources(lr -> { + final ResourceReference resourceReference = ResourceReference.ofIndex(lr.getPath()); + return datasetIdentifier.isMatch(resourceReference); + }, nodePartitions); + final List<DatasetResourceReference> allResources = + dataverse.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList()); + for (DatasetResourceReference res : allResources) { + final ResourceStorageStats resourceStats = getResourceStats(res); + if (resourceStats != null) { + totalSize += resourceStats.getTotalSize(); + } } + return totalSize; + } finally { + afterReadAccess(); } - return totalSize; } private void 
createResourceFileMask(FileReference resourceFile) throws HyracksDataException { @@ -644,13 +746,18 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito return COMPONENT_FILES_FILTER.accept(indexDir.getFile(), fileName); } - public synchronized List<FileReference> getOnDiskPartitions() { - List<FileReference> onDiskPartitions = new ArrayList<>(); - for (FileReference root : storageRoots) { - onDiskPartitions.addAll(IoUtil.getMatchingChildren(root, (dir, name) -> dir != null && dir.isDirectory() - && name.startsWith(StorageConstants.PARTITION_DIR_PREFIX))); + public List<FileReference> getOnDiskPartitions() { + beforeReadAccess(); + try { + List<FileReference> onDiskPartitions = new ArrayList<>(); + for (FileReference root : storageRoots) { + onDiskPartitions.addAll(IoUtil.getMatchingChildren(root, (dir, name) -> dir != null && dir.isDirectory() + && name.startsWith(StorageConstants.PARTITION_DIR_PREFIX))); + } + return onDiskPartitions; + } finally { + afterReadAccess(); } - return onDiskPartitions; } public FileReference getPartitionRoot(int partition) throws HyracksDataException { @@ -660,16 +767,37 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito } public void deletePartition(int partitionId) throws HyracksDataException { - Collection<FileReference> onDiskPartitions = getOnDiskPartitions(); - IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation(); - for (FileReference onDiskPartition : onDiskPartitions) { - int partitionNum = StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath()); - if (partitionNum == partitionId) { - LOGGER.warn("deleting partition {}", partitionNum); - bulkDelete.add(onDiskPartition); - break; + beforeReadAccess(); + try { + Collection<FileReference> onDiskPartitions = getOnDiskPartitions(); + IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation(); + for (FileReference onDiskPartition : onDiskPartitions) { + int partitionNum 
= StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath()); + if (partitionNum == partitionId) { + LOGGER.warn("deleting partition {}", partitionNum); + bulkDelete.add(onDiskPartition); + break; + } } + ioManager.performBulkOperation(bulkDelete); + } finally { + afterReadAccess(); } - ioManager.performBulkOperation(bulkDelete); + } + + private void beforeWriteAccess() { + resourcesAccessLock.readLock().lock(); + } + + private void afterWriteAccess() { + resourcesAccessLock.readLock().unlock(); + } + + private void beforeReadAccess() { + resourcesAccessLock.writeLock().lock(); + } + + private void afterReadAccess() { + resourcesAccessLock.writeLock().unlock(); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java index 1c0d7b49d5..8a67c71d09 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java @@ -68,49 +68,47 @@ public class IndexBuilder implements IIndexBuilder { @Override public void build() throws HyracksDataException { IResourceLifecycleManager<IIndex> lcManager = storageManager.getLifecycleManager(ctx); - synchronized (lcManager) { - // The previous resource Id needs to be removed since calling IIndex.create() may possibly destroy any - // physical artifact that the LocalResourceRepository is managing (e.g. a file containing the resource Id). - // Once the index has been created, a new resource Id can be generated. 
- ILocalResourceRepository localResourceRepository = storageManager.getLocalResourceRepository(ctx); - LocalResource lr = localResourceRepository.get(resourceRelPath); - long resourceId = lr == null ? -1 : lr.getId(); - if (resourceId != -1) { - localResourceRepository.delete(resourceRelPath); - } - resourceId = resourceIdFactory.createId(); - IResource resource = localResourceFactory.createResource(resourceRef); - lr = new LocalResource(resourceId, ITreeIndexFrame.Constants.VERSION, durable, resource); - IIndex index = lcManager.get(resourceRelPath); - if (index != null) { - //how is this right?????????? <needs to be fixed> - //The reason for this is to handle many cases such as: - //1. Crash while delete index is running (we don't do global cleanup on restart) - //2. Node leaves and then join with old data - LOGGER.log(Level.WARN, "Removing existing index on index create for the index: " + resourceRelPath); - lcManager.unregister(resourceRelPath); - index.destroy(); - } else { - final FileReference resolvedResourceRef = ctx.getIoManager().resolve(resourceRelPath); - if (resolvedResourceRef.getFile().exists()) { - // Index is not registered but the index file exists - // This is another big problem that we need to disallow soon - // We can only disallow this if we have a global cleanup after crash - // on reboot - LOGGER.warn( - "Deleting {} on index create. The index is not registered but the file exists in the filesystem", - resolvedResourceRef); - ctx.getIoManager().delete(resolvedResourceRef); - } - index = resource.createInstance(ctx); - } - index.create(); - try { - localResourceRepository.insert(lr); - } catch (IOException e) { - throw HyracksDataException.create(e); + // The previous resource Id needs to be removed since calling IIndex.create() may possibly destroy any + // physical artifact that the LocalResourceRepository is managing (e.g. a file containing the resource Id). + // Once the index has been created, a new resource Id can be generated. 
+ ILocalResourceRepository localResourceRepository = storageManager.getLocalResourceRepository(ctx); + LocalResource lr = localResourceRepository.get(resourceRelPath); + long resourceId = lr == null ? -1 : lr.getId(); + if (resourceId != -1) { + localResourceRepository.delete(resourceRelPath); + } + resourceId = resourceIdFactory.createId(); + IResource resource = localResourceFactory.createResource(resourceRef); + lr = new LocalResource(resourceId, ITreeIndexFrame.Constants.VERSION, durable, resource); + IIndex index = lcManager.get(resourceRelPath); + if (index != null) { + //how is this right?????????? <needs to be fixed> + //The reason for this is to handle many cases such as: + //1. Crash while delete index is running (we don't do global cleanup on restart) + //2. Node leaves and then join with old data + LOGGER.log(Level.WARN, "Removing existing index on index create for the index: " + resourceRelPath); + lcManager.unregister(resourceRelPath); + index.destroy(); + } else { + final FileReference resolvedResourceRef = ctx.getIoManager().resolve(resourceRelPath); + if (resolvedResourceRef.getFile().exists()) { + // Index is not registered but the index file exists + // This is another big problem that we need to disallow soon + // We can only disallow this if we have a global cleanup after crash + // on reboot + LOGGER.warn( + "Deleting {} on index create. The index is not registered but the file exists in the filesystem", + resolvedResourceRef); + ctx.getIoManager().delete(resolvedResourceRef); } - lcManager.register(resourceRelPath, index); + index = resource.createInstance(ctx); + } + index.create(); + try { + localResourceRepository.insert(lr); + } catch (IOException e) { + throw HyracksDataException.create(e); } + lcManager.register(resourceRelPath, index); } }