This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e51145806 [NO ISSUE][STO] Allow concurrent modification on persisted 
resources
8e51145806 is described below

commit 8e51145806fb54ea8536487b484dd05049462f8d
Author: Murtadha Hubail <mhub...@apache.org>
AuthorDate: Fri Sep 29 22:09:59 2023 +0300

    [NO ISSUE][STO] Allow concurrent modification on persisted resources
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    
    - To allow index's resources to be created on different partitions
      concurrently, replace the synchronization by read/write lock in
      PersistentLocalResourceRepository.
    - Any operation that might modify the persisted files will acquire
      a read lock.
    - Any operation that attempts to read the persisted files will acquire
      a write lock to wait for any on-going modifications.
    
    Change-Id: Id435bfc113a0b8e3e2a1f75712f0ded74ae0ee6f
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17824
    Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com>
---
 .../PersistentLocalResourceRepository.java         | 502 +++++++++++++--------
 .../storage/am/common/build/IndexBuilder.java      |  82 ++--
 2 files changed, 355 insertions(+), 229 deletions(-)

diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 1eef182c06..33cbf2dc04 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -39,6 +39,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
@@ -83,7 +84,6 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
 
     private static final Logger LOGGER = LogManager.getLogger();
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
     private static final String METADATA_FILE_MASK_NAME =
             StorageConstants.MASK_FILE_PREFIX + 
StorageConstants.METADATA_FILE_NAME;
     private static final FilenameFilter LSM_INDEX_FILES_FILTER =
@@ -94,7 +94,6 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
             (dir, name) -> name.equals(StorageConstants.METADATA_FILE_NAME);
     private static final FilenameFilter METADATA_MASK_FILES_FILTER =
             (dir, name) -> name.equals(METADATA_FILE_MASK_NAME);
-
     private static final int MAX_CACHED_RESOURCES = 1000;
 
     // Finals
@@ -102,11 +101,11 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
     private final Cache<String, LocalResource> resourceCache;
     // Mutables
     private boolean isReplicationEnabled = false;
-    private Set<String> filesToBeReplicated;
     private IReplicationManager replicationManager;
     private final List<FileReference> storageRoots;
     private final IIndexCheckpointManagerProvider 
indexCheckpointManagerProvider;
     private final IPersistedResourceRegistry persistedResourceRegistry;
+    private final ReentrantReadWriteLock resourcesAccessLock = new 
ReentrantReadWriteLock(true);
 
     public PersistentLocalResourceRepository(IIOManager ioManager,
             IIndexCheckpointManagerProvider indexCheckpointManagerProvider,
@@ -135,23 +134,29 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
     }
 
     @Override
