This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push: new ae3c273a54 [ASTERIXDB-3196][*DB] Cluster state for compute-storage separation ae3c273a54 is described below commit ae3c273a549a92501f47ccf8713cb2aba1beee58 Author: Murtadha Hubail <mhub...@apache.org> AuthorDate: Wed May 24 15:17:30 2023 +0300 [ASTERIXDB-3196][*DB] Cluster state for compute-storage separation - user model changes: no - storage format changes: no - interface changes: yes Details: - Implement changes required to drive cluster state based on compute-storage partitions map. - Persist index checkpoints to cloud storage. - Remove eager caching from NC startup tasks. - Fixes for static data partitioning. Change-Id: I217da04d06884d841c4a56aee3ab9815cc659de7 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17553 Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Murtadha Hubail <mhub...@apache.org> Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com> --- .../apache/asterix/app/nc/NCAppRuntimeContext.java | 3 +- .../app/replication/NcLifecycleCoordinator.java | 7 --- .../common/cluster/IClusterStateManager.java | 6 +++ .../cluster/StorageComputePartitionsMap.java | 11 +++- .../metadata/declared/MetadataProvider.java | 9 +--- .../asterix/runtime/utils/ClusterStateManager.java | 62 +++++++++++++++++----- .../PersistentLocalResourceRepository.java | 4 +- 7 files changed, 67 insertions(+), 35 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index f60ed63872..7f0b529480 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -204,8 +204,7 @@ public class NCAppRuntimeContext implements INcApplicationContext { storageProperties.getBufferCachePageSize(), storageProperties.getBufferCacheNumPages()); lsmIOScheduler = createIoScheduler(storageProperties); metadataMergePolicyFactory = new ConcurrentMergePolicyFactory(); - // TODO do we want to write checkpoints for cloud? - indexCheckpointManagerProvider = new IndexCheckpointManagerProvider(ioManager); + indexCheckpointManagerProvider = new IndexCheckpointManagerProvider(persistenceIOManager); ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory(persistenceIOManager, indexCheckpointManagerProvider, persistedResourceRegistry); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java index 4f9613d2a7..449dd27845 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java @@ -35,7 +35,6 @@ import java.util.Set; import org.apache.asterix.app.nc.task.BindMetadataNodeTask; import org.apache.asterix.app.nc.task.CheckpointTask; -import org.apache.asterix.app.nc.task.CloudToLocalStorageCachingTask; import org.apache.asterix.app.nc.task.ExportMetadataNodeTask; import org.apache.asterix.app.nc.task.LocalRecoveryTask; import org.apache.asterix.app.nc.task.LocalStorageCleanupTask; @@ -52,7 +51,6 @@ import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessa import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.asterix.common.cluster.IClusterStateManager; -import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.messaging.api.ICCMessageBroker; @@ -222,11 +220,6 @@ public class NcLifecycleCoordinator implements INcLifecycleCoordinator { tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING, nodeActivePartitions)); int metadataPartitionId = clusterManager.getMetadataPartition().getPartitionId(); tasks.add(new LocalStorageCleanupTask(metadataPartitionId)); - - if (((ICcApplicationContext) (serviceContext.getControllerService()).getApplicationContext()) - .isCloudDeployment()) { - tasks.add(new CloudToLocalStorageCachingTask(activePartitions)); - } if (state == SystemState.CORRUPTED) { // need to perform local recovery for node active partitions LocalRecoveryTask rt = new LocalRecoveryTask(nodeActivePartitions); diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java index 3d0fee8e22..5f714f436d 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java @@ -286,4 +286,10 @@ public interface IClusterStateManager { * @return the count of storage partitions */ int getStoragePartitionsCount(); + + /** + * Sets the compute-storage partitions map + * @param map + */ + void setComputeStoragePartitionsMap(StorageComputePartitionsMap map); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java index 48b2ea1a93..6561d05b78 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java @@ -23,6 +23,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; public class StorageComputePartitionsMap { @@ -53,9 +55,10 @@ public class StorageComputePartitionsMap { } } int[][] computerToStoArray = new int[computeToStoragePartitions.size()][]; + int partitionIdx = 0; for (Map.Entry<Integer, List<Integer>> integerListEntry : computeToStoragePartitions.entrySet()) { - computerToStoArray[integerListEntry.getKey()] = - integerListEntry.getValue().stream().mapToInt(i -> i).toArray(); + computerToStoArray[partitionIdx] = integerListEntry.getValue().stream().mapToInt(i -> i).toArray(); + partitionIdx++; } return computerToStoArray; } @@ -94,4 +97,8 @@ public class StorageComputePartitionsMap { } return newMap; } + + public Set<String> getComputeNodes() { + return stoToComputeLocation.values().stream().map(ComputePartition::getNodeId).collect(Collectors.toSet()); + } } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index b07a03eddc..ea1b9e675e 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -32,7 +32,6 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; -import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.cluster.PartitioningProperties; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.DatasetConfig.IndexType; @@ -891,13 +890,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> } else { numElementsHint = Long.parseLong(numElementsHintString); } - int numPartitions = 0; - List<String> nodeGroup = - MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName()).getNodeNames(); - IClusterStateManager csm = appCtx.getClusterStateManager(); - for (String nd : nodeGroup) { - numPartitions += csm.getNodePartitionsCount(nd); - } + int numPartitions = getPartitioningProperties(dataset).getNumberOfPartitions(); return numElementsHint / numPartitions; } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java index 92ea173fb4..0128478647 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java @@ -30,6 +30,7 @@ import java.util.SortedMap; import java.util.TreeSet; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; +import java.util.stream.Collectors; import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; import org.apache.asterix.common.cluster.ClusterPartition; @@ -155,7 +156,16 @@ public class ClusterStateManager implements IClusterStateManager { if (active) { updateClusterCounters(nodeId, localCounters); participantNodes.add(nodeId); - activateNodePartitions(nodeId, activePartitions); + if (appCtx.isCloudDeployment()) { + // node compute partitions never change + ClusterPartition[] nodePartitions = getNodePartitions(nodeId); + activePartitions = + Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId).collect(Collectors.toSet()); + activateNodePartitions(nodeId, activePartitions); + } else { + activateNodePartitions(nodeId, activePartitions); + } + } else { participantNodes.remove(nodeId); deactivateNodePartitions(nodeId); @@ -183,16 +193,7 @@ public class ClusterStateManager implements IClusterStateManager { return; } resetClusterPartitionConstraint(); - // if the cluster has no registered partitions or all partitions are pending activation -> UNUSABLE - if (clusterPartitions.isEmpty() - || clusterPartitions.values().stream().allMatch(ClusterPartition::isPendingActivation)) { - LOGGER.info("Cluster does not have any registered partitions"); - setState(ClusterState.UNUSABLE); - return; - } - - // exclude partitions that are pending activation - if (clusterPartitions.values().stream().anyMatch(p -> !p.isActive() && !p.isPendingActivation())) { + if (isClusterUnusable()) { setState(ClusterState.UNUSABLE); return; } @@ -310,9 +311,7 @@ public class ClusterStateManager implements IClusterStateManager { clusterActiveLocations.removeAll(pendingRemoval); clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint(clusterActiveLocations.toArray(new String[] {})); - if (appCtx.getStorageProperties().getPartitioningScheme() == PartitioningScheme.STATIC) { - storageComputePartitionsMap = StorageComputePartitionsMap.computePartitionsMap(this); - } + resetStorageComputeMap(); } @Override @@ -512,6 +511,11 @@ public class ClusterStateManager implements IClusterStateManager { return storageComputePartitionsMap; } + @Override + public synchronized void setComputeStoragePartitionsMap(StorageComputePartitionsMap map) { + this.storageComputePartitionsMap = map; + } + private void updateClusterCounters(String nodeId, NcLocalCounters localCounters) { final IResourceIdManager resourceIdManager = appCtx.getResourceIdManager(); resourceIdManager.report(nodeId, localCounters.getMaxResourceId()); @@ -543,6 +547,36 @@ public class ClusterStateManager implements IClusterStateManager { false)); } + private synchronized boolean isClusterUnusable() { + // if the cluster has no registered partitions or all partitions are pending activation -> UNUSABLE + if (clusterPartitions.isEmpty() + || clusterPartitions.values().stream().allMatch(ClusterPartition::isPendingActivation)) { + LOGGER.info("Cluster does not have any registered partitions"); + return true; + } + if (appCtx.isCloudDeployment() && storageComputePartitionsMap != null) { + Set<String> computeNodes = storageComputePartitionsMap.getComputeNodes(); + if (!participantNodes.containsAll(computeNodes)) { + LOGGER.info("Cluster missing compute nodes; required {}, current {}", computeNodes, participantNodes); + return true; + } + } else { + // exclude partitions that are pending activation + if (clusterPartitions.values().stream().anyMatch(p -> !p.isActive() && !p.isPendingActivation())) { + return true; + } + } + return false; + } + + private synchronized void resetStorageComputeMap() { + if (storageComputePartitionsMap == null + && appCtx.getStorageProperties().getPartitioningScheme() == PartitioningScheme.STATIC + && !isClusterUnusable()) { + storageComputePartitionsMap = StorageComputePartitionsMap.computePartitionsMap(this); + } + } + private static InetSocketAddress getReplicaLocation(IClusterStateManager csm, String nodeId) { final Map<IOption, Object> ncConfig = csm.getActiveNcConfiguration().get(nodeId); if (ncConfig == null) { diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java index 080204eb0a..25b961017c 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java @@ -646,8 +646,8 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito public synchronized List<FileReference> getOnDiskPartitions() throws HyracksDataException { List<FileReference> onDiskPartitions = new ArrayList<>(); for (FileReference root : storageRoots) { - Collection<FileReference> partitions = ioManager.list(root, - (dir, name) -> dir.isDirectory() && name.startsWith(StorageConstants.PARTITION_DIR_PREFIX)); + Collection<FileReference> partitions = ioManager.list(root, (dir, name) -> dir != null && dir.isDirectory() + && name.startsWith(StorageConstants.PARTITION_DIR_PREFIX)); if (partitions != null) { onDiskPartitions.addAll(partitions); }