This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d6adadc  [NO ISSUE][STO] Ensure resources file operations are synchronized
d6adadc is described below

commit d6adadc8fd4fb5c04f41097aca8a433528a560d6
Author: Murtadha Hubail <murtadha.hub...@couchbase.com>
AuthorDate: Sun Sep 12 01:35:58 2021 +0300

    [NO ISSUE][STO] Ensure resources file operations are synchronized
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    
    - To prevent an operation from reading a partially written resource
      file, ensure that all resource file operations are synchronized.
    - Limit the search for a partition's resources to that partition's
      root directory.
    
    Change-Id: I95f8565780675798393d7d43c0051c04e2b0a98c
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/13164
    Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Murtadha Hubail <mhub...@apache.org>
    Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com>
---
 .../org/apache/asterix/app/nc/RecoveryManager.java |  5 +-
 .../replication/logging/RemoteLogsNotifier.java    |  4 +-
 .../replication/messaging/ReplicateFileTask.java   |  5 +-
 .../PersistentLocalResourceRepository.java         | 80 ++++++++++++++--------
 4 files changed, 60 insertions(+), 34 deletions(-)

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 4f5a9f8..6736642 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -291,7 +291,7 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
         final IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
                 ((INcApplicationContext) 
(serviceCtx.getApplicationContext())).getIndexCheckpointManagerProvider();
 
-        Map<Long, LocalResource> resourcesMap = 
localResourceRepository.loadAndGetAllResources();
+        Map<Long, LocalResource> resourcesMap = 
localResourceRepository.getResources(r -> true, partitions);
         final Map<Long, Long> resourceId2MaxLSNMap = new HashMap<>();
         TxnEntityId tempKeyTxnEntityId = new TxnEntityId(-1, -1, -1, null, -1, 
false);
 
@@ -503,7 +503,8 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
             final List<DatasetResourceReference> partitionResources = 