-    public synchronized LocalResource get(String relativePath) throws 
HyracksDataException {
-        LocalResource resource = resourceCache.getIfPresent(relativePath);
-        if (resource == null) {
-            FileReference resourceFile = getLocalResourceFileByName(ioManager, 
relativePath);
-            resource = readLocalResource(resourceFile);
-            if (resource != null) {
-                resourceCache.put(relativePath, resource);
+    public LocalResource get(String relativePath) throws HyracksDataException {
+        beforeReadAccess();
+        try {
+            LocalResource resource = resourceCache.getIfPresent(relativePath);
+            if (resource == null) {
+                FileReference resourceFile = 
getLocalResourceFileByName(ioManager, relativePath);
+                resource = readLocalResource(resourceFile);
+                if (resource != null) {
+                    resourceCache.put(relativePath, resource);
+                }
             }
+            return resource;
+        } finally {
+            afterReadAccess();
         }
-        return resource;
     }
 
     @SuppressWarnings("squid:S1181")
     @Override
     public void insert(LocalResource resource) throws HyracksDataException {
         FileReference resourceFile;
-        synchronized (this) {
+        beforeWriteAccess();
+        try {
             String relativePath = getFileName(resource.getPath());
             resourceFile = ioManager.resolve(relativePath);
             if (resourceFile.getFile().exists()) {
@@ -178,6 +183,8 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
                 ExitUtil.halt(ExitUtil.EC_ERROR_CREATING_RESOURCES);
             }
             resourceCache.put(resource.getPath(), resource);
+        } finally {
+            afterWriteAccess();
         }
         // do not do the replication operation on the synchronized to avoid 
blocking other threads
         // on network operations
@@ -216,7 +223,8 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
                 LOGGER.error("failed to delete resource file {} from 
replicas", resourceFile);
             }
         }
-        synchronized (this) {
+        beforeWriteAccess();
+        try {
             try {
                 if (resourceExists) {
                     ioManager.delete(resourceFile);
@@ -229,6 +237,8 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
             } finally {
                 invalidateResource(relativePath);
             }
+        } finally {
+            afterWriteAccess();
         }
     }
 
@@ -238,72 +248,101 @@ public class PersistentLocalResourceRepository 
implements ILocalResourceReposito
         return ioManager.resolve(fileName);
     }
 
-    public synchronized Map<Long, LocalResource> 
getResources(Predicate<LocalResource> filter,
-            List<FileReference> roots) throws HyracksDataException {
-        Map<Long, LocalResource> resourcesMap = new HashMap<>();
-        for (FileReference root : roots) {
-            final Collection<FileReference> files = ioManager.list(root, 
METADATA_FILES_FILTER);
-            try {
-                for (FileReference file : files) {
-                    final LocalResource localResource = 
readLocalResource(file);
-                    if (localResource != null && filter.test(localResource)) {
-                        LocalResource duplicate = 
resourcesMap.putIfAbsent(localResource.getId(), localResource);
-                        if (duplicate != null) {
-                            LOGGER.warn("found duplicate resource ids {} and 
{}", localResource, duplicate);
+    public Map<Long, LocalResource> getResources(Predicate<LocalResource> 
filter, List<FileReference> roots)
+            throws HyracksDataException {
+        beforeReadAccess();
+        try {
+            Map<Long, LocalResource> resourcesMap = new HashMap<>();
+            for (FileReference root : roots) {
+                final Collection<FileReference> files = ioManager.list(root, 
METADATA_FILES_FILTER);
+                try {
+                    for (FileReference file : files) {
+                        final LocalResource localResource = 
readLocalResource(file);
+                        if (localResource != null && 
filter.test(localResource)) {
+                            LocalResource duplicate = 
resourcesMap.putIfAbsent(localResource.getId(), localResource);
+                            if (duplicate != null) {
+                                LOGGER.warn("found duplicate resource ids {} 
and {}", localResource, duplicate);
+                            }
                         }
                     }
+                } catch (IOException e) {
+                    throw HyracksDataException.create(e);
                 }
-            } catch (IOException e) {
-                throw HyracksDataException.create(e);
             }
+            return resourcesMap;
+        } finally {
+            afterReadAccess();
         }
-        return resourcesMap;
     }
 
-    public synchronized Map<Long, LocalResource> 
getResources(Predicate<LocalResource> filter)
-            throws HyracksDataException {
-        return getResources(filter, storageRoots);
+    public Map<Long, LocalResource> getResources(Predicate<LocalResource> 
filter) throws HyracksDataException {
+        beforeReadAccess();
+        try {
+            return getResources(filter, storageRoots);
+        } finally {
+            afterReadAccess();
+        }
     }
 
-    public synchronized Map<Long, LocalResource> 
getResources(Predicate<LocalResource> filter, Set<Integer> partitions)
+    public Map<Long, LocalResource> getResources(Predicate<LocalResource> 
filter, Set<Integer> partitions)
             throws HyracksDataException {
-        List<FileReference> partitionsRoots = new ArrayList<>();
-        for (Integer partition : partitions) {
-            partitionsRoots.add(getPartitionRoot(partition));
+        beforeReadAccess();
+        try {
+            List<FileReference> partitionsRoots = new ArrayList<>();
+            for (Integer partition : partitions) {
+                partitionsRoots.add(getPartitionRoot(partition));
+            }
+            return getResources(filter, partitionsRoots);
+        } finally {
+            afterReadAccess();
         }
-        return getResources(filter, partitionsRoots);
     }
 
-    public synchronized void deleteInvalidIndexes(Predicate<LocalResource> 
filter) throws HyracksDataException {
-        IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation();
-        for (FileReference root : storageRoots) {
-            final Collection<FileReference> files = ioManager.list(root, 
METADATA_FILES_FILTER);
-            try {
-                for (FileReference file : files) {
-                    final LocalResource localResource = 
readLocalResource(file);
-                    if (localResource != null && filter.test(localResource)) {
-                        FileReference parent = file.getParent();
-                        LOGGER.warn("deleting invalid metadata index {}", 
parent);
-                        bulkDelete.add(parent);
+    public void deleteInvalidIndexes(Predicate<LocalResource> filter) throws 
HyracksDataException {
+        beforeReadAccess();
+        try {
+            IIOBulkOperation bulkDelete = 
ioManager.createDeleteBulkOperation();
+            for (FileReference root : storageRoots) {
+                final Collection<FileReference> files = ioManager.list(root, 
METADATA_FILES_FILTER);
+                try {
+                    for (FileReference file : files) {
+                        final LocalResource localResource = 
readLocalResource(file);
+                        if (localResource != null && 
filter.test(localResource)) {
+                            FileReference parent = file.getParent();
+                            LOGGER.warn("deleting invalid metadata index {}", 
parent);
+                            bulkDelete.add(parent);
+                        }
                     }
+                } catch (IOException e) {
+                    throw HyracksDataException.create(e);
                 }
-            } catch (IOException e) {
-                throw HyracksDataException.create(e);
             }
+            ioManager.performBulkOperation(bulkDelete);
+            resourceCache.invalidateAll();
+        } finally {
+            afterReadAccess();
         }
-        ioManager.performBulkOperation(bulkDelete);
-        resourceCache.invalidateAll();
     }
 
     public Map<Long, LocalResource> loadAndGetAllResources() throws 
HyracksDataException {
-        return getResources(p -> true);
+        beforeReadAccess();
+        try {
+            return getResources(p -> true);
+        } finally {
+            afterReadAccess();
+        }
     }
 
     @Override
-    public synchronized long maxId() throws HyracksDataException {
-        final Map<Long, LocalResource> allResources = loadAndGetAllResources();
-        final Optional<Long> max = 
allResources.keySet().stream().max(Long::compare);
-        return max.isPresent() ? max.get() : 0;
+    public long maxId() throws HyracksDataException {
+        beforeReadAccess();
+        try {
+            final Map<Long, LocalResource> allResources = 
loadAndGetAllResources();
+            final Optional<Long> max = 
allResources.keySet().stream().max(Long::compare);
+            return max.isPresent() ? max.get() : 0;
+        } finally {
+            afterReadAccess();
+        }
     }
 
     public void invalidateResource(String relativePath) {
@@ -320,39 +359,42 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
     }
 
     private LocalResource readLocalResource(FileReference fileRef) throws 
HyracksDataException {
-        byte[] bytes = ioManager.readAllBytes(fileRef);
-        if (bytes == null) {
-            return null;
-        }
-
+        beforeReadAccess();
         try {
-            final JsonNode jsonNode = OBJECT_MAPPER.readValue(bytes, 
JsonNode.class);
-            LocalResource resource = (LocalResource) 
persistedResourceRegistry.deserialize(jsonNode);
-            if (resource.getVersion() == ITreeIndexFrame.Constants.VERSION) {
-                return resource;
-            } else {
-                throw new AsterixException("Storage version mismatch.");
+            byte[] bytes = ioManager.readAllBytes(fileRef);
+            if (bytes == null) {
+                return null;
             }
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
+            try {
+                final JsonNode jsonNode = OBJECT_MAPPER.readValue(bytes, 
JsonNode.class);
+                LocalResource resource = (LocalResource) 
persistedResourceRegistry.deserialize(jsonNode);
+                if (resource.getVersion() == 
ITreeIndexFrame.Constants.VERSION) {
+                    return resource;
+                } else {
+                    throw new AsterixException("Storage version mismatch.");
+                }
+            } catch (Exception e) {
+                throw HyracksDataException.create(e);
+            }
+        } finally {
+            afterReadAccess();
         }
     }
 
-    public synchronized void setReplicationManager(IReplicationManager 
replicationManager) {
-        this.replicationManager = replicationManager;
-        isReplicationEnabled = replicationManager.isReplicationEnabled();
-
-        if (isReplicationEnabled) {
-            filesToBeReplicated = new HashSet<>();
+    public void setReplicationManager(IReplicationManager replicationManager) {
+        beforeWriteAccess();
+        try {
+            this.replicationManager = replicationManager;
+            isReplicationEnabled = replicationManager.isReplicationEnabled();
+        } finally {
+            afterWriteAccess();
         }
     }
 
     private void createReplicationJob(ReplicationOperation operation, 
FileReference fileRef)
             throws HyracksDataException {
-        filesToBeReplicated.clear();
-        filesToBeReplicated.add(fileRef.getAbsolutePath());
         ReplicationJob job = new ReplicationJob(ReplicationJobType.METADATA, 
operation, ReplicationExecutionType.SYNC,
-                filesToBeReplicated);
+                Set.of(fileRef.getAbsolutePath()));
         try {
             replicationManager.submitJob(job);
         } catch (IOException e) {
@@ -363,26 +405,41 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
     /**
      * Deletes physical files of all data verses.
      */
-    public synchronized void deleteStorageData() throws HyracksDataException {
-        IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation();
-        for (FileReference root : storageRoots) {
-            bulkDelete.add(root);
+    public void deleteStorageData() throws HyracksDataException {
+        beforeWriteAccess();
+        try {
+            IIOBulkOperation bulkDelete = 
ioManager.createDeleteBulkOperation();
+            for (FileReference root : storageRoots) {
+                bulkDelete.add(root);
+            }
+            ioManager.performBulkOperation(bulkDelete);
+            createStorageRoots();
+        } finally {
+            afterWriteAccess();
         }
-        ioManager.performBulkOperation(bulkDelete);
-        createStorageRoots();
     }
 
-    public synchronized Set<Integer> getAllPartitions() throws 
HyracksDataException {
-        return 
loadAndGetAllResources().values().stream().map(LocalResource::getResource)
-                
.map(DatasetLocalResource.class::cast).map(DatasetLocalResource::getPartition)
-                .collect(Collectors.toSet());
+    public Set<Integer> getAllPartitions() throws HyracksDataException {
+        beforeReadAccess();
+        try {
+            return 
loadAndGetAllResources().values().stream().map(LocalResource::getResource)
+                    
.map(DatasetLocalResource.class::cast).map(DatasetLocalResource::getPartition)
+                    .collect(Collectors.toSet());
+        } finally {
+            afterReadAccess();
+        }
     }
 
-    public synchronized Optional<DatasetResourceReference> 
getLocalResourceReference(String absoluteFilePath)
+    public Optional<DatasetResourceReference> getLocalResourceReference(String 
absoluteFilePath)
             throws HyracksDataException {
-        final String localResourcePath = 
StoragePathUtil.getIndexFileRelativePath(absoluteFilePath);
-        final LocalResource lr = get(localResourcePath);
-        return lr != null ? Optional.of(DatasetResourceReference.of(lr)) : 
Optional.empty();
+        beforeReadAccess();
+        try {
+            final String localResourcePath = 
StoragePathUtil.getIndexFileRelativePath(absoluteFilePath);
+            final LocalResource lr = get(localResourcePath);
+            return lr != null ? Optional.of(DatasetResourceReference.of(lr)) : 
Optional.empty();
+        } finally {
+            afterReadAccess();
+        }
     }
 
     /**
@@ -393,67 +450,92 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
      * @return The set of indexes files
      * @throws HyracksDataException
      */
-    public synchronized Set<FileReference> getPartitionIndexes(int partition) 
throws HyracksDataException {
-        FileReference partitionRoot = getPartitionRoot(partition);
-        final Map<Long, LocalResource> partitionResourcesMap = 
getResources(resource -> {
-            DatasetLocalResource dsResource = (DatasetLocalResource) 
resource.getResource();
-            return dsResource.getPartition() == partition;
-        }, Collections.singletonList(partitionRoot));
-        Set<FileReference> indexes = new HashSet<>();
-        for (LocalResource localResource : partitionResourcesMap.values()) {
-            indexes.add(ioManager.resolve(localResource.getPath()));
+    public Set<FileReference> getPartitionIndexes(int partition) throws 
HyracksDataException {
+        beforeReadAccess();
+        try {
+            FileReference partitionRoot = getPartitionRoot(partition);
+            final Map<Long, LocalResource> partitionResourcesMap = 
getResources(resource -> {
+                DatasetLocalResource dsResource = (DatasetLocalResource) 
resource.getResource();
+                return dsResource.getPartition() == partition;
+            }, Collections.singletonList(partitionRoot));
+            Set<FileReference> indexes = new HashSet<>();
+            for (LocalResource localResource : partitionResourcesMap.values()) 
{
+                indexes.add(ioManager.resolve(localResource.getPath()));
+            }
+            return indexes;
+        } finally {
+            afterReadAccess();
         }
-        return indexes;
     }
 
-    public synchronized Map<Long, LocalResource> getPartitionResources(int 
partition) throws HyracksDataException {
-        return getResources(r -> true, Collections.singleton(partition));
+    public Map<Long, LocalResource> getPartitionResources(int partition) 
throws HyracksDataException {
+        beforeReadAccess();
+        try {
+            return getResources(r -> true, Collections.singleton(partition));
+        } finally {
+            afterReadAccess();
+        }
     }
 
-    public synchronized Map<String, Long> getPartitionReplicatedResources(int 
partition, IReplicationStrategy strategy)
+    public Map<String, Long> getPartitionReplicatedResources(int partition, 
IReplicationStrategy strategy)
             throws HyracksDataException {
-        final Map<String, Long> partitionReplicatedResources = new HashMap<>();
-        final Map<Long, LocalResource> partitionResources = 
getPartitionResources(partition);
-        for (LocalResource lr : partitionResources.values()) {
-            DatasetLocalResource datasetLocalResource = (DatasetLocalResource) 
lr.getResource();
-            if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
-                DatasetResourceReference drr = DatasetResourceReference.of(lr);
-                
partitionReplicatedResources.put(drr.getFileRelativePath().toString(), 
lr.getId());
+        beforeReadAccess();
+        try {
+            final Map<String, Long> partitionReplicatedResources = new 
HashMap<>();
+            final Map<Long, LocalResource> partitionResources = 
getPartitionResources(partition);
+            for (LocalResource lr : partitionResources.values()) {
+                DatasetLocalResource datasetLocalResource = 
(DatasetLocalResource) lr.getResource();
+                if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
+                    DatasetResourceReference drr = 
DatasetResourceReference.of(lr);
+                    
partitionReplicatedResources.put(drr.getFileRelativePath().toString(), 
lr.getId());
+                }
             }
+            return partitionReplicatedResources;
+        } finally {
+            afterReadAccess();
         }
-        return partitionReplicatedResources;
     }
 
-    public synchronized List<String> getPartitionReplicatedFiles(int 
partition, IReplicationStrategy strategy)
+    public List<String> getPartitionReplicatedFiles(int partition, 
IReplicationStrategy strategy)
             throws HyracksDataException {
-        final List<String> partitionReplicatedFiles = new ArrayList<>();
-        final Set<FileReference> replicatedIndexes = new HashSet<>();
-        final Map<Long, LocalResource> partitionResources = 
getPartitionResources(partition);
-        for (LocalResource lr : partitionResources.values()) {
-            DatasetLocalResource datasetLocalResource = (DatasetLocalResource) 
lr.getResource();
-            if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
-                replicatedIndexes.add(ioManager.resolve(lr.getPath()));
+        beforeReadAccess();
+        try {
+            final List<String> partitionReplicatedFiles = new ArrayList<>();
+            final Set<FileReference> replicatedIndexes = new HashSet<>();
+            final Map<Long, LocalResource> partitionResources = 
getPartitionResources(partition);
+            for (LocalResource lr : partitionResources.values()) {
+                DatasetLocalResource datasetLocalResource = 
(DatasetLocalResource) lr.getResource();
+                if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
+                    replicatedIndexes.add(ioManager.resolve(lr.getPath()));
+                }
             }
+            for (FileReference indexDir : replicatedIndexes) {
+                partitionReplicatedFiles.addAll(getIndexFiles(indexDir));
+            }
+            return partitionReplicatedFiles;
+        } finally {
+            afterReadAccess();
         }
-        for (FileReference indexDir : replicatedIndexes) {
-            partitionReplicatedFiles.addAll(getIndexFiles(indexDir));
-        }
-        return partitionReplicatedFiles;
     }
 
-    public synchronized long getReplicatedIndexesMaxComponentId(int partition, 
IReplicationStrategy strategy)
+    public long getReplicatedIndexesMaxComponentId(int partition, 
IReplicationStrategy strategy)
             throws HyracksDataException {
-        long maxComponentId = LSMComponentId.MIN_VALID_COMPONENT_ID;
-        final Map<Long, LocalResource> partitionResources = 
getPartitionResources(partition);
-        for (LocalResource lr : partitionResources.values()) {
-            DatasetLocalResource datasetLocalResource = (DatasetLocalResource) 
lr.getResource();
-            if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
-                final IIndexCheckpointManager indexCheckpointManager =
-                        
indexCheckpointManagerProvider.get(DatasetResourceReference.of(lr));
-                maxComponentId = Math.max(maxComponentId, 
indexCheckpointManager.getLatest().getLastComponentId());
+        beforeReadAccess();
+        try {
+            long maxComponentId = LSMComponentId.MIN_VALID_COMPONENT_ID;
+            final Map<Long, LocalResource> partitionResources = 
getPartitionResources(partition);
+            for (LocalResource lr : partitionResources.values()) {
+                DatasetLocalResource datasetLocalResource = 
(DatasetLocalResource) lr.getResource();
+                if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
+                    final IIndexCheckpointManager indexCheckpointManager =
+                            
indexCheckpointManagerProvider.get(DatasetResourceReference.of(lr));
+                    maxComponentId = Math.max(maxComponentId, 
indexCheckpointManager.getLatest().getLastComponentId());
+                }
             }
+            return maxComponentId;
+        } finally {
+            afterReadAccess();
         }
-        return maxComponentId;
     }
 
     private List<String> getIndexFiles(FileReference indexDir) throws 
HyracksDataException {
@@ -469,44 +551,59 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         }
     }
 
-    public synchronized void cleanup(int partition) throws 
HyracksDataException {
-        final Set<FileReference> partitionIndexes = 
getPartitionIndexes(partition);
+    public void cleanup(int partition) throws HyracksDataException {
+        beforeReadAccess();
         try {
-            for (FileReference index : partitionIndexes) {
-                deleteIndexMaskedFiles(index);
-                if (isValidIndex(index)) {
-                    deleteIndexInvalidComponents(index);
+            final Set<FileReference> partitionIndexes = 
getPartitionIndexes(partition);
+            try {
+                for (FileReference index : partitionIndexes) {
+                    deleteIndexMaskedFiles(index);
+                    if (isValidIndex(index)) {
+                        deleteIndexInvalidComponents(index);
+                    }
                 }
+            } catch (IOException | ParseException e) {
+                throw HyracksDataException.create(e);
             }
-        } catch (IOException | ParseException e) {
-            throw HyracksDataException.create(e);
+        } finally {
+            afterReadAccess();
         }
     }
 
     public List<ResourceStorageStats> getStorageStats() throws 
HyracksDataException {
-        final List<DatasetResourceReference> allResources = 
loadAndGetAllResources().values().stream()
-                
.map(DatasetResourceReference::of).collect(Collectors.toList());
-        final List<ResourceStorageStats> resourcesStats = new ArrayList<>();
-        for (DatasetResourceReference res : allResources) {
-            final ResourceStorageStats resourceStats = getResourceStats(res);
-            if (resourceStats != null) {
-                resourcesStats.add(resourceStats);
+        beforeReadAccess();
+        try {
+            final List<DatasetResourceReference> allResources = 
loadAndGetAllResources().values().stream()
+                    
.map(DatasetResourceReference::of).collect(Collectors.toList());
+            final List<ResourceStorageStats> resourcesStats = new 
ArrayList<>();
+            for (DatasetResourceReference res : allResources) {
+                final ResourceStorageStats resourceStats = 
getResourceStats(res);
+                if (resourceStats != null) {
+                    resourcesStats.add(resourceStats);
+                }
             }
+            return resourcesStats;
+        } finally {
+            afterReadAccess();
         }
-        return resourcesStats;
     }
 
-    public synchronized void deleteCorruptedResources() throws 
HyracksDataException {
-        IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation();
-        for (FileReference root : storageRoots) {
-            final Collection<FileReference> metadataMaskFiles = 
ioManager.list(root, METADATA_MASK_FILES_FILTER);
-            for (FileReference metadataMaskFile : metadataMaskFiles) {
-                final FileReference resourceFile = 
metadataMaskFile.getParent().getChild(METADATA_FILE_NAME);
-                bulkDelete.add(resourceFile);
-                bulkDelete.add(metadataMaskFile);
+    public void deleteCorruptedResources() throws HyracksDataException {
+        beforeWriteAccess();
+        try {
+            IIOBulkOperation bulkDelete = 
ioManager.createDeleteBulkOperation();
+            for (FileReference root : storageRoots) {
+                final Collection<FileReference> metadataMaskFiles = 
ioManager.list(root, METADATA_MASK_FILES_FILTER);
+                for (FileReference metadataMaskFile : metadataMaskFiles) {
+                    final FileReference resourceFile = 
metadataMaskFile.getParent().getChild(METADATA_FILE_NAME);
+                    bulkDelete.add(resourceFile);
+                    bulkDelete.add(metadataMaskFile);
+                }
             }
+            ioManager.performBulkOperation(bulkDelete);
+        } finally {
+            afterWriteAccess();
         }
-        ioManager.performBulkOperation(bulkDelete);
     }
 
     private void deleteIndexMaskedFiles(FileReference index) throws 
IOException {
@@ -601,20 +698,25 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
 
     public long getDatasetSize(DatasetCopyIdentifier datasetIdentifier, 
Set<Integer> nodePartitions)
             throws HyracksDataException {
-        long totalSize = 0;
-        final Map<Long, LocalResource> dataverse = getResources(lr -> {
-            final ResourceReference resourceReference = 
ResourceReference.ofIndex(lr.getPath());
-            return datasetIdentifier.isMatch(resourceReference);
-        }, nodePartitions);
-        final List<DatasetResourceReference> allResources =
-                
dataverse.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
-        for (DatasetResourceReference res : allResources) {
-            final ResourceStorageStats resourceStats = getResourceStats(res);
-            if (resourceStats != null) {
-                totalSize += resourceStats.getTotalSize();
+        beforeReadAccess();
+        try {
+            long totalSize = 0;
+            final Map<Long, LocalResource> dataverse = getResources(lr -> {
+                final ResourceReference resourceReference = 
ResourceReference.ofIndex(lr.getPath());
+                return datasetIdentifier.isMatch(resourceReference);
+            }, nodePartitions);
+            final List<DatasetResourceReference> allResources =
+                    
dataverse.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
+            for (DatasetResourceReference res : allResources) {
+                final ResourceStorageStats resourceStats = 
getResourceStats(res);
+                if (resourceStats != null) {
+                    totalSize += resourceStats.getTotalSize();
+                }
             }
+            return totalSize;
+        } finally {
+            afterReadAccess();
         }
-        return totalSize;
     }
 
     private void createResourceFileMask(FileReference resourceFile) throws 
HyracksDataException {
@@ -644,13 +746,18 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         return COMPONENT_FILES_FILTER.accept(indexDir.getFile(), fileName);
     }
 
-    public synchronized List<FileReference> getOnDiskPartitions() {
-        List<FileReference> onDiskPartitions = new ArrayList<>();
-        for (FileReference root : storageRoots) {
-            onDiskPartitions.addAll(IoUtil.getMatchingChildren(root, (dir, 
name) -> dir != null && dir.isDirectory()
-                    && 
name.startsWith(StorageConstants.PARTITION_DIR_PREFIX)));
+    public List<FileReference> getOnDiskPartitions() {
+        beforeReadAccess();
+        try {
+            List<FileReference> onDiskPartitions = new ArrayList<>();
+            for (FileReference root : storageRoots) {
+                onDiskPartitions.addAll(IoUtil.getMatchingChildren(root, (dir, 
name) -> dir != null && dir.isDirectory()
+                        && 
name.startsWith(StorageConstants.PARTITION_DIR_PREFIX)));
+            }
+            return onDiskPartitions;
+        } finally {
+            afterReadAccess();
         }
-        return onDiskPartitions;
     }
 
     public FileReference getPartitionRoot(int partition) throws 
HyracksDataException {
@@ -660,16 +767,37 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
     }
 
     public void deletePartition(int partitionId) throws HyracksDataException {
-        Collection<FileReference> onDiskPartitions = getOnDiskPartitions();
-        IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation();
-        for (FileReference onDiskPartition : onDiskPartitions) {
-            int partitionNum = 
StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath());
-            if (partitionNum == partitionId) {
-                LOGGER.warn("deleting partition {}", partitionNum);
-                bulkDelete.add(onDiskPartition);
-                break;
+        beforeReadAccess();
+        try {
+            Collection<FileReference> onDiskPartitions = getOnDiskPartitions();
+            IIOBulkOperation bulkDelete = 
ioManager.createDeleteBulkOperation();
+            for (FileReference onDiskPartition : onDiskPartitions) {
+                int partitionNum = 
StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath());
+                if (partitionNum == partitionId) {
+                    LOGGER.warn("deleting partition {}", partitionNum);
+                    bulkDelete.add(onDiskPartition);
+                    break;
+                }
             }
+            ioManager.performBulkOperation(bulkDelete);
+        } finally {
+            afterReadAccess();
         }
-        ioManager.performBulkOperation(bulkDelete);
+    }
+
+    private void beforeWriteAccess() {
+        resourcesAccessLock.readLock().lock();
+    }
+
+    private void afterWriteAccess() {
+        resourcesAccessLock.readLock().unlock();
+    }
+
+    private void beforeReadAccess() {
+        resourcesAccessLock.writeLock().lock();
+    }
+
+    private void afterReadAccess() {
+        resourcesAccessLock.writeLock().unlock();
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
index 1c0d7b49d5..8a67c71d09 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
@@ -68,49 +68,47 @@ public class IndexBuilder implements IIndexBuilder {
     @Override
     public void build() throws HyracksDataException {
         IResourceLifecycleManager<IIndex> lcManager = 
storageManager.getLifecycleManager(ctx);
-        synchronized (lcManager) {
-            // The previous resource Id needs to be removed since calling 
IIndex.create() may possibly destroy any
-            // physical artifact that the LocalResourceRepository is managing 
(e.g. a file containing the resource Id).
-            // Once the index has been created, a new resource Id can be 
generated.
-            ILocalResourceRepository localResourceRepository = 
storageManager.getLocalResourceRepository(ctx);
-            LocalResource lr = localResourceRepository.get(resourceRelPath);
-            long resourceId = lr == null ? -1 : lr.getId();
-            if (resourceId != -1) {
-                localResourceRepository.delete(resourceRelPath);
-            }
-            resourceId = resourceIdFactory.createId();
-            IResource resource = 
localResourceFactory.createResource(resourceRef);
-            lr = new LocalResource(resourceId, 
ITreeIndexFrame.Constants.VERSION, durable, resource);
-            IIndex index = lcManager.get(resourceRelPath);
-            if (index != null) {
-                //how is this right?????????? <needs to be fixed>
-                //The reason for this is to handle many cases such as:
-                //1. Crash while delete index is running (we don't do global 
cleanup on restart)
-                //2. Node leaves and then join with old data
-                LOGGER.log(Level.WARN, "Removing existing index on index 
create for the index: " + resourceRelPath);
-                lcManager.unregister(resourceRelPath);
-                index.destroy();
-            } else {
-                final FileReference resolvedResourceRef = 
ctx.getIoManager().resolve(resourceRelPath);
-                if (resolvedResourceRef.getFile().exists()) {
-                    // Index is not registered but the index file exists
-                    // This is another big problem that we need to disallow 
soon
-                    // We can only disallow this if we have a global cleanup 
after crash
-                    // on reboot
-                    LOGGER.warn(
-                            "Deleting {} on index create. The index is not 
registered but the file exists in the filesystem",
-                            resolvedResourceRef);
-                    ctx.getIoManager().delete(resolvedResourceRef);
-                }
-                index = resource.createInstance(ctx);
-            }
-            index.create();
-            try {
-                localResourceRepository.insert(lr);
-            } catch (IOException e) {
-                throw HyracksDataException.create(e);
+        // The previous resource Id needs to be removed since calling 
IIndex.create() may possibly destroy any
+        // physical artifact that the LocalResourceRepository is managing 
(e.g. a file containing the resource Id).
+        // Once the index has been created, a new resource Id can be generated.
+        ILocalResourceRepository localResourceRepository = 
storageManager.getLocalResourceRepository(ctx);
+        LocalResource lr = localResourceRepository.get(resourceRelPath);
+        long resourceId = lr == null ? -1 : lr.getId();
+        if (resourceId != -1) {
+            localResourceRepository.delete(resourceRelPath);
+        }
+        resourceId = resourceIdFactory.createId();
+        IResource resource = localResourceFactory.createResource(resourceRef);
+        lr = new LocalResource(resourceId, ITreeIndexFrame.Constants.VERSION, 
durable, resource);
+        IIndex index = lcManager.get(resourceRelPath);
+        if (index != null) {
+            //how is this right?????????? <needs to be fixed>
+            //The reason for this is to handle many cases such as:
+            //1. Crash while delete index is running (we don't do global 
cleanup on restart)
+            //2. Node leaves and then join with old data
+            LOGGER.log(Level.WARN, "Removing existing index on index create 
for the index: " + resourceRelPath);
+            lcManager.unregister(resourceRelPath);
+            index.destroy();
+        } else {
+            final FileReference resolvedResourceRef = 
ctx.getIoManager().resolve(resourceRelPath);
+            if (resolvedResourceRef.getFile().exists()) {
+                // Index is not registered but the index file exists
+                // This is another big problem that we need to disallow soon
+                // We can only disallow this if we have a global cleanup after 
crash
+                // on reboot
+                LOGGER.warn(
+                        "Deleting {} on index create. The index is not 
registered but the file exists in the filesystem",
+                        resolvedResourceRef);
+                ctx.getIoManager().delete(resolvedResourceRef);
             }
-            lcManager.register(resourceRelPath, index);
+            index = resource.createInstance(ctx);
+        }
+        index.create();
+        try {
+            localResourceRepository.insert(lr);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
         }
+        lcManager.register(resourceRelPath, index);
     }
 }


Reply via email to