Murtadha Hubail has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2112
Change subject: [ASTERIXDB-2081][STO] Introduce DatasetMemoryManager
......................................................................
[ASTERIXDB-2081][STO] Introduce DatasetMemoryManager
- user model changes: no
- storage format changes: no
- interface changes: yes
Added IDatasetMemoryManager to manage datasets memory
reservation and allocation.
Details:
- Reserve metadata datasets memory to allow them to be opened
when needed.
- Close all datasets at the end of recovery to clear
used memory and flush all recovered datasets.
- Add UngracefulShutdownNCApplication to force recovery
to run on AsterixHyracksIntegrationUtil.
- Refactor the use of firstAvilableUserDatasetID to check
for metadata datasets.
- Add ThreadSafe annotation.
- Add test case for RecoveryManager after creating multiple
datasets.
Change-Id: Ica76b3c8eca6f7d2ad1d962fb5ef84267c258571
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
A
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java
A
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetMemoryManager.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
A
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetMemoryManager.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetVirtualBufferCaches.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
A
asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/DatasetMemoryManagerTest.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
A
hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/ThreadSafe.java
14 files changed, 566 insertions(+), 67 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/12/2112/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index ecf25eb..d423d2d 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -66,6 +66,7 @@
private static final String DEFAULT_STORAGE_PATH = joinPath("target",
"io", "dir");
private static String storagePath = DEFAULT_STORAGE_PATH;
+ public static boolean gracefulShutdown = true;
private ConfigManager configManager;
private List<String> nodeNames;
@@ -158,6 +159,9 @@
}
protected INCApplication createNCApplication() {
+ if (!gracefulShutdown) {
+ return new UngracefulShutdownNCApplication();
+ }
return new NCApplication();
}
@@ -288,4 +292,11 @@
Thread.sleep(10000);
}
}
+
+ private class UngracefulShutdownNCApplication extends NCApplication {
+ @Override
+ public void stop() throws Exception {
+ // ungraceful shutdown
+ }
+ }
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 5370c03..7b08f68 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -32,6 +32,7 @@
import org.apache.asterix.active.ActiveManager;
import org.apache.asterix.api.common.AppRuntimeContextProviderForRecovery;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.IDatasetMemoryManager;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.ActiveProperties;
@@ -48,11 +49,11 @@
import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.config.TransactionProperties;
import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.context.DatasetMemoryManager;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.common.replication.IRemoteRecoveryManager;
import org.apache.asterix.common.replication.IReplicaResourcesManager;
import org.apache.asterix.common.replication.IReplicationChannel;
@@ -119,6 +120,7 @@
private MessagingProperties messagingProperties;
private final NodeProperties nodeProperties;
private ExecutorService threadExecutor;
+ private IDatasetMemoryManager datasetMemoryManager;
private IDatasetLifecycleManager datasetLifecycleManager;
private IBufferCache bufferCache;
private ITransactionSubsystem txnSubsystem;
@@ -198,9 +200,10 @@
localResourceRepository.deleteStorageData(true);
}
- datasetLifecycleManager = new
DatasetLifecycleManager(storageProperties, localResourceRepository,
-
MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID,
txnSubsystem.getLogManager(),
- ioManager.getIODevices().size());
+ datasetMemoryManager = new DatasetMemoryManager(storageProperties);
+ datasetLifecycleManager =
+ new DatasetLifecycleManager(storageProperties,
localResourceRepository, txnSubsystem.getLogManager(),
+ datasetMemoryManager, ioManager.getIODevices().size());
isShuttingdown = false;
@@ -316,6 +319,11 @@
}
@Override
+ public IDatasetMemoryManager getDatasetMemoryManager() {
+ return datasetMemoryManager;
+ }
+
+ @Override
public double getBloomFilterFalsePositiveRate() {
return storageProperties.getBloomFilterFalsePositiveRate();
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index a502de9..7b1834a 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -401,6 +401,7 @@
for (long r : resourceIdList) {
datasetLifecycleManager.close(resourcesMap.get(r).getPath());
}
+ datasetLifecycleManager.closeAllDatasets();
}
}
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java
new file mode 100644
index 0000000..6386a54
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.txn;
+
+import java.io.File;
+import java.io.InputStream;
+import java.util.Random;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.configuration.AsterixConfiguration;
+import org.apache.asterix.common.configuration.Property;
+import org.apache.asterix.common.utils.Servlets;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class RecoveryManagerTest {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final String DEFAULT_TEST_CONFIG_FILE_NAME =
"asterix-build-configuration.xml";
+ private static final String TEST_CONFIG_FILE_NAME =
"asterix-test-configuration.xml";
+ private static final String TEST_CONFIG_PATH =
+ System.getProperty("user.dir") + File.separator + "target" +
File.separator + "config";
+ private static final String TEST_CONFIG_FILE_PATH = TEST_CONFIG_PATH +
File.separator + TEST_CONFIG_FILE_NAME;
+ private static final TestExecutor testExecutor = new TestExecutor();
+ private static final AsterixHyracksIntegrationUtil integrationUtil = new
AsterixHyracksIntegrationUtil();
+ private static final Random random = new Random();
+ private static final int numRecords = 1;
+
+ @Before
+ public void setUp() throws Exception {
+ // Read default test configurations
+ AsterixConfiguration ac =
TestHelper.getConfigurations(DEFAULT_TEST_CONFIG_FILE_NAME);
+ // override memory config to enforce dataset eviction
+ ac.getProperty().add(new
Property("storage.memorycomponent.globalbudget", String.valueOf("128MB"), ""));
+ ac.getProperty().add(new Property("storage.memorycomponent.numpages",
String.valueOf("32"), ""));
+ // Write test config file
+ TestHelper.writeConfigurations(ac, TEST_CONFIG_FILE_PATH);
+ System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY,
TEST_CONFIG_FILE_PATH);
+ integrationUtil.gracefulShutdown = false;
+ integrationUtil.init(true);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ integrationUtil.deinit(true);
+ }
+
+ @Test
+ public void multiDatasetRecovery() throws Exception {
+ String datasetNamePrefix = "ds_";
+ final TestCaseContext.OutputFormat format =
TestCaseContext.OutputFormat.CLEAN_JSON;
+ testExecutor.executeSqlppUpdateOrDdl("CREATE TYPE KeyType AS { id: int
};", format);
+ int numDatasets = 50;
+ String datasetName = null;
+ for (int i = 1; i <= numDatasets; i++) {
+ datasetName = datasetNamePrefix + i;
+ testExecutor.executeSqlppUpdateOrDdl("CREATE DATASET " +
datasetName + "(KeyType) PRIMARY KEY id;", format);
+ insertData(datasetName);
+ }
+ // do ungraceful shutdown to enforce recovery
+ integrationUtil.deinit(false);
+ integrationUtil.init(false);
+ validateRecovery(datasetName);
+
+ // create more datasets after recovery
+ numDatasets = 100;
+ for (int i = 51; i <= numDatasets; i++) {
+ datasetName = datasetNamePrefix + i;
+ testExecutor.executeSqlppUpdateOrDdl("CREATE DATASET " +
datasetName + "(KeyType) PRIMARY KEY id;", format);
+ insertData(datasetName);
+ }
+ // do ungraceful shutdown to enforce recovery again
+ integrationUtil.deinit(false);
+ integrationUtil.init(false);
+ validateRecovery(datasetName);
+ }
+
+ private void insertData(String datasetName) throws Exception {
+ for (int i = 0; i < numRecords; i++) {
+ testExecutor.executeSqlppUpdateOrDdl("UPSERT INTO " + datasetName
+ " ({\"id\": " + random.nextInt() + "})",
+ TestCaseContext.OutputFormat.CLEAN_JSON);
+ }
+ }
+
+ private void validateRecovery(String datasetName) throws Exception {
+ final String query = "select value count(*) from `" + datasetName +
"`;";
+ final InputStream inputStream = testExecutor
+ .executeQueryService(query,
testExecutor.getEndpoint(Servlets.QUERY_SERVICE),
+ TestCaseContext.OutputFormat.CLEAN_JSON);
+ final ObjectNode jsonNodes = OBJECT_MAPPER.readValue(inputStream,
ObjectNode.class);
+ JsonNode result = jsonNodes.get("results");
+ // make sure there is result
+ Assert.assertEquals(1, result.size());
+ for (int i = 0; i < result.size(); i++) {
+ JsonNode json = result.get(i);
+ Assert.assertEquals(numRecords, json.asInt());
+ }
+ }
+}
\ No newline at end of file
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetMemoryManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetMemoryManager.java
new file mode 100644
index 0000000..1ae6438
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetMemoryManager.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+public interface IDatasetMemoryManager {
+
+ /**
+ * Allocates memory for dataset {@code datasetId}.
+ *
+ * @param datasetId
+ * @return true, if the allocation is successful, otherwise false.
+ */
+ boolean allocate(int datasetId);
+
+ /**
+ * Deallocates memory of dataset {@code datasetId}.
+ *
+ * @param datasetId
+ */
+ void deallocate(int datasetId);
+
+ /**
+ * Reserves memory for dataset {@code datasetId}. The reserved memory
+ * is guaranteed to be allocatable when needed for the dataset.
+ *
+ * @param datasetId
+ * @return true, if the allocation is successful, otherwise false.
+ */
+ boolean reserve(int datasetId);
+
+ /**
+ * Cancels the reserved memory for dataset {@code datasetId}.
+ *
+ * @param datasetId
+ */
+ void cancelReserved(int datasetId);
+
+ /**
+ * @return The remaining memory budget that can be used for datasets.
+ */
+ long getAvailable();
+
+ /**
+ * @param datasetId
+ * @return The number of virtual buffer cache pages that should be
allocated for dataset {@code datasetId}.
+ */
+ int getNumPages(int datasetId);
+}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index d4b9a92..548907c 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -62,6 +62,8 @@
IDatasetLifecycleManager getDatasetLifecycleManager();
+ IDatasetMemoryManager getDatasetMemoryManager();
+
IResourceIdFactory getResourceIdFactory();
ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID);
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 545382a..eac479c 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -29,6 +29,7 @@
import java.util.logging.Logger;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.IDatasetMemoryManager;
import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.dataflow.DatasetLocalResource;
import org.apache.asterix.common.exceptions.ACIDException;
@@ -45,33 +46,29 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
-import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.ILocalResourceRepository;
import org.apache.hyracks.storage.common.LocalResource;
public class DatasetLifecycleManager implements IDatasetLifecycleManager,
ILifeCycleComponent {
+
private static final Logger LOGGER =
Logger.getLogger(DatasetLifecycleManager.class.getName());
private final Map<Integer, DatasetResource> datasets = new
ConcurrentHashMap<>();
private final StorageProperties storageProperties;
private final ILocalResourceRepository resourceRepository;
- private final int firstAvilableUserDatasetID;
- private final long capacity;
- private long used;
+ private final IDatasetMemoryManager memoryManager;
private final ILogManager logManager;
private final LogRecord logRecord;
private final int numPartitions;
private volatile boolean stopped = false;
public DatasetLifecycleManager(StorageProperties storageProperties,
ILocalResourceRepository resourceRepository,
- int firstAvilableUserDatasetID, ILogManager logManager, int
numPartitions) {
+ ILogManager logManager, IDatasetMemoryManager memoryManager, int
numPartitions) {
this.logManager = logManager;
this.storageProperties = storageProperties;
this.resourceRepository = resourceRepository;
- this.firstAvilableUserDatasetID = firstAvilableUserDatasetID;
+ this.memoryManager = memoryManager;
this.numPartitions = numPartitions;
- capacity = storageProperties.getMemoryComponentGlobalBudget();
- used = 0;
logRecord = new LogRecord();
}
@@ -200,9 +197,10 @@
for (DatasetResource dsr : datasetsResources) {
PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
if (opTracker != null && opTracker.getNumActiveOperations() == 0
- && dsr.getDatasetInfo().getReferenceCount() == 0 &&
dsr.getDatasetInfo().isOpen()
- && dsr.getDatasetInfo().getDatasetID() >=
getFirstAvilableUserDatasetID()) {
+ && dsr.getDatasetInfo().getReferenceCount() == 0 &&
dsr.getDatasetInfo().isOpen() && !dsr
+ .isMetadataDataset()) {
closeDataset(dsr.getDatasetInfo());
+ LOGGER.info(() -> "Evicted Dataset" + dsr.getDatasetID());
return true;
}
}
@@ -230,8 +228,9 @@
if (dsr == null) {
DatasetInfo dsInfo = new DatasetInfo(did);
PrimaryIndexOperationTracker opTracker = new
PrimaryIndexOperationTracker(did, logManager, dsInfo);
- DatasetVirtualBufferCaches vbcs = new
DatasetVirtualBufferCaches(did, storageProperties,
- getFirstAvilableUserDatasetID(), getNumPartitions());
+ DatasetVirtualBufferCaches vbcs =
+ new DatasetVirtualBufferCaches(did, storageProperties,
memoryManager.getNumPages(did),
+ numPartitions);
dsr = new DatasetResource(dsInfo, opTracker, vbcs);
datasets.put(did, dsr);
}
@@ -322,8 +321,7 @@
}
@Override
- public synchronized void start() {
- used = 0;
+ public void start() {
}
@Override
@@ -350,7 +348,7 @@
for (IndexInfo iInfo : dsr.getIndexes().values()) {
AbstractLSMIOOperationCallback ioCallback =
(AbstractLSMIOOperationCallback)
iInfo.getIndex().getIOOperationCallback();
- if (!(((AbstractLSMIndex)
iInfo.getIndex()).isCurrentMutableComponentEmpty()
+ if (!(iInfo.getIndex().isCurrentMutableComponentEmpty()
|| ioCallback.hasPendingFlush() ||
opTracker.isFlushLogCreated()
|| opTracker.isFlushOnExit())) {
long firstLSN = ioCallback.getFirstLSN();
@@ -449,7 +447,7 @@
public synchronized void closeUserDatasets() throws HyracksDataException {
ArrayList<DatasetResource> openDatasets = new
ArrayList<>(datasets.values());
for (DatasetResource dsr : openDatasets) {
- if (dsr.getDatasetID() >= getFirstAvilableUserDatasetID()) {
+ if (!dsr.isMetadataDataset()) {
closeDataset(dsr.getDatasetInfo());
}
}
@@ -474,8 +472,8 @@
public void dumpState(OutputStream outputStream) throws IOException {
StringBuilder sb = new StringBuilder();
- sb.append(String.format("Memory budget = %d\n", capacity));
- sb.append(String.format("Memory used = %d\n", used));
+ sb.append(String.format("Memory budget = %d\n",
storageProperties.getMemoryComponentGlobalBudget()));
+ sb.append(String.format("Memory available = %d\n",
memoryManager.getAvailable()));
sb.append("\n");
String dsHeaderFormat = "%-10s %-6s %-16s %-12s\n";
@@ -515,7 +513,7 @@
}
synchronized (dsInfo) {
if (dsInfo.isOpen() && dsInfo.isMemoryAllocated()) {
- used -=
getVirtualBufferCaches(dsInfo.getDatasetID()).getTotalSize();
+ memoryManager.deallocate(datasetId);
dsInfo.setMemoryAllocated(false);
}
}
@@ -534,25 +532,15 @@
synchronized (dsInfo) {
// This is not needed for external datasets' indexes since they
never use the virtual buffer cache.
if (!dsInfo.isMemoryAllocated() && !dsInfo.isExternal()) {
- long additionalSize =
getVirtualBufferCaches(dsInfo.getDatasetID()).getTotalSize();
- while (used + additionalSize > capacity) {
+ while (!memoryManager.allocate(datasetId)) {
if (!evictCandidateDataset()) {
throw new HyracksDataException("Cannot allocate
dataset " + dsInfo.getDatasetID()
+ " memory since memory budget would be
exceeded.");
}
}
- used += additionalSize;
dsInfo.setMemoryAllocated(true);
}
}
- }
-
- public int getFirstAvilableUserDatasetID() {
- return firstAvilableUserDatasetID;
- }
-
- public int getNumPartitions() {
- return numPartitions;
}
@Override
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetMemoryManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetMemoryManager.java
new file mode 100644
index 0000000..ccce1f1
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetMemoryManager.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.context;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IDatasetMemoryManager;
+import org.apache.asterix.common.config.StorageProperties;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+@ThreadSafe
+public class DatasetMemoryManager implements IDatasetMemoryManager {
+
+ private static final Logger LOGGER =
Logger.getLogger(DatasetMemoryManager.class.getName());
+ private final Map<Integer, Long> allocatedMap = new ConcurrentHashMap<>();
+ private final Map<Integer, Long> reservedMap = new ConcurrentHashMap<>();
+ private final AtomicLong available;
+ private final StorageProperties storageProperties;
+
+ public DatasetMemoryManager(StorageProperties storageProperties) {
+ this.storageProperties = storageProperties;
+ available = new
AtomicLong(storageProperties.getMemoryComponentGlobalBudget());
+ }
+
+ @Override
+ public synchronized boolean allocate(int datasetId) {
+ if (allocatedMap.containsKey(datasetId)) {
+ throw new IllegalStateException("Memory is already allocated for
dataset: " + datasetId);
+ }
+ if (reservedMap.containsKey(datasetId)) {
+ allocateReserved(datasetId);
+ return true;
+ }
+ final long required = getTotalSize(datasetId);
+ if (!isAllocatable(required)) {
+ return false;
+ }
+ allocatedMap.put(datasetId, required);
+ available.addAndGet(-required);
+ LOGGER.info(() -> "Allocated(" + required + ") for dataset(" +
datasetId + ")");
+ return true;
+ }
+
+ @Override
+ public synchronized void deallocate(int datasetId) {
+ if (!allocatedMap.containsKey(datasetId) &&
!reservedMap.containsKey(datasetId)) {
+ throw new IllegalStateException("No allocated or reserved memory
for dataset: " + datasetId);
+ }
+ final Long allocated = allocatedMap.remove(datasetId);
+ // return the allocated budget if it is not reserved
+ if (allocated != null && !reservedMap.containsKey(datasetId)) {
+ available.addAndGet(allocated);
+ LOGGER.info(() -> "Deallocated(" + allocated + ") from dataset(" +
datasetId + ")");
+ }
+ }
+
+ @Override
+ public synchronized boolean reserve(int datasetId) {
+ if (allocatedMap.containsKey(datasetId)) {
+ throw new IllegalStateException("Memory is already allocated for
dataset: " + datasetId);
+ }
+ if (reservedMap.containsKey(datasetId)) {
+ throw new IllegalStateException("Memory is already reserved for
dataset: " + datasetId);
+ }
+ final long required = getTotalSize(datasetId);
+ if (!isAllocatable(required)) {
+ return false;
+ }
+ reservedMap.put(datasetId, required);
+ available.addAndGet(-required);
+ LOGGER.info(() -> "Reserved(" + required + ") for dataset(" +
datasetId + ")");
+ return true;
+ }
+
+ @Override
+ public synchronized void cancelReserved(int datasetId) {
+ final Long reserved = reservedMap.remove(datasetId);
+ if (reserved == null) {
+ throw new IllegalStateException("No reserved memory for dataset: "
+ datasetId);
+ }
+ available.addAndGet(reserved);
+ LOGGER.info(() -> "Cancelled reserved(" + reserved + ") from dataset("
+ datasetId + ")");
+ }
+
+ @Override
+ public long getAvailable() {
+ return available.get();
+ }
+
+ @Override
+ public int getNumPages(int datasetId) {
+ return MetadataIndexImmutableProperties.isMetadataDataset(datasetId) ?
+ storageProperties.getMetadataMemoryComponentNumPages() :
+ storageProperties.getMemoryComponentNumPages();
+ }
+
+ private long getTotalSize(int datasetId) {
+ return storageProperties.getMemoryComponentPageSize() * (long)
getNumPages(datasetId);
+ }
+
+ private boolean isAllocatable(long required) {
+ return available.get() - required >= 0;
+ }
+
+ private void allocateReserved(int datasetId) {
+ final Long reserved = reservedMap.get(datasetId);
+ allocatedMap.put(datasetId, reserved);
+ }
+}
\ No newline at end of file
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index f2f3b93..b59fe6a 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -21,6 +21,7 @@
import java.util.Map;
import org.apache.asterix.common.dataflow.DatasetLocalResource;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.common.IIndex;
@@ -141,4 +142,8 @@
public int getDatasetID() {
return datasetInfo.getDatasetID();
}
+
+ public boolean isMetadataDataset() {
+ return
MetadataIndexImmutableProperties.isMetadataDataset(getDatasetID());
+ }
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetVirtualBufferCaches.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetVirtualBufferCaches.java
index c7eda4d..37a067f 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetVirtualBufferCaches.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetVirtualBufferCaches.java
@@ -33,51 +33,38 @@
public class DatasetVirtualBufferCaches {
private final int datasetID;
private final StorageProperties storageProperties;
- private final int firstAvilableUserDatasetID;
private final int numPartitions;
+ private final int numPages;
private final Map<Integer, List<IVirtualBufferCache>>
ioDeviceVirtualBufferCaches = new HashMap<>();
- public DatasetVirtualBufferCaches(int datasetID, StorageProperties
storageProperties,
- int firstAvilableUserDatasetID, int numPartitions) {
+ public DatasetVirtualBufferCaches(int datasetID, StorageProperties
storageProperties, int numPages, int numPartitions) {
this.datasetID = datasetID;
this.storageProperties = storageProperties;
- this.firstAvilableUserDatasetID = firstAvilableUserDatasetID;
this.numPartitions = numPartitions;
- }
-
- public List<IVirtualBufferCache>
initializeVirtualBufferCaches(IResourceMemoryManager memoryManager,
- int ioDeviceNum) {
- int numPages = datasetID < firstAvilableUserDatasetID
- ? storageProperties.getMetadataMemoryComponentNumPages()
- : storageProperties.getMemoryComponentNumPages();
- List<IVirtualBufferCache> vbcs = new ArrayList<>();
- for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) {
- MultitenantVirtualBufferCache vbc = new
MultitenantVirtualBufferCache(
- new VirtualBufferCache(
- new ResourceHeapBufferAllocator(memoryManager,
- Integer.toString(datasetID)),
- storageProperties.getMemoryComponentPageSize(),
- numPages /
storageProperties.getMemoryComponentsNum() / numPartitions));
- vbcs.add(vbc);
- }
- ioDeviceVirtualBufferCaches.put(ioDeviceNum, vbcs);
- return vbcs;
+ this.numPages = numPages;
}
public List<IVirtualBufferCache>
getVirtualBufferCaches(IResourceMemoryManager memoryManager, int ioDeviceNum) {
synchronized (ioDeviceVirtualBufferCaches) {
List<IVirtualBufferCache> vbcs =
ioDeviceVirtualBufferCaches.get(ioDeviceNum);
if (vbcs == null) {
- vbcs = initializeVirtualBufferCaches(memoryManager,
ioDeviceNum);
+ vbcs = initializeVirtualBufferCaches(memoryManager,
ioDeviceNum, numPages);
}
return vbcs;
}
}
- public long getTotalSize() {
- int numPages = datasetID < firstAvilableUserDatasetID
- ? storageProperties.getMetadataMemoryComponentNumPages()
- : storageProperties.getMemoryComponentNumPages();
- return storageProperties.getMemoryComponentPageSize() * ((long)
numPages);
+ private List<IVirtualBufferCache>
initializeVirtualBufferCaches(IResourceMemoryManager memoryManager,
+ int ioDeviceNum, int numPages) {
+ List<IVirtualBufferCache> vbcs = new ArrayList<>();
+ for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) {
+ MultitenantVirtualBufferCache vbc = new
MultitenantVirtualBufferCache(
+ new VirtualBufferCache(new
ResourceHeapBufferAllocator(memoryManager, Integer.toString(datasetID)),
+ storageProperties.getMemoryComponentPageSize(),
+ numPages /
storageProperties.getMemoryComponentsNum() / numPartitions));
+ vbcs.add(vbc);
+ }
+ ioDeviceVirtualBufferCaches.put(ioDeviceNum, vbcs);
+ return vbcs;
}
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
index 74589cc..8b4c779 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
@@ -49,4 +49,8 @@
public String getDatasetName() {
return indexName;
}
+
+ public static boolean isMetadataDataset(int datasetId) {
+ return datasetId < FIRST_AVAILABLE_USER_DATASET_ID;
+ }
}
diff --git
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/DatasetMemoryManagerTest.java
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/DatasetMemoryManagerTest.java
new file mode 100644
index 0000000..8304a80
--- /dev/null
+++
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/DatasetMemoryManagerTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.context;
+
+import java.util.logging.ConsoleHandler;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.config.StorageProperties;
+import org.apache.asterix.common.context.DatasetMemoryManager;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class DatasetMemoryManagerTest {
+
+ private static final StorageProperties storageProperties;
+ private static final long GLOBAL_BUDGET = 1000L;
+ private static final long METADATA_DATASET_BUDGET = 200L;
+ private static final long DATASET_BUDGET = 400L;
+
+ static {
+ Logger root = Logger.getLogger("org.apache");
+ ConsoleHandler handler = new ConsoleHandler();
+ handler.setLevel(Level.INFO);
+ root.addHandler(handler);
+ root.setLevel(Level.INFO);
+ root.fine("just testing");
+
+ storageProperties = Mockito.mock(StorageProperties.class);
+
Mockito.when(storageProperties.getMemoryComponentGlobalBudget()).thenReturn(GLOBAL_BUDGET);
+
Mockito.when(storageProperties.getMemoryComponentNumPages()).thenReturn(8);
+
Mockito.when(storageProperties.getMetadataMemoryComponentNumPages()).thenReturn(4);
+
Mockito.when(storageProperties.getMemoryComponentPageSize()).thenReturn(50);
+ Mockito.when(storageProperties.getMemoryComponentsNum()).thenReturn(2);
+ }
+
+ @Test
+ public void allocate() {
+ DatasetMemoryManager memoryManager = new
DatasetMemoryManager(storageProperties);
+ // double allocate
+ Assert.assertTrue(memoryManager.allocate(1));
+ boolean thrown = false;
+ try {
+ memoryManager.allocate(1);
+ } catch (IllegalStateException e) {
+ Assert.assertTrue(e.getMessage().contains("already allocated"));
+ thrown = true;
+ }
+ Assert.assertTrue(thrown);
+
+ // allocate metadata and non-metadata datasets
+ Assert.assertTrue(memoryManager.allocate(400));
+
+ long expectedBudget = GLOBAL_BUDGET - METADATA_DATASET_BUDGET -
DATASET_BUDGET;
+ Assert.assertEquals(memoryManager.getAvailable(), expectedBudget);
+
+ // exceed budget
+ Assert.assertTrue(memoryManager.allocate(401));
+ Assert.assertFalse(memoryManager.allocate(402));
+ }
+
+ @Test
+ public void reserve() {
+ DatasetMemoryManager memoryManager = new
DatasetMemoryManager(storageProperties);
+ // reserve then allocate budget
+ Assert.assertTrue(memoryManager.reserve(1));
+ Assert.assertTrue(memoryManager.allocate(1));
+ long expectedBudget = GLOBAL_BUDGET - METADATA_DATASET_BUDGET;
+ Assert.assertEquals(memoryManager.getAvailable(), expectedBudget);
+
+ // double reserve
+ boolean thrown = false;
+ Assert.assertTrue(memoryManager.reserve(2));
+ try {
+ memoryManager.reserve(2);
+ } catch (IllegalStateException e) {
+ Assert.assertTrue(e.getMessage().contains("already reserved"));
+ thrown = true;
+ }
+ Assert.assertTrue(thrown);
+
+ // cancel reserved
+ memoryManager.cancelReserved(2);
+ Assert.assertEquals(memoryManager.getAvailable(), expectedBudget);
+ }
+
+ @Test
+ public void deallocate() {
+ DatasetMemoryManager memoryManager = new
DatasetMemoryManager(storageProperties);
+ // deallocate reserved
+ Assert.assertTrue(memoryManager.reserve(200));
+ Assert.assertTrue(memoryManager.allocate(200));
+ memoryManager.deallocate(200);
+ long expectedBudget = GLOBAL_BUDGET - DATASET_BUDGET;
+ Assert.assertEquals(memoryManager.getAvailable(), expectedBudget);
+
+ // deallocate not allocated
+ boolean thrown = false;
+ try {
+ memoryManager.deallocate(1);
+ } catch (IllegalStateException e) {
+ Assert.assertTrue(e.getMessage().contains("No allocated"));
+ thrown = true;
+ }
+ Assert.assertTrue(thrown);
+
+ // double deallocate
+ memoryManager.allocate(2);
+ memoryManager.deallocate(2);
+ thrown = false;
+ try {
+ memoryManager.deallocate(2);
+ } catch (IllegalStateException e) {
+ Assert.assertTrue(e.getMessage().contains("No allocated"));
+ thrown = true;
+ }
+ Assert.assertTrue(thrown);
+ }
+}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index dc38749..af1aa75 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -302,6 +302,11 @@
*/
public static void enlistMetadataDataset(INCServiceContext ncServiceCtx,
IMetadataIndex index)
throws HyracksDataException {
+ final int datasetId = index.getDatasetId().getId();
+ // reserve memory for metadata dataset to ensure it can be opened when
needed
+ if
(!appContext.getDatasetMemoryManager().reserve(index.getDatasetId().getId())) {
+ throw new IllegalStateException("Failed to reserve memory for
metadata dataset(" + datasetId + ")");
+ }
ClusterPartition metadataPartition =
appContext.getMetadataProperties().getMetadataPartition();
int metadataDeviceId = metadataPartition.getIODeviceNum();
String metadataPartitionPath =
StoragePathUtil.prepareStoragePartitionPath(
@@ -317,20 +322,20 @@
// We are unable to do this since IStorageManager needs a dataset to
determine the appropriate
// objects
ILSMOperationTrackerFactory opTrackerFactory =
- index.isPrimaryIndex() ? new
PrimaryIndexOperationTrackerFactory(index.getDatasetId().getId())
- : new
SecondaryIndexOperationTrackerFactory(index.getDatasetId().getId());
+ index.isPrimaryIndex() ? new
PrimaryIndexOperationTrackerFactory(datasetId)
+ : new SecondaryIndexOperationTrackerFactory(datasetId);
ILSMIOOperationCallbackFactory ioOpCallbackFactory =
LSMBTreeIOOperationCallbackFactory.INSTANCE;
IStorageComponentProvider storageComponentProvider =
appContext.getStorageComponentProvider();
if (isNewUniverse()) {
LSMBTreeLocalResourceFactory lsmBtreeFactory = new
LSMBTreeLocalResourceFactory(
storageComponentProvider.getStorageManager(), typeTraits,
cmpFactories, null, null, null,
opTrackerFactory, ioOpCallbackFactory,
storageComponentProvider.getMetadataPageManagerFactory(),
- new
AsterixVirtualBufferCacheProvider(index.getDatasetId().getId()),
+ new AsterixVirtualBufferCacheProvider(datasetId),
storageComponentProvider.getIoOperationSchedulerProvider(),
appContext.getMetadataMergePolicyFactory(),
GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, true,
bloomFilterKeyFields,
appContext.getBloomFilterFalsePositiveRate(), true, null);
DatasetLocalResourceFactory dsLocalResourceFactory =
- new
DatasetLocalResourceFactory(index.getDatasetId().getId(), lsmBtreeFactory);
+ new DatasetLocalResourceFactory(datasetId,
lsmBtreeFactory);
// TODO(amoudi) Creating the index should be done through the same
code path as other indexes
// This is to be done by having a metadata dataset associated with
each index
IIndexBuilder indexBuilder = new IndexBuilder(ncServiceCtx,
storageComponentProvider.getStorageManager(),
diff --git
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/ThreadSafe.java
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/ThreadSafe.java
new file mode 100644
index 0000000..2766c37
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/ThreadSafe.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util.annotations;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * The type to which this annotation is applied is thread-safe.
+ */
+@Documented
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.SOURCE)
+public @interface ThreadSafe {
+}
\ No newline at end of file
--
To view, visit https://asterix-gerrit.ics.uci.edu/2112
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ica76b3c8eca6f7d2ad1d962fb5ef84267c258571
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>