>From Murtadha Hubail <[email protected]>:

Murtadha Hubail has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17509 )


Change subject: WIP: Static bucketing
......................................................................

WIP: Static bucketing

Change-Id: Ieca7ffb0f48e16fba4dc5beb0868c1ef8ac9245e
---
A 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ComputePartition.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
A 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/SplitComputeLocations.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
M asterixdb/asterix-app/src/test/resources/cc.conf
M asterixdb/asterix-app/src/main/resources/cc.conf
A 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StoragePartitionsMap.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
10 files changed, 286 insertions(+), 10 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/09/17509/1

diff --git a/asterixdb/asterix-app/src/main/resources/cc.conf 
b/asterixdb/asterix-app/src/main/resources/cc.conf
index dc6e5a2..cbc051b 100644
--- a/asterixdb/asterix-app/src/main/resources/cc.conf
+++ b/asterixdb/asterix-app/src/main/resources/cc.conf
@@ -65,3 +65,4 @@
 compiler.internal.sanitycheck=true
 messaging.frame.size=4096
 messaging.frame.count=512
+storage.partitioning=static
diff --git a/asterixdb/asterix-app/src/test/resources/cc.conf 
b/asterixdb/asterix-app/src/test/resources/cc.conf
index 953284964..ec7566a 100644
--- a/asterixdb/asterix-app/src/test/resources/cc.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc.conf
@@ -61,3 +61,4 @@
 compiler.windowmemory=192KB
 messaging.frame.size=4096
 messaging.frame.count=512
+storage.partitioning=static
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ComputePartition.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ComputePartition.java
new file mode 100644
index 0000000..1d11c30
--- /dev/null
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ComputePartition.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+/**
+ * Immutable pairing of a compute partition id with a node id.
+ */
+public class ComputePartition {
+    private final int id;
+    private final String nodeId;
+
+    /**
+     * @param id the compute partition id
+     * @param nodeId the node id to pair with the partition
+     */
+    public ComputePartition(int id, String nodeId) {
+        this.nodeId = nodeId;
+        this.id = id;
+    }
+
+    /** @return the compute partition id given at construction */
+    public int getId() {
+        return id;
+    }
+
+    /** @return the node id given at construction */
+    public String getNodeId() {
+        return nodeId;
+    }
+}
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/SplitComputeLocations.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/SplitComputeLocations.java
new file mode 100644
index 0000000..b58c39f
--- /dev/null
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/SplitComputeLocations.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+
+/**
+ * Holder that bundles a file split provider with the partition constraint that accompanies it.
+ */
+public class SplitComputeLocations {
+
+    private final AlgebricksPartitionConstraint constraints;
+    private final IFileSplitProvider splitsProvider;
+
+    /**
+     * @param splitsProvider the provider of the file splits
+     * @param constraints the partition constraint paired with those splits
+     */
+    public SplitComputeLocations(IFileSplitProvider splitsProvider, AlgebricksPartitionConstraint constraints) {
+        this.constraints = constraints;
+        this.splitsProvider = splitsProvider;
+    }
+
+    /** @return the partition constraint given at construction */
+    public AlgebricksPartitionConstraint getConstraints() {
+        return constraints;
+    }
+
+    /** @return the file split provider given at construction */
+    public IFileSplitProvider getSplitsProvider() {
+        return splitsProvider;
+    }
+}
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StoragePartitionsMap.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StoragePartitionsMap.java
new file mode 100644
index 0000000..a8029b4
--- /dev/null
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StoragePartitionsMap.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.utils.StorageConstants;
+
+/**
+ * Maps storage partition ids to the {@link ComputePartition} registered for them and derives the inverse
+ * compute-to-storage view.
+ */
+public class StoragePartitionsMap {
+
+    private final Map<Integer, ComputePartition> stoToComputeLocation = new HashMap<>();
+
+    /**
+     * Registers {@code compute} for storage partition {@code stoPart}, replacing any previous entry.
+     */
+    public synchronized void addStoragePartition(int stoPart, ComputePartition compute) {
+        stoToComputeLocation.put(stoPart, compute);
+    }
+
+    /**
+     * Builds an array indexed by compute partition id whose rows list the storage partitions assigned to it.
+     *
+     * @param metadataDataset if true, the result is a single row holding only 0; otherwise every storage
+     *            partition in {@code [0, StorageConstants.NUM_STORAGE_PARTITIONS)} is grouped by compute id
+     * @return the compute-to-storage array
+     * @throws IllegalStateException if a storage partition has no compute partition registered, or if the
+     *             compute partition ids are not exactly {@code 0..n-1} as required to index the array
+     */
+    public int[][] getComputeToStorageMap(boolean metadataDataset) {
+        Map<Integer, List<Integer>> computeToStoragePartitions = new HashMap<>();
+        if (metadataDataset) {
+            final int computePartitionIdForMetadata = 0;
+            computeToStoragePartitions.put(computePartitionIdForMetadata,
+                    Collections.singletonList(computePartitionIdForMetadata));
+        } else {
+            for (int i = 0; i < StorageConstants.NUM_STORAGE_PARTITIONS; i++) {
+                ComputePartition computePartition = getComputePartition(i);
+                if (computePartition == null) {
+                    throw new IllegalStateException("no compute partition assigned to storage partition " + i);
+                }
+                computeToStoragePartitions.computeIfAbsent(computePartition.getId(), k -> new ArrayList<>()).add(i);
+            }
+        }
+        int[][] computeToStoArray = new int[computeToStoragePartitions.size()][];
+        for (Map.Entry<Integer, List<Integer>> entry : computeToStoragePartitions.entrySet()) {
+            int computeId = entry.getKey();
+            // the row index is the compute partition id itself, so the ids must be exactly 0..n-1
+            if (computeId < 0 || computeId >= computeToStoArray.length) {
+                throw new IllegalStateException("compute partition id " + computeId + " is outside [0, "
+                        + computeToStoArray.length + ")");
+            }
+            computeToStoArray[computeId] = entry.getValue().stream().mapToInt(Integer::intValue).toArray();
+        }
+        return computeToStoArray;
+    }
+
+    /**
+     * @return the compute partition registered for {@code storagePartition}, or {@code null} if there is none
+     */
+    public synchronized ComputePartition getComputePartition(int storagePartition) {
+        return stoToComputeLocation.get(storagePartition);
+    }
+}
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 2d231d3..84b8b6d 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -46,6 +46,8 @@
     public static final String DEFAULT_COMPACTION_POLICY_NAME = 
