This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ae3c273a54 [ASTERIXDB-3196][*DB] Cluster state for compute-storage separation
ae3c273a54 is described below

commit ae3c273a549a92501f47ccf8713cb2aba1beee58
Author: Murtadha Hubail <mhub...@apache.org>
AuthorDate: Wed May 24 15:17:30 2023 +0300

    [ASTERIXDB-3196][*DB] Cluster state for compute-storage separation
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    
    - Drive cluster state from the compute-storage partitions map; in
      cloud deployments the cluster is unusable until all mapped compute
      nodes are participating.
    - Persist index checkpoints to cloud storage.
    - Remove eager caching from NC startup tasks.
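    - Fix static data partitioning: compute the storage-compute partitions
      map only once and only when the cluster is usable, and index compute
      partitions sequentially.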
    
    Change-Id: I217da04d06884d841c4a56aee3ab9815cc659de7
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17553
    Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Murtadha Hubail <mhub...@apache.org>
    Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com>
---
 .../apache/asterix/app/nc/NCAppRuntimeContext.java |  3 +-
 .../app/replication/NcLifecycleCoordinator.java    |  7 ---
 .../common/cluster/IClusterStateManager.java       |  6 +++
 .../cluster/StorageComputePartitionsMap.java       | 11 +++-
 .../metadata/declared/MetadataProvider.java        |  9 +---
 .../asterix/runtime/utils/ClusterStateManager.java | 62 +++++++++++++++++-----
 .../PersistentLocalResourceRepository.java         |  4 +-
 7 files changed, 67 insertions(+), 35 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index f60ed63872..7f0b529480 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -204,8 +204,7 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
                 storageProperties.getBufferCachePageSize(), 
storageProperties.getBufferCacheNumPages());
         lsmIOScheduler = createIoScheduler(storageProperties);
         metadataMergePolicyFactory = new ConcurrentMergePolicyFactory();
-        // TODO do we want to write checkpoints for cloud?
-        indexCheckpointManagerProvider = new 
IndexCheckpointManagerProvider(ioManager);
+        indexCheckpointManagerProvider = new 
IndexCheckpointManagerProvider(persistenceIOManager);
         ILocalResourceRepositoryFactory 
persistentLocalResourceRepositoryFactory =
                 new 
PersistentLocalResourceRepositoryFactory(persistenceIOManager, 
indexCheckpointManagerProvider,
                         persistedResourceRegistry);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index 4f9613d2a7..449dd27845 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -35,7 +35,6 @@ import java.util.Set;
 
 import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
 import org.apache.asterix.app.nc.task.CheckpointTask;
-import org.apache.asterix.app.nc.task.CloudToLocalStorageCachingTask;
 import org.apache.asterix.app.nc.task.ExportMetadataNodeTask;
 import org.apache.asterix.app.nc.task.LocalRecoveryTask;
 import org.apache.asterix.app.nc.task.LocalStorageCleanupTask;
@@ -52,7 +51,6 @@ import 
org.apache.asterix.app.replication.message.RegistrationTasksResponseMessa
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.cluster.IClusterStateManager;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
@@ -222,11 +220,6 @@ public class NcLifecycleCoordinator implements 
INcLifecycleCoordinator {
         tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING, 
nodeActivePartitions));
         int metadataPartitionId = 
clusterManager.getMetadataPartition().getPartitionId();
         tasks.add(new LocalStorageCleanupTask(metadataPartitionId));
-
-        if (((ICcApplicationContext) 
(serviceContext.getControllerService()).getApplicationContext())
-                .isCloudDeployment()) {
-            tasks.add(new CloudToLocalStorageCachingTask(activePartitions));
-        }
         if (state == SystemState.CORRUPTED) {
             // need to perform local recovery for node active partitions
             LocalRecoveryTask rt = new LocalRecoveryTask(nodeActivePartitions);
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index 3d0fee8e22..5f714f436d 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -286,4 +286,10 @@ public interface IClusterStateManager {
      * @return the count of storage partitions
      */
     int getStoragePartitionsCount();
+
+    /**
+     * Sets the compute-storage partitions map
+     * @param map
+     */
+    void setComputeStoragePartitionsMap(StorageComputePartitionsMap map);
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
index 48b2ea1a93..6561d05b78 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
@@ -23,6 +23,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 public class StorageComputePartitionsMap {
 
@@ -53,9 +55,10 @@ public class StorageComputePartitionsMap {
             }
         }
         int[][] computerToStoArray = new 
int[computeToStoragePartitions.size()][];
+        int partitionIdx = 0;
         for (Map.Entry<Integer, List<Integer>> integerListEntry : 
computeToStoragePartitions.entrySet()) {
-            computerToStoArray[integerListEntry.getKey()] =
-                    integerListEntry.getValue().stream().mapToInt(i -> 
i).toArray();
+            computerToStoArray[partitionIdx] = 
integerListEntry.getValue().stream().mapToInt(i -> i).toArray();
+            partitionIdx++;
         }
         return computerToStoArray;
     }
@@ -94,4 +97,8 @@ public class StorageComputePartitionsMap {
         }
         return newMap;
     }
+
+    public Set<String> getComputeNodes() {
+        return 
stoToComputeLocation.values().stream().map(ComputePartition::getNodeId).collect(Collectors.toSet());
+    }
 }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index b07a03eddc..ea1b9e675e 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -32,7 +32,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
-import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.cluster.PartitioningProperties;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
@@ -891,13 +890,7 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
         } else {
             numElementsHint = Long.parseLong(numElementsHintString);
         }
-        int numPartitions = 0;
-        List<String> nodeGroup =
-                MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, 
dataset.getNodeGroupName()).getNodeNames();
-        IClusterStateManager csm = appCtx.getClusterStateManager();
-        for (String nd : nodeGroup) {
-            numPartitions += csm.getNodePartitionsCount(nd);
-        }
+        int numPartitions = 
getPartitioningProperties(dataset).getNumberOfPartitions();
         return numElementsHint / numPartitions;
     }
 
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 92ea173fb4..0128478647 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -30,6 +30,7 @@ import java.util.SortedMap;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.cluster.ClusterPartition;
@@ -155,7 +156,16 @@ public class ClusterStateManager implements 
IClusterStateManager {
         if (active) {
             updateClusterCounters(nodeId, localCounters);
             participantNodes.add(nodeId);
-            activateNodePartitions(nodeId, activePartitions);
+            if (appCtx.isCloudDeployment()) {
+                // node compute partitions never change
+                ClusterPartition[] nodePartitions = getNodePartitions(nodeId);
+                activePartitions =
+                        
Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId).collect(Collectors.toSet());
+                activateNodePartitions(nodeId, activePartitions);
+            } else {
+                activateNodePartitions(nodeId, activePartitions);
+            }
+
         } else {
             participantNodes.remove(nodeId);
             deactivateNodePartitions(nodeId);
@@ -183,16 +193,7 @@ public class ClusterStateManager implements 
IClusterStateManager {
             return;
         }
         resetClusterPartitionConstraint();
-        // if the cluster has no registered partitions or all partitions are 
pending activation -> UNUSABLE
-        if (clusterPartitions.isEmpty()
-                || 
clusterPartitions.values().stream().allMatch(ClusterPartition::isPendingActivation))
 {
-            LOGGER.info("Cluster does not have any registered partitions");
-            setState(ClusterState.UNUSABLE);
-            return;
-        }
-
-        // exclude partitions that are pending activation
-        if (clusterPartitions.values().stream().anyMatch(p -> !p.isActive() && 
!p.isPendingActivation())) {
+        if (isClusterUnusable()) {
             setState(ClusterState.UNUSABLE);
             return;
         }
@@ -310,9 +311,7 @@ public class ClusterStateManager implements 
IClusterStateManager {
         clusterActiveLocations.removeAll(pendingRemoval);
         clusterPartitionConstraint =
                 new 
AlgebricksAbsolutePartitionConstraint(clusterActiveLocations.toArray(new 
String[] {}));
-        if (appCtx.getStorageProperties().getPartitioningScheme() == 
PartitioningScheme.STATIC) {
-            storageComputePartitionsMap = 
StorageComputePartitionsMap.computePartitionsMap(this);
-        }
+        resetStorageComputeMap();
     }
 
     @Override
@@ -512,6 +511,11 @@ public class ClusterStateManager implements 
IClusterStateManager {
         return storageComputePartitionsMap;
     }
 
+    @Override
+    public synchronized void 
setComputeStoragePartitionsMap(StorageComputePartitionsMap map) {
+        this.storageComputePartitionsMap = map;
+    }
+
     private void updateClusterCounters(String nodeId, NcLocalCounters 
localCounters) {
         final IResourceIdManager resourceIdManager = 
appCtx.getResourceIdManager();
         resourceIdManager.report(nodeId, localCounters.getMaxResourceId());
@@ -543,6 +547,36 @@ public class ClusterStateManager implements 
IClusterStateManager {
                         false));
     }
 
+    private synchronized boolean isClusterUnusable() {
+        // if the cluster has no registered partitions or all partitions are 
pending activation -> UNUSABLE
+        if (clusterPartitions.isEmpty()
+                || 
clusterPartitions.values().stream().allMatch(ClusterPartition::isPendingActivation))
 {
+            LOGGER.info("Cluster does not have any registered partitions");
+            return true;
+        }
+        if (appCtx.isCloudDeployment() && storageComputePartitionsMap != null) 
{
+            Set<String> computeNodes = 
storageComputePartitionsMap.getComputeNodes();
+            if (!participantNodes.containsAll(computeNodes)) {
+                LOGGER.info("Cluster missing compute nodes; required {}, 
current {}", computeNodes, participantNodes);
+                return true;
+            }
+        } else {
+            // exclude partitions that are pending activation
+            if (clusterPartitions.values().stream().anyMatch(p -> 
!p.isActive() && !p.isPendingActivation())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private synchronized void resetStorageComputeMap() {
+        if (storageComputePartitionsMap == null
+                && appCtx.getStorageProperties().getPartitioningScheme() == 
PartitioningScheme.STATIC
+                && !isClusterUnusable()) {
+            storageComputePartitionsMap = 
StorageComputePartitionsMap.computePartitionsMap(this);
+        }
+    }
+
     private static InetSocketAddress getReplicaLocation(IClusterStateManager 
csm, String nodeId) {
         final Map<IOption, Object> ncConfig = 
csm.getActiveNcConfiguration().get(nodeId);
         if (ncConfig == null) {
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 080204eb0a..25b961017c 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -646,8 +646,8 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
     public synchronized List<FileReference> getOnDiskPartitions() throws 
HyracksDataException {
         List<FileReference> onDiskPartitions = new ArrayList<>();
         for (FileReference root : storageRoots) {
-            Collection<FileReference> partitions = ioManager.list(root,
-                    (dir, name) -> dir.isDirectory() && 
name.startsWith(StorageConstants.PARTITION_DIR_PREFIX));
+            Collection<FileReference> partitions = ioManager.list(root, (dir, 
name) -> dir != null && dir.isDirectory()
+                    && name.startsWith(StorageConstants.PARTITION_DIR_PREFIX));
             if (partitions != null) {
                 onDiskPartitions.addAll(partitions);
             }

Reply via email to