From Murtadha Hubail <[email protected]>:
Murtadha Hubail has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17553 )
Change subject: [ASTERIXDB-3196][*DB] Cluster state for compute-storage
separation
......................................................................
[ASTERIXDB-3196][*DB] Cluster state for compute-storage separation
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Implement the changes required to drive the cluster state based on
the compute-storage partitions map.
- Persist index checkpoints to cloud storage.
- Remove eager caching from NC startup tasks.
- Fixes for static data partitioning.
Change-Id: I217da04d06884d841c4a56aee3ab9815cc659de7
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
7 files changed, 89 insertions(+), 35 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/53/17553/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index f60ed63..7f0b529 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -204,8 +204,7 @@
storageProperties.getBufferCachePageSize(),
storageProperties.getBufferCacheNumPages());
lsmIOScheduler = createIoScheduler(storageProperties);
metadataMergePolicyFactory = new ConcurrentMergePolicyFactory();
- // TODO do we want to write checkpoints for cloud?
- indexCheckpointManagerProvider = new
IndexCheckpointManagerProvider(ioManager);
+ indexCheckpointManagerProvider = new
IndexCheckpointManagerProvider(persistenceIOManager);
ILocalResourceRepositoryFactory
persistentLocalResourceRepositoryFactory =
new
PersistentLocalResourceRepositoryFactory(persistenceIOManager,
indexCheckpointManagerProvider,
persistedResourceRegistry);
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index 4f9613d..449dd27 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -35,7 +35,6 @@
import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
import org.apache.asterix.app.nc.task.CheckpointTask;
-import org.apache.asterix.app.nc.task.CloudToLocalStorageCachingTask;
import org.apache.asterix.app.nc.task.ExportMetadataNodeTask;
import org.apache.asterix.app.nc.task.LocalRecoveryTask;
import org.apache.asterix.app.nc.task.LocalStorageCleanupTask;
@@ -52,7 +51,6 @@
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.asterix.common.cluster.IClusterStateManager;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
@@ -222,11 +220,6 @@
tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING,
nodeActivePartitions));
int metadataPartitionId =
clusterManager.getMetadataPartition().getPartitionId();
tasks.add(new LocalStorageCleanupTask(metadataPartitionId));
-
- if (((ICcApplicationContext)
(serviceContext.getControllerService()).getApplicationContext())
- .isCloudDeployment()) {
- tasks.add(new CloudToLocalStorageCachingTask(activePartitions));
- }
if (state == SystemState.CORRUPTED) {
// need to perform local recovery for node active partitions
LocalRecoveryTask rt = new LocalRecoveryTask(nodeActivePartitions);
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index 3d0fee8..5f714f4 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -286,4 +286,10 @@
* @return the count of storage partitions
*/
int getStoragePartitionsCount();
+
+ /**
+ * Sets the compute-storage partitions map
+ * @param map
+ */
+ void setComputeStoragePartitionsMap(StorageComputePartitionsMap map);
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
index 48b2ea1..6561d05 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
@@ -23,6 +23,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
public class StorageComputePartitionsMap {
@@ -53,9 +55,10 @@
}
}
int[][] computerToStoArray = new
int[computeToStoragePartitions.size()][];
+ int partitionIdx = 0;
for (Map.Entry<Integer, List<Integer>> integerListEntry :
computeToStoragePartitions.entrySet()) {
- computerToStoArray[integerListEntry.getKey()] =
- integerListEntry.getValue().stream().mapToInt(i ->
i).toArray();
+ computerToStoArray[partitionIdx] =
integerListEntry.getValue().stream().mapToInt(i -> i).toArray();
+ partitionIdx++;
}
return computerToStoArray;
}
@@ -94,4 +97,8 @@
}
return newMap;
}
+
+ public Set<String> getComputeNodes() {
+ return
stoToComputeLocation.values().stream().map(ComputePartition::getNodeId).collect(Collectors.toSet());
+ }
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index b07a03e..ea1b9e6 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -32,7 +32,6 @@
import java.util.Optional;
import java.util.stream.Collectors;
-import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.cluster.PartitioningProperties;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
@@ -891,13 +890,7 @@
} else {
numElementsHint = Long.parseLong(numElementsHintString);
}
- int numPartitions = 0;
- List<String> nodeGroup =
- MetadataManager.INSTANCE.getNodegroup(mdTxnCtx,
dataset.getNodeGroupName()).getNodeNames();
- IClusterStateManager csm = appCtx.getClusterStateManager();
- for (String nd : nodeGroup) {
- numPartitions += csm.getNodePartitionsCount(nd);
- }
+ int numPartitions =
getPartitioningProperties(dataset).getNumberOfPartitions();
return numElementsHint / numPartitions;
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 92ea173..a6583f0 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -30,6 +30,7 @@
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.cluster.ClusterPartition;
@@ -155,7 +156,16 @@
if (active) {
updateClusterCounters(nodeId, localCounters);
participantNodes.add(nodeId);
- activateNodePartitions(nodeId, activePartitions);
+ if (appCtx.isCloudDeployment()) {
+ // node compute partitions never change
+ ClusterPartition[] nodePartitions = getNodePartitions(nodeId);
+ activePartitions =
+
Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId).collect(Collectors.toSet());
+ activateNodePartitions(nodeId, activePartitions);
+ } else {
+ activateNodePartitions(nodeId, activePartitions);
+ }
+
} else {
participantNodes.remove(nodeId);
deactivateNodePartitions(nodeId);
@@ -183,16 +193,7 @@
return;
}
resetClusterPartitionConstraint();
- // if the cluster has no registered partitions or all partitions are
pending activation -> UNUSABLE
- if (clusterPartitions.isEmpty()
- ||
clusterPartitions.values().stream().allMatch(ClusterPartition::isPendingActivation))
{
- LOGGER.info("Cluster does not have any registered partitions");
- setState(ClusterState.UNUSABLE);
- return;
- }
-
- // exclude partitions that are pending activation
- if (clusterPartitions.values().stream().anyMatch(p -> !p.isActive() &&
!p.isPendingActivation())) {
+ if (isClusterUnusable()) {
setState(ClusterState.UNUSABLE);
return;
}
@@ -220,6 +221,7 @@
setState(ClusterState.ACTIVE);
}
+
@Override
public synchronized void waitForState(ClusterState waitForState) throws
InterruptedException {
while (state != waitForState) {
@@ -310,9 +312,7 @@
clusterActiveLocations.removeAll(pendingRemoval);
clusterPartitionConstraint =
new
AlgebricksAbsolutePartitionConstraint(clusterActiveLocations.toArray(new
String[] {}));
- if (appCtx.getStorageProperties().getPartitioningScheme() ==
PartitioningScheme.STATIC) {
- storageComputePartitionsMap =
StorageComputePartitionsMap.computePartitionsMap(this);
- }
+ resetStorageComputeMap();
}
@Override
@@ -512,6 +512,11 @@
return storageComputePartitionsMap;
}
+ @Override
+ public synchronized void
setComputeStoragePartitionsMap(StorageComputePartitionsMap map) {
+ this.storageComputePartitionsMap = map;
+ }
+
private void updateClusterCounters(String nodeId, NcLocalCounters
localCounters) {
final IResourceIdManager resourceIdManager =
appCtx.getResourceIdManager();
resourceIdManager.report(nodeId, localCounters.getMaxResourceId());
@@ -543,6 +548,36 @@
false));
}
+ private synchronized boolean isClusterUnusable() {
+ // if the cluster has no registered partitions or all partitions are
pending activation -> UNUSABLE
+ if (clusterPartitions.isEmpty()
+ ||
clusterPartitions.values().stream().allMatch(ClusterPartition::isPendingActivation))
{
+ LOGGER.info("Cluster does not have any registered partitions");
+ return true;
+ }
+ if (appCtx.isCloudDeployment() && storageComputePartitionsMap != null)
{
+ Set<String> computeNodes =
storageComputePartitionsMap.getComputeNodes();
+ if (!participantNodes.containsAll(computeNodes)) {
+ LOGGER.info("Cluster missing compute nodes; required {},
current {}", computeNodes, participantNodes);
+ return true;
+ }
+ } else {
+ // exclude partitions that are pending activation
+ if (clusterPartitions.values().stream().anyMatch(p ->
!p.isActive() && !p.isPendingActivation())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private synchronized void resetStorageComputeMap() {
+ if (storageComputePartitionsMap == null
+ && appCtx.getStorageProperties().getPartitioningScheme() ==
PartitioningScheme.STATIC
+ && !isClusterUnusable()) {
+ storageComputePartitionsMap =
StorageComputePartitionsMap.computePartitionsMap(this);
+ }
+ }
+
private static InetSocketAddress getReplicaLocation(IClusterStateManager
csm, String nodeId) {
final Map<IOption, Object> ncConfig =
csm.getActiveNcConfiguration().get(nodeId);
if (ncConfig == null) {
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 080204e..25b9610 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -646,8 +646,8 @@
public synchronized List<FileReference> getOnDiskPartitions() throws
HyracksDataException {
List<FileReference> onDiskPartitions = new ArrayList<>();
for (FileReference root : storageRoots) {
- Collection<FileReference> partitions = ioManager.list(root,
- (dir, name) -> dir.isDirectory() &&
name.startsWith(StorageConstants.PARTITION_DIR_PREFIX));
+ Collection<FileReference> partitions = ioManager.list(root, (dir,
name) -> dir != null && dir.isDirectory()
+ && name.startsWith(StorageConstants.PARTITION_DIR_PREFIX));
if (partitions != null) {
onDiskPartitions.addAll(partitions);
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17553
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I217da04d06884d841c4a56aee3ab9815cc659de7
Gerrit-Change-Number: 17553
Gerrit-PatchSet: 1
Gerrit-Owner: Murtadha Hubail <[email protected]>
Gerrit-MessageType: newchange