ConcurrentMergePolicyFactory.NAME;
     public static final String DEFAULT_FILTERED_DATASET_COMPACTION_POLICY_NAME 
= "correlated-prefix";
     public static final Map<String, String> 
DEFAULT_COMPACTION_POLICY_PROPERTIES;
+    public static final int METADATA_PARTITION = -1;
+    public static final int NUM_STORAGE_PARTITIONS = 16;

     /**
      * The storage version of AsterixDB related artifacts (e.g. log files, 
checkpoint files, etc..).
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index a8df92a..9a6abec 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -933,7 +933,7 @@

     public FileSplit[] splitsForIndex(MetadataTransactionContext mdTxnCtx, 
Dataset dataset, String indexName)
             throws AlgebricksException {
-        return SplitsAndConstraintsUtil.getIndexSplits(dataset, indexName, 
mdTxnCtx, appCtx.getClusterStateManager());
+        return dataPartitioningProvider.getPartitioningProperties(mdTxnCtx, 
dataset, indexName).getSpiltsProvider().getFileSplits();
     }

     public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, 
DataverseName dataverseName,
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
index ec4c985..d8509e9 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
@@ -20,23 +20,34 @@

 import static org.apache.asterix.common.utils.PartitioningScheme.DYNAMIC;
 import static org.apache.asterix.common.utils.PartitioningScheme.STATIC;
+import static 
org.apache.asterix.metadata.utils.MetadataConstants.METADATA_NODEGROUP_NAME;

+import java.io.File;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;

+import org.apache.asterix.common.cluster.ComputePartition;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.cluster.PartitioningProperties;
+import org.apache.asterix.common.cluster.SplitComputeLocations;
+import org.apache.asterix.common.cluster.StoragePartitionsMap;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.dataflow.IDataPartitioningProvider;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 import org.apache.asterix.common.utils.PartitioningScheme;
+import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -44,30 +55,87 @@
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.api.io.MappedFileSplit;
+import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;

 public class DataPartitioningProvider implements IDataPartitioningProvider {

     private final ICcApplicationContext appCtx;
     private final PartitioningScheme scheme;
+    private final ClusterStateManager clusterStateManager;

     public DataPartitioningProvider(ICcApplicationContext appCtx) {
         this.appCtx = appCtx;
+        this.clusterStateManager = (ClusterStateManager) 
appCtx.getClusterStateManager();
         scheme = appCtx.getStorageProperties().getPartitioningScheme();
     }

     public PartitioningProperties getPartitioningProperties(DataverseName 
dataverseName) {
         if (scheme == DYNAMIC) {
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraints = SplitsAndConstraintsUtil
-                    
.getDataverseSplitProviderAndConstraints(appCtx.getClusterStateManager(), 
dataverseName);
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraints =
+                    
SplitsAndConstraintsUtil.getDataverseSplitProviderAndConstraints(appCtx.getClusterStateManager(),
+                            dataverseName);
             int[][] partitionsMap = 
getPartitionsMap(getNumPartitions(splitsAndConstraints.second));
             return PartitioningProperties.of(splitsAndConstraints.first, 
splitsAndConstraints.second, partitionsMap);
         } else if (scheme == STATIC) {
-            throw new NotImplementedException();
+            SplitComputeLocations dataverseSplits = 
getDataverseSplits(dataverseName);
+            StoragePartitionsMap partitionMap = 
clusterStateManager.getPartitionMap();
+            int[][] partitionsMap = partitionMap.getComputeToStorageMap(false);
+            return 
PartitioningProperties.of(dataverseSplits.getSplitsProvider(), 
dataverseSplits.getConstraints(),
+                    partitionsMap);
         }
         throw new IllegalStateException();
     }

+    // One file split per storage partition of the dataverse, plus the node of each distinct compute partition.
+    private SplitComputeLocations getDataverseSplits(DataverseName dataverseName) {
+        StoragePartitionsMap storageToCompute = clusterStateManager.getPartitionMap();
+        Set<Integer> seenComputeIds = new HashSet<>();
+        List<String> nodeLocations = new ArrayList<>();
+        List<FileSplit> fileSplits = new ArrayList<>();
+        for (int partition = 0; partition < StorageConstants.NUM_STORAGE_PARTITIONS; partition++) {
+            File dataverseDir = new File(StoragePathUtil.prepareStoragePartitionPath(partition),
+                    StoragePathUtil.prepareDataverseName(dataverseName));
+            ComputePartition owner = storageToCompute.getComputePartition(partition);
+            fileSplits.add(new MappedFileSplit(owner.getNodeId(), dataverseDir.getPath(), 0));
+            // Set.add reports whether the id is new, so each compute partition contributes its node only once
+            if (seenComputeIds.add(owner.getId())) {
+                nodeLocations.add(owner.getNodeId());
+            }
+        }
+        IFileSplitProvider provider = StoragePathUtil.splitProvider(fileSplits.toArray(new FileSplit[0]));
+        String[] locationArray = nodeLocations.toArray(new String[0]);
+        return new SplitComputeLocations(provider, new AlgebricksAbsolutePartitionConstraint(locationArray));
+    }
+
+    // One file split per dataset partition for the given index, plus the node of each distinct compute partition.
+    // Every split of a metadata dataset is placed on StorageConstants.METADATA_PARTITION.
+    private SplitComputeLocations getDatasetSplits(Dataset dataset, String indexName) {
+        StoragePartitionsMap storageToCompute = clusterStateManager.getPartitionMap();
+        Set<Integer> seenComputeIds = new HashSet<>();
+        List<String> nodeLocations = new ArrayList<>();
+        List<FileSplit> fileSplits = new ArrayList<>();
+
+        final int partitionCount = getNumberOfPartitions(dataset);
+        final boolean isMetadata = MetadataIndexImmutableProperties.isMetadataDataset(dataset.getDatasetId());
+        for (int partition = 0; partition < partitionCount; partition++) {
+            int target = isMetadata ? StorageConstants.METADATA_PARTITION : partition;
+            final String indexPath = StoragePathUtil.prepareDataverseIndexName(dataset.getDataverseName(),
+                    dataset.getDatasetName(), indexName, dataset.getRebalanceCount());
+            File indexDir = new File(StoragePathUtil.prepareStoragePartitionPath(target), indexPath);
+            ComputePartition owner = storageToCompute.getComputePartition(target);
+            fileSplits.add(new MappedFileSplit(owner.getNodeId(), indexDir.getPath(), 0));
+            // Set.add reports whether the id is new, so each compute partition contributes its node only once
+            if (seenComputeIds.add(owner.getId())) {
+                nodeLocations.add(owner.getNodeId());
+            }
+        }
+        IFileSplitProvider provider = StoragePathUtil.splitProvider(fileSplits.toArray(new FileSplit[0]));
+        String[] locationArray = nodeLocations.toArray(new String[0]);
+        return new SplitComputeLocations(provider, new AlgebricksAbsolutePartitionConstraint(locationArray));
+    }
+
     public PartitioningProperties 
getPartitioningProperties(MetadataTransactionContext mdTxnCtx, Dataset ds,
             String indexName) throws AlgebricksException {
         if (scheme == DYNAMIC) {
@@ -78,7 +146,12 @@
             int[][] partitionsMap = 
getPartitionsMap(getNumPartitions(splitsAndConstraints.second));
             return PartitioningProperties.of(splitsAndConstraints.first, 
splitsAndConstraints.second, partitionsMap);
         } else if (scheme == STATIC) {
-            throw new NotImplementedException();
+            SplitComputeLocations dataverseSplits = getDatasetSplits(ds, 
indexName);
+            StoragePartitionsMap partitionMap = 
clusterStateManager.getPartitionMap();
+            int[][] partitionsMap = partitionMap.getComputeToStorageMap(
+                    
MetadataIndexImmutableProperties.isMetadataDataset(ds.getDatasetId()));
+            return 
PartitioningProperties.of(dataverseSplits.getSplitsProvider(), 
dataverseSplits.getConstraints(),
+                    partitionsMap);
         }
         throw new IllegalStateException();
     }
@@ -97,7 +170,7 @@
             int[][] partitionsMap = 
getPartitionsMap(getNumPartitions(spC.second));
             return PartitioningProperties.of(spC.first, spC.second, 
partitionsMap);
         } else if (scheme == STATIC) {
-            throw new NotImplementedException();
+            return getPartitioningProperties(feed.getDataverseName());
         }
         throw new IllegalStateException();
     }
@@ -117,4 +190,12 @@
         }
         return map;
     }
+
+    /** Returns METADATA_DATASETS_PARTITIONS for a metadata dataset and NUM_STORAGE_PARTITIONS for any other. */
+    private static int getNumberOfPartitions(Dataset ds) {
+        if (MetadataIndexImmutableProperties.isMetadataDataset(ds.getDatasetId())) {
+            return MetadataIndexImmutableProperties.METADATA_DATASETS_PARTITIONS;
+        }
+        return StorageConstants.NUM_STORAGE_PARTITIONS;
+    }
 }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index 7e51ec1..e26b436 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -205,7 +205,8 @@
         try {
             INcApplicationContext appCtx =
                     (INcApplicationContext) 
ctx.getJobletContext().getServiceContext().getApplicationContext();
-
+            writer.open();
+            writerOpen = true;
             for (int i = 0; i < partitions.length; i++) {
                 IIndexDataflowHelper indexHelper = indexHelpers[i];
                 indexHelper.open();
@@ -224,8 +225,6 @@
                             new 
PrimaryIndexLogMarkerCallback((AbstractLSMIndex) indexes[0]);
                     TaskUtil.put(ILogMarkerCallback.KEY_MARKER_CALLBACK, 
callback, ctx);
                 }
-                writer.open();
-                writerOpen = true;
                 modCallbacks[i] =
                         
modOpCallbackFactory.createModificationOperationCallback(indexHelper.getResource(),
 ctx, this);
                 searchCallbacks[i] = (LockThenSearchOperationCallback) 
searchCallbackFactory
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index d3c87ff..fda0d6a 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -33,13 +33,17 @@

 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.cluster.ComputePartition;
 import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.cluster.StoragePartitionsMap;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.replication.INcLifecycleCoordinator;
 import org.apache.asterix.common.transactions.IResourceIdManager;
 import org.apache.asterix.common.utils.NcLocalCounters;
+import org.apache.asterix.common.utils.PartitioningScheme;
+import org.apache.asterix.common.utils.StorageConstants;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.config.IOption;
@@ -79,6 +83,10 @@
     private ICcApplicationContext appCtx;
     private ClusterPartition metadataPartition;
     private boolean rebalanceRequired;
+    private StoragePartitionsMap partitionMap;
+    public synchronized StoragePartitionsMap getPartitionMap() {
+        return partitionMap;
+    }

     @Override
     public void setCcAppCtx(ICcApplicationContext appCtx) {
@@ -86,7 +94,15 @@
         node2PartitionsMap = 
appCtx.getMetadataProperties().getNodePartitions();
         clusterPartitions = 
appCtx.getMetadataProperties().getClusterPartitions();
         currentMetadataNode = 
appCtx.getMetadataProperties().getMetadataNodeName();
-        metadataPartition = node2PartitionsMap.get(currentMetadataNode)[0];
+
+        PartitioningScheme partitioningScheme = 
appCtx.getStorageProperties().getPartitioningScheme();
+        if (partitioningScheme == PartitioningScheme.DYNAMIC) {
+            metadataPartition = node2PartitionsMap.get(currentMetadataNode)[0];
+        } else {
+            final ClusterPartition fixedMetadataPartition = new 
ClusterPartition(StorageConstants.METADATA_PARTITION,
+                    appCtx.getMetadataProperties().getMetadataNodeName(), 0);
+            metadataPartition = fixedMetadataPartition;
+        }
         lifecycleCoordinator = appCtx.getNcLifecycleCoordinator();
         lifecycleCoordinator.bindTo(this);
     }
@@ -299,6 +315,7 @@
         clusterActiveLocations.removeAll(pendingRemoval);
         clusterPartitionConstraint =
                 new 
AlgebricksAbsolutePartitionConstraint(clusterActiveLocations.toArray(new 
String[] {}));
+        computePartitionsMap();
     }

     @Override