localResourceRepository.getResources(resource -> {
                 DatasetLocalResource dsResource = (DatasetLocalResource) 
resource.getResource();
                 return dsResource.getPartition() == partition;
-            
}).values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
+            }, 
Collections.singleton(partition)).values().stream().map(DatasetResourceReference::of)
+                    .collect(Collectors.toList());
             for (DatasetResourceReference indexRef : partitionResources) {
                 try {
                     final IIndexCheckpointManager idxCheckpointMgr = 
idxCheckpointMgrProvider.get(indexRef);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
index 57463cb..004b640 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.replication.logging;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -91,7 +92,8 @@ class RemoteLogsNotifier implements Runnable {
             return dls.getDatasetId() == datasetId && dls.getPartition() == 
resourcePartition
                     && !masterPartitions.contains(dls.getPartition());
         };
-        final Map<Long, LocalResource> resources = 
localResourceRep.getResources(replicaIndexesPredicate);
+        final Map<Long, LocalResource> resources =
+                localResourceRep.getResources(replicaIndexesPredicate, 
Collections.singleton(resourcePartition));
         final List<DatasetResourceReference> replicaIndexesRef =
                 
resources.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
         for (DatasetResourceReference replicaIndexRef : replicaIndexesRef) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
index 3ee3094..7f26b96 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
@@ -67,7 +67,7 @@ public class ReplicateFileTask implements IReplicaTask {
     @Override
     public void perform(INcApplicationContext appCtx, IReplicationWorker 
worker) {
         try {
-            LOGGER.info("attempting to replicate {}", this);
+            LOGGER.debug("attempting to receive file {} from master", this);
             final IIOManager ioManager = appCtx.getIoManager();
             // resolve path
             final FileReference localPath = ioManager.resolve(file);
@@ -76,7 +76,6 @@ public class ReplicateFileTask implements IReplicaTask {
             final Path maskPath = Paths.get(resourceDir.toString(),
                     StorageConstants.MASK_FILE_PREFIX + 
localPath.getFile().getName());
             Files.createFile(maskPath);
-
             // receive actual file
             final Path filePath = Paths.get(resourceDir.toString(), 
localPath.getFile().getName());
             Files.createFile(filePath);
@@ -91,7 +90,7 @@ public class ReplicateFileTask implements IReplicaTask {
             }
             //delete mask
             Files.delete(maskPath);
-            LOGGER.info(() -> "Replicated file: " + localPath);
+            LOGGER.info("received file {} from master", localPath);
             ReplicationProtocol.sendAck(worker.getChannel(), 
worker.getReusableBuffer());
         } catch (IOException e) {
             throw new ReplicationException(e);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 4d15385..ee5b16e 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -35,6 +35,7 @@ import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -138,7 +139,7 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
     private boolean isReplicationEnabled = false;
     private Set<String> filesToBeReplicated;
     private IReplicationManager replicationManager;
-    private final Path[] storageRoots;
+    private final List<Path> storageRoots;
     private final IIndexCheckpointManagerProvider 
indexCheckpointManagerProvider;
     private final IPersistedResourceRegistry persistedResourceRegistry;
 
@@ -148,11 +149,11 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         this.ioManager = ioManager;
         this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
         this.persistedResourceRegistry = persistedResourceRegistry;
-        storageRoots = new Path[ioManager.getIODevices().size()];
+        storageRoots = new ArrayList<>();
         final List<IODeviceHandle> ioDevices = ioManager.getIODevices();
         for (int i = 0; i < ioDevices.size(); i++) {
-            storageRoots[i] =
-                    Paths.get(ioDevices.get(i).getMount().getAbsolutePath(), 
StorageConstants.STORAGE_ROOT_DIR_NAME);
+            storageRoots.add(
+                    Paths.get(ioDevices.get(i).getMount().getAbsolutePath(), 
StorageConstants.STORAGE_ROOT_DIR_NAME));
         }
         createStorageRoots();
         resourceCache = 
CacheBuilder.newBuilder().maximumSize(MAX_CACHED_RESOURCES).build();
@@ -262,10 +263,13 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         return ioManager.resolve(fileName);
     }
 
-    public synchronized Map<Long, LocalResource> 
getResources(Predicate<LocalResource> filter)
+    public synchronized Map<Long, LocalResource> 
getResources(Predicate<LocalResource> filter, List<Path> roots)
             throws HyracksDataException {
         Map<Long, LocalResource> resourcesMap = new HashMap<>();
-        for (Path root : storageRoots) {
+        for (Path root : roots) {
+            if (!Files.exists(root) || !Files.isDirectory(root)) {
+                continue;
+            }
             final Collection<File> files = FileUtils.listFiles(root.toFile(), 
METADATA_FILES_FILTER, ALL_DIR_FILTER);
             try {
                 for (File file : files) {
@@ -281,6 +285,20 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         return resourcesMap;
     }
 
+    public synchronized Map<Long, LocalResource> 
getResources(Predicate<LocalResource> filter)
+            throws HyracksDataException {
+        return getResources(filter, storageRoots);
+    }
+
+    public synchronized Map<Long, LocalResource> 
getResources(Predicate<LocalResource> filter, Set<Integer> partitions)
+            throws HyracksDataException {
+        List<Path> partitionsRoots = new ArrayList<>();
+        for (Integer partition : partitions) {
+            partitionsRoots.add(getPartitionRoot(partition));
+        }
+        return getResources(filter, partitionsRoots);
+    }
+
     public synchronized void deleteInvalidIndexes(Predicate<LocalResource> 
filter) throws HyracksDataException {
         for (Path root : storageRoots) {
             final Collection<File> files = FileUtils.listFiles(root.toFile(), 
METADATA_FILES_FILTER, ALL_DIR_FILTER);
@@ -304,7 +322,7 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
     }
 
     @Override
-    public long maxId() throws HyracksDataException {
+    public synchronized long maxId() throws HyracksDataException {
         final Map<Long, LocalResource> allResources = loadAndGetAllResources();
         final Optional<Long> max = 
allResources.keySet().stream().max(Long::compare);
         return max.isPresent() ? max.get() : 0;
@@ -330,7 +348,7 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         }
     }
 
-    public void setReplicationManager(IReplicationManager replicationManager) {
+    public synchronized void setReplicationManager(IReplicationManager 
replicationManager) {
         this.replicationManager = replicationManager;
         isReplicationEnabled = replicationManager.isReplicationEnabled();
 
@@ -357,7 +375,7 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
      *
      * @throws IOException
      */
-    public void deleteStorageData() throws IOException {
+    public synchronized void deleteStorageData() throws IOException {
         for (Path root : storageRoots) {
             final File rootFile = root.toFile();
             if (rootFile.exists()) {
@@ -367,13 +385,13 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         createStorageRoots();
     }
 
-    public Set<Integer> getAllPartitions() throws HyracksDataException {
+    public synchronized Set<Integer> getAllPartitions() throws 
HyracksDataException {
         return 
loadAndGetAllResources().values().stream().map(LocalResource::getResource)
                 
.map(DatasetLocalResource.class::cast).map(DatasetLocalResource::getPartition)
                 .collect(Collectors.toSet());
     }
 
-    public Optional<DatasetResourceReference> getLocalResourceReference(String 
absoluteFilePath)
+    public synchronized Optional<DatasetResourceReference> 
getLocalResourceReference(String absoluteFilePath)
             throws HyracksDataException {
         final String localResourcePath = 
StoragePathUtil.getIndexFileRelativePath(absoluteFilePath);
         final LocalResource lr = get(localResourcePath);
@@ -388,11 +406,12 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
      * @return The set of indexes files
      * @throws HyracksDataException
      */
-    public Set<File> getPartitionIndexes(int partition) throws 
HyracksDataException {
+    public synchronized Set<File> getPartitionIndexes(int partition) throws 
HyracksDataException {
+        Path partitionRoot = getPartitionRoot(partition);
         final Map<Long, LocalResource> partitionResourcesMap = 
getResources(resource -> {
             DatasetLocalResource dsResource = (DatasetLocalResource) 
resource.getResource();
             return dsResource.getPartition() == partition;
-        });
+        }, Collections.singletonList(partitionRoot));
         Set<File> indexes = new HashSet<>();
         for (LocalResource localResource : partitionResourcesMap.values()) {
             indexes.add(ioManager.resolve(localResource.getPath()).getFile());
@@ -400,14 +419,11 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         return indexes;
     }
 
-    public Map<Long, LocalResource> getPartitionResources(int partition) 
throws HyracksDataException {
-        return getResources(resource -> {
-            DatasetLocalResource dsResource = (DatasetLocalResource) 
resource.getResource();
-            return dsResource.getPartition() == partition;
-        });
+    public synchronized Map<Long, LocalResource> getPartitionResources(int 
partition) throws HyracksDataException {
+        return getResources(r -> true, Collections.singleton(partition));
     }
 
-    public Map<String, Long> getPartitionReplicatedResources(int partition, 
IReplicationStrategy strategy)
+    public synchronized Map<String, Long> getPartitionReplicatedResources(int 
partition, IReplicationStrategy strategy)
             throws HyracksDataException {
         final Map<String, Long> partitionReplicatedResources = new HashMap<>();
         final Map<Long, LocalResource> partitionResources = 
getPartitionResources(partition);
@@ -421,7 +437,7 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         return partitionReplicatedResources;
     }
 
-    public List<String> getPartitionReplicatedFiles(int partition, 
IReplicationStrategy strategy)
+    public synchronized List<String> getPartitionReplicatedFiles(int 
partition, IReplicationStrategy strategy)
             throws HyracksDataException {
         final List<String> partitionReplicatedFiles = new ArrayList<>();
         final Set<File> replicatedIndexes = new HashSet<>();
@@ -438,7 +454,7 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         return partitionReplicatedFiles;
     }
 
-    public long getReplicatedIndexesMaxComponentId(int partition, 
IReplicationStrategy strategy)
+    public synchronized long getReplicatedIndexesMaxComponentId(int partition, 
IReplicationStrategy strategy)
             throws HyracksDataException {
         long maxComponentId = LSMComponentId.MIN_VALID_COMPONENT_ID;
         final Map<Long, LocalResource> partitionResources = 
getPartitionResources(partition);
@@ -474,7 +490,7 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         }
     }
 
-    public void cleanup(int partition) throws HyracksDataException {
+    public synchronized void cleanup(int partition) throws 
HyracksDataException {
         final Set<File> partitionIndexes = getPartitionIndexes(partition);
         try {
             for (File index : partitionIndexes) {
@@ -501,7 +517,7 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         return resourcesStats;
     }
 
-    public void deleteCorruptedResources() throws HyracksDataException {
+    public synchronized void deleteCorruptedResources() throws 
HyracksDataException {
         for (Path root : storageRoots) {
             final Collection<File> metadataMaskFiles =
                     FileUtils.listFiles(root.toFile(), 
METADATA_MASK_FILES_FILTER, ALL_DIR_FILTER);
@@ -601,12 +617,13 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         return null;
     }
 
-    public long getDatasetSize(DatasetCopyIdentifier datasetIdentifier) throws 
HyracksDataException {
+    public long getDatasetSize(DatasetCopyIdentifier datasetIdentifier, 
Set<Integer> nodePartitions)
+            throws HyracksDataException {
         long totalSize = 0;
         final Map<Long, LocalResource> dataverse = getResources(lr -> {
             final ResourceReference resourceReference = 
ResourceReference.ofIndex(lr.getPath());
             return datasetIdentifier.isMatch(resourceReference);
-        });
+        }, nodePartitions);
         final List<DatasetResourceReference> allResources =
                 
dataverse.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
         for (DatasetResourceReference res : allResources) {
@@ -644,11 +661,11 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         return COMPONENT_FILES_FILTER.accept(indexDir, fileName);
     }
 
-    public Path[] getStorageRoots() {
+    public List<Path> getStorageRoots() {
         return storageRoots;
     }
 
-    public void keepPartitions(Set<Integer> keepPartitions) {
+    public synchronized void keepPartitions(Set<Integer> keepPartitions) {
         List<File> onDiskPartitions = getOnDiskPartitions();
         for (File onDiskPartition : onDiskPartitions) {
             int partitionNum = 
StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath());
@@ -660,7 +677,7 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         }
     }
 
-    public List<File> getOnDiskPartitions() {
+    public synchronized List<File> getOnDiskPartitions() {
         List<File> onDiskPartitions = new ArrayList<>();
         for (Path root : storageRoots) {
             File[] partitions = root.toFile().listFiles(
@@ -671,4 +688,11 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         }
         return onDiskPartitions;
     }
+
+    public Path getPartitionRoot(int partition) throws HyracksDataException {
+        Path path =
+                Paths.get(StorageConstants.STORAGE_ROOT_DIR_NAME, 
StorageConstants.PARTITION_DIR_PREFIX + partition);
+        FileReference resolve = ioManager.resolve(path.toString());
+        return resolve.getFile().toPath();
+    }
 }

Reply via email to