Murtadha Hubail has submitted this change and it was merged. Change subject: [ASTERIXDB-2081][STO] Introduce DatasetMemoryManager ......................................................................
[ASTERIXDB-2081][STO] Introduce DatasetMemoryManager - user model changes: no - storage format changes: no - interface changes: yes Added IDatasetMemoryManager to manage datasets memory reservation and allocation. Details: - Reserve metadata datasets memory to allow them to be opened when needed. - Add UngracefulShutdownNCApplication to force recovery to run on AsterixHyracksIntegrationUtil. - Refactor the use of firstAvilableUserDatasetID to check for metadata datasets. - Add ThreadSafe annotation. - Add test case for RecoveryManager after creating multiple datasets. Change-Id: Ica76b3c8eca6f7d2ad1d962fb5ef84267c258571 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2112 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Integration-Tests: Jenkins <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetMemoryManager.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetMemoryManager.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetVirtualBufferCaches.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java A asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/DatasetMemoryManagerTest.java M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java A hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/ThreadSafe.java 13 files changed, 568 insertions(+), 65 deletions(-) Approvals: Jenkins: Verified; No violations found; ; Verified Michael Blow: Looks good to me, approved diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java index ecf25eb..50c3ff6 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java @@ -63,6 +63,7 @@ public ClusterControllerService cc; public NodeControllerService[] ncs = new NodeControllerService[0]; public IHyracksClientConnection hcc; + protected boolean gracefulShutdown = true; private static final String DEFAULT_STORAGE_PATH = joinPath("target", "io", "dir"); private static String storagePath = DEFAULT_STORAGE_PATH; @@ -158,6 +159,9 @@ } protected INCApplication createNCApplication() { + if (!gracefulShutdown) { + return new UngracefulShutdownNCApplication(); + } return new NCApplication(); } @@ -227,6 +231,10 @@ storagePath = path; } + public void setGracefulShutdown(boolean gracefulShutdown) { + this.gracefulShutdown = gracefulShutdown; + } + public static void restoreDefaultStoragePath() { storagePath = DEFAULT_STORAGE_PATH; } @@ -288,4 +296,11 @@ Thread.sleep(10000); } } + + private class UngracefulShutdownNCApplication extends NCApplication { + @Override + public void stop() throws Exception { + // ungraceful shutdown + } + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index 
5370c03..7b08f68 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -32,6 +32,7 @@ import org.apache.asterix.active.ActiveManager; import org.apache.asterix.api.common.AppRuntimeContextProviderForRecovery; import org.apache.asterix.common.api.IDatasetLifecycleManager; +import org.apache.asterix.common.api.IDatasetMemoryManager; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.config.ActiveProperties; @@ -48,11 +49,11 @@ import org.apache.asterix.common.config.StorageProperties; import org.apache.asterix.common.config.TransactionProperties; import org.apache.asterix.common.context.DatasetLifecycleManager; +import org.apache.asterix.common.context.DatasetMemoryManager; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.library.ILibraryManager; -import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; import org.apache.asterix.common.replication.IRemoteRecoveryManager; import org.apache.asterix.common.replication.IReplicaResourcesManager; import org.apache.asterix.common.replication.IReplicationChannel; @@ -119,6 +120,7 @@ private MessagingProperties messagingProperties; private final NodeProperties nodeProperties; private ExecutorService threadExecutor; + private IDatasetMemoryManager datasetMemoryManager; private IDatasetLifecycleManager datasetLifecycleManager; private IBufferCache bufferCache; private ITransactionSubsystem txnSubsystem; @@ -198,9 +200,10 @@ localResourceRepository.deleteStorageData(true); } - datasetLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository, - 
MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID, txnSubsystem.getLogManager(), - ioManager.getIODevices().size()); + datasetMemoryManager = new DatasetMemoryManager(storageProperties); + datasetLifecycleManager = + new DatasetLifecycleManager(storageProperties, localResourceRepository, txnSubsystem.getLogManager(), + datasetMemoryManager, ioManager.getIODevices().size()); isShuttingdown = false; @@ -316,6 +319,11 @@ } @Override + public IDatasetMemoryManager getDatasetMemoryManager() { + return datasetMemoryManager; + } + + @Override public double getBloomFilterFalsePositiveRate() { return storageProperties.getBloomFilterFalsePositiveRate(); } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java new file mode 100644 index 0000000..723786c --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.test.txn; + +import java.io.File; +import java.io.InputStream; +import java.util.Random; + +import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil; +import org.apache.asterix.common.config.GlobalConfig; +import org.apache.asterix.common.configuration.AsterixConfiguration; +import org.apache.asterix.common.configuration.Property; +import org.apache.asterix.common.utils.Servlets; +import org.apache.asterix.test.common.TestExecutor; +import org.apache.asterix.test.common.TestHelper; +import org.apache.asterix.testframework.context.TestCaseContext; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +public class RecoveryManagerTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String DEFAULT_TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml"; + private static final String TEST_CONFIG_FILE_NAME = "asterix-test-configuration.xml"; + private static final String TEST_CONFIG_PATH = + System.getProperty("user.dir") + File.separator + "target" + File.separator + "config"; + private static final String TEST_CONFIG_FILE_PATH = TEST_CONFIG_PATH + File.separator + TEST_CONFIG_FILE_NAME; + private static final TestExecutor testExecutor = new TestExecutor(); + private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil(); + private static final Random random = new Random(); + private static final int numRecords = 1; + + @Before + public void setUp() throws Exception { + // Read default test configurations + AsterixConfiguration ac = TestHelper.getConfigurations(DEFAULT_TEST_CONFIG_FILE_NAME); + // override memory config to enforce dataset eviction + ac.getProperty().add(new Property("storage.memorycomponent.globalbudget", 
"128MB", "")); + ac.getProperty().add(new Property("storage.memorycomponent.numpages", "32", "")); + // Write test config file + TestHelper.writeConfigurations(ac, TEST_CONFIG_FILE_PATH); + System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_PATH); + integrationUtil.setGracefulShutdown(false); + integrationUtil.init(true); + } + + @After + public void tearDown() throws Exception { + integrationUtil.deinit(true); + } + + @Test + public void multiDatasetRecovery() throws Exception { + String datasetNamePrefix = "ds_"; + final TestCaseContext.OutputFormat format = TestCaseContext.OutputFormat.CLEAN_JSON; + testExecutor.executeSqlppUpdateOrDdl("CREATE TYPE KeyType AS { id: int };", format); + int numDatasets = 50; + String datasetName = null; + for (int i = 1; i <= numDatasets; i++) { + datasetName = datasetNamePrefix + i; + testExecutor.executeSqlppUpdateOrDdl("CREATE DATASET " + datasetName + "(KeyType) PRIMARY KEY id;", format); + insertData(datasetName); + } + // do ungraceful shutdown to enforce recovery + integrationUtil.deinit(false); + integrationUtil.init(false); + validateRecovery(datasetName); + + // create more datasets after recovery + numDatasets = 100; + for (int i = 51; i <= numDatasets; i++) { + datasetName = datasetNamePrefix + i; + testExecutor.executeSqlppUpdateOrDdl("CREATE DATASET " + datasetName + "(KeyType) PRIMARY KEY id;", format); + insertData(datasetName); + } + // do ungraceful shutdown to enforce recovery again + integrationUtil.deinit(false); + integrationUtil.init(false); + validateRecovery(datasetName); + } + + private void insertData(String datasetName) throws Exception { + for (int i = 0; i < numRecords; i++) { + testExecutor.executeSqlppUpdateOrDdl("UPSERT INTO " + datasetName + " ({\"id\": " + random.nextInt() + "})", + TestCaseContext.OutputFormat.CLEAN_JSON); + } + } + + private void validateRecovery(String datasetName) throws Exception { + final String query = "select value count(*) from `" + datasetName + 
"`;"; + final InputStream inputStream = testExecutor + .executeQueryService(query, testExecutor.getEndpoint(Servlets.QUERY_SERVICE), + TestCaseContext.OutputFormat.CLEAN_JSON); + final ObjectNode jsonNodes = OBJECT_MAPPER.readValue(inputStream, ObjectNode.class); + JsonNode result = jsonNodes.get("results"); + // make sure there is result + Assert.assertEquals(1, result.size()); + for (int i = 0; i < result.size(); i++) { + JsonNode json = result.get(i); + Assert.assertEquals(numRecords, json.asInt()); + } + } +} \ No newline at end of file diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetMemoryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetMemoryManager.java new file mode 100644 index 0000000..fde2c80 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetMemoryManager.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.api; + +public interface IDatasetMemoryManager { + + /** + * Allocates memory for dataset {@code datasetId}. + * + * @param datasetId + * @return true, if the allocation is successful, otherwise false. 
+ */ + boolean allocate(int datasetId); + + /** + * Deallocates memory of dataset {@code datasetId}. + * + * @param datasetId + */ + void deallocate(int datasetId); + + /** + * Reserves memory for dataset {@code datasetId}. The reserved memory + * is guaranteed to be allocatable when needed for the dataset. Reserve + * may be called after allocation to reserve the allocated budget + * on deallocation. + * + * @param datasetId + * @return true, if the reservation is successful, otherwise false. + */ + boolean reserve(int datasetId); + + /** + * Cancels the reserved memory for dataset {@code datasetId}. + * + * @param datasetId + */ + void cancelReserved(int datasetId); + + /** + * @return The remaining memory budget that can be used for datasets. + */ + long getAvailable(); + + /** + * @param datasetId + * @return The number of virtual buffer cache pages that should be allocated for dataset {@code datasetId}. + */ + int getNumPages(int datasetId); +} diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java index d4b9a92..548907c 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java @@ -62,6 +62,8 @@ IDatasetLifecycleManager getDatasetLifecycleManager(); + IDatasetMemoryManager getDatasetMemoryManager(); + IResourceIdFactory getResourceIdFactory(); ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID); diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index 545382a..e79f002 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -29,6 +29,7 @@ import java.util.logging.Logger; import org.apache.asterix.common.api.IDatasetLifecycleManager; +import org.apache.asterix.common.api.IDatasetMemoryManager; import org.apache.asterix.common.config.StorageProperties; import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.asterix.common.exceptions.ACIDException; @@ -51,27 +52,24 @@ import org.apache.hyracks.storage.common.LocalResource; public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeCycleComponent { + private static final Logger LOGGER = Logger.getLogger(DatasetLifecycleManager.class.getName()); private final Map<Integer, DatasetResource> datasets = new ConcurrentHashMap<>(); private final StorageProperties storageProperties; private final ILocalResourceRepository resourceRepository; - private final int firstAvilableUserDatasetID; - private final long capacity; - private long used; + private final IDatasetMemoryManager memoryManager; private final ILogManager logManager; private final LogRecord logRecord; private final int numPartitions; private volatile boolean stopped = false; public DatasetLifecycleManager(StorageProperties storageProperties, ILocalResourceRepository resourceRepository, - int firstAvilableUserDatasetID, ILogManager logManager, int numPartitions) { + ILogManager logManager, IDatasetMemoryManager memoryManager, int numPartitions) { this.logManager = logManager; this.storageProperties = storageProperties; this.resourceRepository = resourceRepository; - this.firstAvilableUserDatasetID = firstAvilableUserDatasetID; + this.memoryManager = memoryManager; this.numPartitions = numPartitions; - capacity = storageProperties.getMemoryComponentGlobalBudget(); - used = 0; logRecord = new LogRecord(); } @@ -200,9 +198,10 @@ for (DatasetResource dsr : datasetsResources) { PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(); if 
(opTracker != null && opTracker.getNumActiveOperations() == 0 - && dsr.getDatasetInfo().getReferenceCount() == 0 && dsr.getDatasetInfo().isOpen() - && dsr.getDatasetInfo().getDatasetID() >= getFirstAvilableUserDatasetID()) { + && dsr.getDatasetInfo().getReferenceCount() == 0 && dsr.getDatasetInfo().isOpen() && !dsr + .isMetadataDataset()) { closeDataset(dsr.getDatasetInfo()); + LOGGER.info(() -> "Evicted Dataset" + dsr.getDatasetID()); return true; } } @@ -230,8 +229,9 @@ if (dsr == null) { DatasetInfo dsInfo = new DatasetInfo(did); PrimaryIndexOperationTracker opTracker = new PrimaryIndexOperationTracker(did, logManager, dsInfo); - DatasetVirtualBufferCaches vbcs = new DatasetVirtualBufferCaches(did, storageProperties, - getFirstAvilableUserDatasetID(), getNumPartitions()); + DatasetVirtualBufferCaches vbcs = + new DatasetVirtualBufferCaches(did, storageProperties, memoryManager.getNumPages(did), + numPartitions); dsr = new DatasetResource(dsInfo, opTracker, vbcs); datasets.put(did, dsr); } @@ -322,8 +322,8 @@ } @Override - public synchronized void start() { - used = 0; + public void start() { + // no op } @Override @@ -449,7 +449,7 @@ public synchronized void closeUserDatasets() throws HyracksDataException { ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values()); for (DatasetResource dsr : openDatasets) { - if (dsr.getDatasetID() >= getFirstAvilableUserDatasetID()) { + if (!dsr.isMetadataDataset()) { closeDataset(dsr.getDatasetInfo()); } } @@ -474,8 +474,8 @@ public void dumpState(OutputStream outputStream) throws IOException { StringBuilder sb = new StringBuilder(); - sb.append(String.format("Memory budget = %d\n", capacity)); - sb.append(String.format("Memory used = %d\n", used)); + sb.append(String.format("Memory budget = %d%n", storageProperties.getMemoryComponentGlobalBudget())); + sb.append(String.format("Memory available = %d%n", memoryManager.getAvailable())); sb.append("\n"); String dsHeaderFormat = "%-10s %-6s %-16s %-12s\n"; @@ 
-515,7 +515,7 @@ } synchronized (dsInfo) { if (dsInfo.isOpen() && dsInfo.isMemoryAllocated()) { - used -= getVirtualBufferCaches(dsInfo.getDatasetID()).getTotalSize(); + memoryManager.deallocate(datasetId); dsInfo.setMemoryAllocated(false); } } @@ -534,25 +534,15 @@ synchronized (dsInfo) { // This is not needed for external datasets' indexes since they never use the virtual buffer cache. if (!dsInfo.isMemoryAllocated() && !dsInfo.isExternal()) { - long additionalSize = getVirtualBufferCaches(dsInfo.getDatasetID()).getTotalSize(); - while (used + additionalSize > capacity) { + while (!memoryManager.allocate(datasetId)) { if (!evictCandidateDataset()) { throw new HyracksDataException("Cannot allocate dataset " + dsInfo.getDatasetID() + " memory since memory budget would be exceeded."); } } - used += additionalSize; dsInfo.setMemoryAllocated(true); } } - } - - public int getFirstAvilableUserDatasetID() { - return firstAvilableUserDatasetID; - } - - public int getNumPartitions() { - return numPartitions; } @Override diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetMemoryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetMemoryManager.java new file mode 100644 index 0000000..88f406e --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetMemoryManager.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.context; + +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Logger; + +import org.apache.asterix.common.api.IDatasetMemoryManager; +import org.apache.asterix.common.config.StorageProperties; +import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; +import org.apache.hyracks.util.annotations.ThreadSafe; + +@ThreadSafe +public class DatasetMemoryManager implements IDatasetMemoryManager { + + private static final Logger LOGGER = Logger.getLogger(DatasetMemoryManager.class.getName()); + private final Map<Integer, Long> allocatedMap = new HashMap<>(); + private final Map<Integer, Long> reservedMap = new HashMap<>(); + private long available; + private final StorageProperties storageProperties; + + public DatasetMemoryManager(StorageProperties storageProperties) { + this.storageProperties = storageProperties; + available = storageProperties.getMemoryComponentGlobalBudget(); + } + + @Override + public synchronized boolean allocate(int datasetId) { + if (allocatedMap.containsKey(datasetId)) { + throw new IllegalStateException("Memory is already allocated for dataset: " + datasetId); + } + if (reservedMap.containsKey(datasetId)) { + allocateReserved(datasetId); + return true; + } + final long required = getTotalSize(datasetId); + if (!isAllocatable(required)) { + return false; + } + allocatedMap.put(datasetId, required); + available -= required; + LOGGER.info(() -> "Allocated(" + required + ") for dataset(" + datasetId + ")"); + return true; + } + + @Override + public 
synchronized void deallocate(int datasetId) { + if (!allocatedMap.containsKey(datasetId) && !reservedMap.containsKey(datasetId)) { + throw new IllegalStateException("No allocated or reserved memory for dataset: " + datasetId); + } + final Long allocated = allocatedMap.remove(datasetId); + // return the allocated budget if it is not reserved + if (allocated != null && !reservedMap.containsKey(datasetId)) { + available += allocated; + LOGGER.info(() -> "Deallocated(" + allocated + ") from dataset(" + datasetId + ")"); + } + } + + @Override + public synchronized boolean reserve(int datasetId) { + if (reservedMap.containsKey(datasetId)) { + throw new IllegalStateException("Memory is already reserved for dataset: " + datasetId); + } + final long required = getTotalSize(datasetId); + if (!isAllocatable(required) && !allocatedMap.containsKey(datasetId)) { + return false; + } + reservedMap.put(datasetId, required); + // if the budget is already allocated, no need to reserve it again + if (!allocatedMap.containsKey(datasetId)) { + available -= required; + } + LOGGER.info(() -> "Reserved(" + required + ") for dataset(" + datasetId + ")"); + return true; + } + + @Override + public synchronized void cancelReserved(int datasetId) { + final Long reserved = reservedMap.remove(datasetId); + if (reserved == null) { + throw new IllegalStateException("No reserved memory for dataset: " + datasetId); + } + available += reserved; + LOGGER.info(() -> "Cancelled reserved(" + reserved + ") from dataset(" + datasetId + ")"); + } + + @Override + public long getAvailable() { + return available; + } + + @Override + public int getNumPages(int datasetId) { + return MetadataIndexImmutableProperties.isMetadataDataset(datasetId) ? 
+ storageProperties.getMetadataMemoryComponentNumPages() : + storageProperties.getMemoryComponentNumPages(); + } + + private long getTotalSize(int datasetId) { + return storageProperties.getMemoryComponentPageSize() * (long) getNumPages(datasetId); + } + + private boolean isAllocatable(long required) { + return available - required >= 0; + } + + private void allocateReserved(int datasetId) { + final Long reserved = reservedMap.get(datasetId); + allocatedMap.put(datasetId, reserved); + } +} \ No newline at end of file diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java index f2f3b93..b59fe6a 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.asterix.common.dataflow.DatasetLocalResource; +import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.common.IIndex; @@ -141,4 +142,8 @@ public int getDatasetID() { return datasetInfo.getDatasetID(); } + + public boolean isMetadataDataset() { + return MetadataIndexImmutableProperties.isMetadataDataset(getDatasetID()); + } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetVirtualBufferCaches.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetVirtualBufferCaches.java index c7eda4d..c9b9698 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetVirtualBufferCaches.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetVirtualBufferCaches.java 
@@ -33,51 +33,39 @@
 public class DatasetVirtualBufferCaches {
     private final int datasetID;
     private final StorageProperties storageProperties;
-    private final int firstAvilableUserDatasetID;
     private final int numPartitions;
+    private final int numPages; // page count supplied by the caller; divided per component and partition below
     private final Map<Integer, List<IVirtualBufferCache>> ioDeviceVirtualBufferCaches = new HashMap<>();
 
-    public DatasetVirtualBufferCaches(int datasetID, StorageProperties storageProperties,
-            int firstAvilableUserDatasetID, int numPartitions) {
+    public DatasetVirtualBufferCaches(int datasetID, StorageProperties storageProperties, int numPages,
+            int numPartitions) {
         this.datasetID = datasetID;
         this.storageProperties = storageProperties;
-        this.firstAvilableUserDatasetID = firstAvilableUserDatasetID;
         this.numPartitions = numPartitions;
-    }
-
-    public List<IVirtualBufferCache> initializeVirtualBufferCaches(IResourceMemoryManager memoryManager,
-            int ioDeviceNum) {
-        int numPages = datasetID < firstAvilableUserDatasetID
-                ? storageProperties.getMetadataMemoryComponentNumPages()
-                : storageProperties.getMemoryComponentNumPages();
-        List<IVirtualBufferCache> vbcs = new ArrayList<>();
-        for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) {
-            MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(
-                    new VirtualBufferCache(
-                            new ResourceHeapBufferAllocator(memoryManager,
-                                    Integer.toString(datasetID)),
-                            storageProperties.getMemoryComponentPageSize(),
-                            numPages / storageProperties.getMemoryComponentsNum() / numPartitions));
-            vbcs.add(vbc);
-        }
-        ioDeviceVirtualBufferCaches.put(ioDeviceNum, vbcs);
-        return vbcs;
+        this.numPages = numPages;
     }
 
     public List<IVirtualBufferCache> getVirtualBufferCaches(IResourceMemoryManager memoryManager, int ioDeviceNum) {
         synchronized (ioDeviceVirtualBufferCaches) {
             List<IVirtualBufferCache> vbcs = ioDeviceVirtualBufferCaches.get(ioDeviceNum);
             if (vbcs == null) {
-                vbcs = initializeVirtualBufferCaches(memoryManager, ioDeviceNum);
+                vbcs = initializeVirtualBufferCaches(memoryManager, ioDeviceNum, numPages);
             }
             return vbcs;
         }
     }
 
-    public long getTotalSize() {
-        int numPages = datasetID < firstAvilableUserDatasetID
-                ? storageProperties.getMetadataMemoryComponentNumPages()
-                : storageProperties.getMemoryComponentNumPages();
-        return storageProperties.getMemoryComponentPageSize() * ((long) numPages);
+    private List<IVirtualBufferCache> initializeVirtualBufferCaches(IResourceMemoryManager memoryManager,
+            int ioDeviceNum, int numPages) { // builds one cache per memory component and stores the list by ioDeviceNum
+        List<IVirtualBufferCache> vbcs = new ArrayList<>();
+        for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) {
+            MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(
+                    new VirtualBufferCache(new ResourceHeapBufferAllocator(memoryManager, Integer.toString(datasetID)),
+                            storageProperties.getMemoryComponentPageSize(),
+                            numPages / storageProperties.getMemoryComponentsNum() / numPartitions));
+            vbcs.add(vbc);
+        }
+        ioDeviceVirtualBufferCaches.put(ioDeviceNum, vbcs);
+        return vbcs;
     }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
index 74589cc..8b4c779 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
@@ -49,4 +49,8 @@
     public String getDatasetName() {
         return indexName;
     }
+
+    public static boolean isMetadataDataset(int datasetId) { // true for ids below FIRST_AVAILABLE_USER_DATASET_ID
+        return datasetId < FIRST_AVAILABLE_USER_DATASET_ID;
+    }
 }
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/DatasetMemoryManagerTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/DatasetMemoryManagerTest.java
new file mode 100644
index 0000000..b8e3604
--- /dev/null
+++
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/DatasetMemoryManagerTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.context;
+
+import org.apache.asterix.common.config.StorageProperties;
+import org.apache.asterix.common.context.DatasetMemoryManager;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class DatasetMemoryManagerTest {
+
+    private static final StorageProperties storageProperties;
+    private static final long GLOBAL_BUDGET = 1000L;
+    private static final long METADATA_DATASET_BUDGET = 200L; // 4 mocked metadata pages * mocked page size 50
+    private static final long DATASET_BUDGET = 400L; // 8 mocked pages * mocked page size 50
+
+    static {
+        storageProperties = Mockito.mock(StorageProperties.class);
+        Mockito.when(storageProperties.getMemoryComponentGlobalBudget()).thenReturn(GLOBAL_BUDGET);
+        Mockito.when(storageProperties.getMemoryComponentNumPages()).thenReturn(8);
+        Mockito.when(storageProperties.getMetadataMemoryComponentNumPages()).thenReturn(4);
+        Mockito.when(storageProperties.getMemoryComponentPageSize()).thenReturn(50);
+        Mockito.when(storageProperties.getMemoryComponentsNum()).thenReturn(2);
+    }
+
+    @Test
+    public void allocate() {
+        DatasetMemoryManager memoryManager = new DatasetMemoryManager(storageProperties);
+        // double allocate
+        Assert.assertTrue(memoryManager.allocate(1));
+        boolean thrown = false;
+        try {
+            memoryManager.allocate(1);
+        } catch (IllegalStateException e) {
+            Assert.assertTrue(e.getMessage().contains("already allocated"));
+            thrown = true;
+        }
+        Assert.assertTrue(thrown);
+
+        // allocate metadata and non-metadata datasets
+        Assert.assertTrue(memoryManager.allocate(400));
+
+        long expectedBudget = GLOBAL_BUDGET - METADATA_DATASET_BUDGET - DATASET_BUDGET;
+        Assert.assertEquals(expectedBudget, memoryManager.getAvailable());
+
+        // reserve after allocate shouldn't allocate the budget again
+        Assert.assertTrue(memoryManager.allocate(401));
+        Assert.assertTrue(memoryManager.reserve(401));
+
+        // deallocate should still keep the reserved memory
+        memoryManager.deallocate(401);
+        expectedBudget = GLOBAL_BUDGET - METADATA_DATASET_BUDGET - (DATASET_BUDGET * 2);
+        Assert.assertEquals(expectedBudget, memoryManager.getAvailable());
+
+        // exceed budget should return false
+        Assert.assertFalse(memoryManager.allocate(402));
+    }
+
+    @Test
+    public void reserve() {
+        DatasetMemoryManager memoryManager = new DatasetMemoryManager(storageProperties);
+        // reserve then allocate budget
+        Assert.assertTrue(memoryManager.reserve(1));
+        Assert.assertTrue(memoryManager.allocate(1));
+        long expectedBudget = GLOBAL_BUDGET - METADATA_DATASET_BUDGET;
+        Assert.assertEquals(expectedBudget, memoryManager.getAvailable());
+
+        // double reserve
+        boolean thrown = false;
+        Assert.assertTrue(memoryManager.reserve(2));
+        try {
+            memoryManager.reserve(2);
+        } catch (IllegalStateException e) {
+            Assert.assertTrue(e.getMessage().contains("already reserved"));
+            thrown = true;
+        }
+        Assert.assertTrue(thrown);
+
+        // cancel reserved
+        memoryManager.cancelReserved(2);
+        Assert.assertEquals(expectedBudget, memoryManager.getAvailable());
+    }
+
+    @Test
+    public void deallocate() {
+        DatasetMemoryManager memoryManager = new DatasetMemoryManager(storageProperties);
+        // deallocate reserved
+        Assert.assertTrue(memoryManager.reserve(200));
+        Assert.assertTrue(memoryManager.allocate(200));
+        memoryManager.deallocate(200);
+        long expectedBudget = GLOBAL_BUDGET - DATASET_BUDGET;
+        Assert.assertEquals(expectedBudget, memoryManager.getAvailable());
+
+        // deallocate not allocated
+        boolean thrown = false;
+        try {
+            memoryManager.deallocate(1);
+        } catch (IllegalStateException e) {
+            Assert.assertTrue(e.getMessage().contains("No allocated"));
+            thrown = true;
+        }
+        Assert.assertTrue(thrown);
+
+        // double deallocate
+        memoryManager.allocate(2);
+        memoryManager.deallocate(2);
+        thrown = false;
+        try {
+            memoryManager.deallocate(2);
+        } catch (IllegalStateException e) {
+            Assert.assertTrue(e.getMessage().contains("No allocated"));
+            thrown = true;
+        }
+        Assert.assertTrue(thrown);
+    }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index dc38749..a6f1ad0 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -302,6 +302,11 @@
      */
     public static void enlistMetadataDataset(INCServiceContext ncServiceCtx, IMetadataIndex index)
             throws HyracksDataException {
+        final int datasetId = index.getDatasetId().getId();
+        // reserve memory for metadata dataset to ensure it can be opened when needed
+        if (!appContext.getDatasetMemoryManager().reserve(datasetId)) {
+            throw new IllegalStateException("Failed to reserve memory for metadata dataset (" + datasetId + ")");
+        }
         ClusterPartition metadataPartition = appContext.getMetadataProperties().getMetadataPartition();
         int metadataDeviceId = metadataPartition.getIODeviceNum();
         String metadataPartitionPath
= StoragePathUtil.prepareStoragePartitionPath( @@ -317,20 +322,20 @@ // We are unable to do this since IStorageManager needs a dataset to determine the appropriate // objects ILSMOperationTrackerFactory opTrackerFactory = - index.isPrimaryIndex() ? new PrimaryIndexOperationTrackerFactory(index.getDatasetId().getId()) - : new SecondaryIndexOperationTrackerFactory(index.getDatasetId().getId()); + index.isPrimaryIndex() ? new PrimaryIndexOperationTrackerFactory(datasetId) + : new SecondaryIndexOperationTrackerFactory(datasetId); ILSMIOOperationCallbackFactory ioOpCallbackFactory = LSMBTreeIOOperationCallbackFactory.INSTANCE; IStorageComponentProvider storageComponentProvider = appContext.getStorageComponentProvider(); if (isNewUniverse()) { LSMBTreeLocalResourceFactory lsmBtreeFactory = new LSMBTreeLocalResourceFactory( storageComponentProvider.getStorageManager(), typeTraits, cmpFactories, null, null, null, opTrackerFactory, ioOpCallbackFactory, storageComponentProvider.getMetadataPageManagerFactory(), - new AsterixVirtualBufferCacheProvider(index.getDatasetId().getId()), + new AsterixVirtualBufferCacheProvider(datasetId), storageComponentProvider.getIoOperationSchedulerProvider(), appContext.getMetadataMergePolicyFactory(), GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, true, bloomFilterKeyFields, appContext.getBloomFilterFalsePositiveRate(), true, null); DatasetLocalResourceFactory dsLocalResourceFactory = - new DatasetLocalResourceFactory(index.getDatasetId().getId(), lsmBtreeFactory); + new DatasetLocalResourceFactory(datasetId, lsmBtreeFactory); // TODO(amoudi) Creating the index should be done through the same code path as other indexes // This is to be done by having a metadata dataset associated with each index IIndexBuilder indexBuilder = new IndexBuilder(ncServiceCtx, storageComponentProvider.getStorageManager(), diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/ThreadSafe.java 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/ThreadSafe.java
new file mode 100644
index 0000000..2766c37
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/ThreadSafe.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util.annotations;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks the annotated type as thread-safe. Retained in source only, so it is absent from class files and at runtime.
+ */
+@Documented
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.SOURCE)
+public @interface ThreadSafe {
+}
\ No newline at end of file
-- To view, visit https://asterix-gerrit.ics.uci.edu/2112
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: Ica76b3c8eca6f7d2ad1d962fb5ef84267c258571
Gerrit-PatchSet: 6
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>
Gerrit-Reviewer: Ian Maxon <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Michael Blow <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
Gerrit-Reviewer: Till Westmann <[email protected]>
Gerrit-Reviewer: abdullah alamoudi <[email protected]>