@@ -489,6 +506,34 @@
         return nodeIds.stream().anyMatch(failedNodes::contains);
     }

+    /**
+     * Rebuilds {@code partitionMap} from the current cluster partitions.
+     * <p>
+     * The metadata partition is registered first. The storage partitions are then dealt out evenly across the
+     * cluster partitions in iteration order, and any remainder goes to the last cluster partition.
+     */
+    private synchronized void computePartitionsMap() {
+        StoragePartitionsMap rebuilt = new StoragePartitionsMap();
+        rebuilt.addStoragePartition(metadataPartition.getPartitionId(),
+                new ComputePartition(metadataPartition.getPartitionId(), metadataPartition.getActiveNodeId()));
+        final int computeCount = clusterPartitions.size();
+        final int evenShare = StorageConstants.NUM_STORAGE_PARTITIONS / computeCount;
+        final int leftover = StorageConstants.NUM_STORAGE_PARTITIONS % computeCount;
+        int nextStoragePartition = 0;
+        int visited = 0;
+        for (ClusterPartition clusterPartition : clusterPartitions.values()) {
+            visited++;
+            // only the last cluster partition absorbs the storage partitions left over by the integer division
+            final int share = visited == computeCount ? evenShare + leftover : evenShare;
+            for (int assigned = 0; assigned < share; assigned++) {
+                rebuilt.addStoragePartition(nextStoragePartition++,
+                        new ComputePartition(clusterPartition.getPartitionId(), clusterPartition.getActiveNodeId()));
+            }
+        }
+        // swap in the fully built map with a single assignment
+        partitionMap = rebuilt;
+    }
+
     private void updateClusterCounters(String nodeId, NcLocalCounters 
localCounters) {
         final IResourceIdManager resourceIdManager = 
appCtx.getResourceIdManager();
         resourceIdManager.report(nodeId, localCounters.getMaxResourceId());

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17509
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Ieca7ffb0f48e16fba4dc5beb0868c1ef8ac9245e
Gerrit-Change-Number: 17509
Gerrit-PatchSet: 1
Gerrit-Owner: Murtadha Hubail <[email protected]>
Gerrit-MessageType: newchange

Reply via email to