Michael Blow has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/705
Change subject: ASTERIXDB-1337: Dataset Memory Management on Multi-Partition NC
......................................................................
ASTERIXDB-1337: Dataset Memory Management on Multi-Partition NC
As suggested in ASTERIXDB-1337, this change is to maintain a per-
partition MultitenantVirtualBufferCache budget, as opposed to sharing
the budget across the dataset.
Change-Id: Ibbf08f532c1210c30be6a51c73570a789174213b
---
M
asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
M
asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java
M
asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
M
asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
M
asterix-common/src/main/java/org/apache/asterix/common/context/AsterixVirtualBufferCacheProvider.java
M
asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
M
asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
M
asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
M
asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
M
asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
M
asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
M
asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
12 files changed, 92 insertions(+), 57 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/05/705/1
diff --git
a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
index 5532e79..905854d 100644
---
a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
+++
b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
@@ -189,7 +189,8 @@
initializeResourceIdFactory();
datasetLifecycleManager = new
DatasetLifecycleManager(storageProperties, localResourceRepository,
-
MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID,
txnSubsystem.getLogManager());
+
MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID,
txnSubsystem.getLogManager(),
+ ioManager.getIODevices().size());
isShuttingdown = false;
@@ -361,8 +362,8 @@
}
@Override
- public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID) {
- return datasetLifecycleManager.getVirtualBufferCaches(datasetID);
+ public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID, int
partition) {
+ return datasetLifecycleManager.getVirtualBufferCaches(datasetID,
partition);
}
@Override
diff --git
a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java
b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java
index b975970..83f03f9 100644
---
a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java
+++
b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java
@@ -88,8 +88,8 @@
}
@Override
- public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID) {
- return asterixAppRuntimeContext.getVirtualBufferCaches(datasetID);
+ public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID, int
partition) {
+ return asterixAppRuntimeContext.getVirtualBufferCaches(datasetID,
partition);
}
@Override
diff --git
a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
index 496a10e..c16ed31 100644
---
a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
+++
b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -75,7 +75,7 @@
public double getBloomFilterFalsePositiveRate();
- public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID);
+ public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID, int
partition);
public Object getFeedManager();
diff --git
a/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
b/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 3b4617a..a99aead 100644
---
a/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++
b/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -81,9 +81,10 @@
* creates (if necessary) and returns the dataset virtual buffer caches.
*
* @param datasetID
+ * @param partition
* @return
*/
- List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID);
+ List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID, int
partition);
/**
* Flushes then closes all open datasets
diff --git
a/asterix-common/src/main/java/org/apache/asterix/common/context/AsterixVirtualBufferCacheProvider.java
b/asterix-common/src/main/java/org/apache/asterix/common/context/AsterixVirtualBufferCacheProvider.java
index c58b013..2675686 100644
---
a/asterix-common/src/main/java/org/apache/asterix/common/context/AsterixVirtualBufferCacheProvider.java
+++
b/asterix-common/src/main/java/org/apache/asterix/common/context/AsterixVirtualBufferCacheProvider.java
@@ -37,7 +37,7 @@
@Override
public List<IVirtualBufferCache>
getVirtualBufferCaches(IHyracksTaskContext ctx) {
return ((IAsterixAppRuntimeContext)
ctx.getJobletContext().getApplicationContext().getApplicationObject())
- .getVirtualBufferCaches(datasetID);
+ .getVirtualBufferCaches(datasetID,
ctx.getTaskAttemptId().getTaskId().getPartition());
}
}
diff --git
a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 0cd88d6..270656a 100644
---
a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++
b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -53,7 +53,7 @@
public class DatasetLifecycleManager implements IDatasetLifecycleManager,
ILifeCycleComponent {
private final AsterixStorageProperties storageProperties;
- private final Map<Integer, List<IVirtualBufferCache>>
datasetVirtualBufferCaches;
+ private final Map<Integer, DatasetVirtualBufferCaches>
datasetVirtualBufferCachesMap;
private final Map<Integer, ILSMOperationTracker> datasetOpTrackers;
private final Map<Integer, DatasetInfo> datasetInfos;
private final ILocalResourceRepository resourceRepository;
@@ -62,14 +62,17 @@
private long used;
private final ILogManager logManager;
private final LogRecord logRecord;
+ private final int numPartitions;
public DatasetLifecycleManager(AsterixStorageProperties storageProperties,
- ILocalResourceRepository resourceRepository, int
firstAvilableUserDatasetID, ILogManager logManager) {
+ ILocalResourceRepository
resourceRepository, int firstAvilableUserDatasetID,
+ ILogManager logManager, int numPartitions) {
this.logManager = logManager;
this.storageProperties = storageProperties;
this.resourceRepository = resourceRepository;
this.firstAvilableUserDatasetID = firstAvilableUserDatasetID;
- datasetVirtualBufferCaches = new HashMap<Integer,
List<IVirtualBufferCache>>();
+ this.numPartitions = numPartitions;
+ datasetVirtualBufferCachesMap = new HashMap<>();
datasetOpTrackers = new HashMap<Integer, ILSMOperationTracker>();
datasetInfos = new HashMap<Integer, DatasetInfo>();
capacity = storageProperties.getMemoryComponentGlobalBudget();
@@ -303,45 +306,36 @@
return openIndexesInfo;
}
- @Override
- public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID) {
- synchronized (datasetVirtualBufferCaches) {
- List<IVirtualBufferCache> vbcs =
datasetVirtualBufferCaches.get(datasetID);
+ private DatasetVirtualBufferCaches getVirtualBufferCaches(int datasetID) {
+ synchronized (datasetVirtualBufferCachesMap) {
+ DatasetVirtualBufferCaches vbcs =
datasetVirtualBufferCachesMap.get(datasetID);
if (vbcs == null) {
- initializeDatasetVirtualBufferCache(datasetID);
- vbcs = datasetVirtualBufferCaches.get(datasetID);
- if (vbcs == null) {
- throw new IllegalStateException("Could not find dataset "
+ datasetID + " virtual buffer cache.");
- }
+ vbcs = initializeDatasetVirtualBufferCache(datasetID);
}
return vbcs;
}
}
+ @Override
+ public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID, int
partition) {
+ DatasetVirtualBufferCaches dvbcs = getVirtualBufferCaches(datasetID);
+ return dvbcs.getVirtualBufferCaches(partition);
+ }
+
private void removeDatasetFromCache(int datasetID) throws
HyracksDataException {
deallocateDatasetMemory(datasetID);
datasetInfos.remove(datasetID);
- datasetVirtualBufferCaches.remove(datasetID);
+ datasetVirtualBufferCachesMap.remove(datasetID);
datasetOpTrackers.remove(datasetID);
}
- private void initializeDatasetVirtualBufferCache(int datasetID) {
- List<IVirtualBufferCache> vbcs = new ArrayList<IVirtualBufferCache>();
- synchronized (datasetVirtualBufferCaches) {
- int numPages = datasetID < firstAvilableUserDatasetID
- ? storageProperties.getMetadataMemoryComponentNumPages()
- : storageProperties.getMemoryComponentNumPages();
- for (int i = 0; i < storageProperties.getMemoryComponentsNum();
i++) {
- MultitenantVirtualBufferCache vbc = new
MultitenantVirtualBufferCache(
- new VirtualBufferCache(new
ResourceHeapBufferAllocator(this, Integer.toString(datasetID)),
- storageProperties.getMemoryComponentPageSize(),
- numPages /
storageProperties.getMemoryComponentsNum()));
- vbcs.add(vbc);
- }
- datasetVirtualBufferCaches.put(datasetID, vbcs);
+ private DatasetVirtualBufferCaches initializeDatasetVirtualBufferCache(int
datasetID) {
+ synchronized (datasetVirtualBufferCachesMap) {
+ DatasetVirtualBufferCaches dvbcs = new
DatasetVirtualBufferCaches(datasetID);
+ datasetVirtualBufferCachesMap.put(datasetID, dvbcs);
+ return dvbcs;
}
}
-
@Override
public ILSMOperationTracker getOperationTracker(int datasetID) {
synchronized (datasetOpTrackers) {
@@ -354,7 +348,7 @@
}
}
- private abstract class Info {
+ private static abstract class Info {
protected int referenceCount;
protected boolean isOpen;
@@ -372,7 +366,7 @@
}
}
- public class IndexInfo extends Info {
+ public static class IndexInfo extends Info {
private final ILSMIndex index;
private final long resourceId;
private final int datasetId;
@@ -396,7 +390,7 @@
}
}
- public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
+ public static class DatasetInfo extends Info implements
Comparable<DatasetInfo> {
private final Map<Long, IndexInfo> indexes;
private final int datasetID;
private long lastAccess;
@@ -637,7 +631,7 @@
closeAllDatasets();
- datasetVirtualBufferCaches.clear();
+ datasetVirtualBufferCachesMap.clear();
datasetOpTrackers.clear();
datasetInfos.clear();
}
@@ -685,11 +679,7 @@
synchronized (dsInfo) {
// This is not needed for external datasets' indexes since they
never use the virtual buffer cache.
if (!dsInfo.memoryAllocated && !dsInfo.isExternal) {
- List<IVirtualBufferCache> vbcs =
getVirtualBufferCaches(dsInfo.datasetID);
- long additionalSize = 0;
- for (IVirtualBufferCache vbc : vbcs) {
- additionalSize += vbc.getNumPages() * vbc.getPageSize();
- }
+ long additionalSize =
getVirtualBufferCaches(dsInfo.datasetID).getTotalSize();
while (used + additionalSize > capacity) {
if (!evictCandidateDataset()) {
throw new HyracksDataException("Cannot allocate
dataset " + dsInfo.datasetID
@@ -710,10 +700,7 @@
}
synchronized (dsInfo) {
if (dsInfo.isOpen && dsInfo.memoryAllocated) {
- List<IVirtualBufferCache> vbcs =
getVirtualBufferCaches(dsInfo.datasetID);
- for (IVirtualBufferCache vbc : vbcs) {
- used -= (vbc.getNumPages() * vbc.getPageSize());
- }
+ used -=
getVirtualBufferCaches(dsInfo.datasetID).getTotalSize();
dsInfo.memoryAllocated = false;
}
}
@@ -725,4 +712,50 @@
int did = Integer.parseInt(resourcePath);
allocateDatasetMemory(did);
}
+
+ private class DatasetVirtualBufferCaches {
+ private final int datasetID;
+ private final Map<Integer, List<IVirtualBufferCache>>
partitionVirtualBufferCaches = new HashMap<>();
+
+ public DatasetVirtualBufferCaches(int datasetID) {
+ this.datasetID = datasetID;
+ }
+
+ private List<IVirtualBufferCache> initializeVirtualBufferCaches(int
partition) {
+ synchronized (partitionVirtualBufferCaches) {
+ int numPages = datasetID < firstAvilableUserDatasetID
+ ?
storageProperties.getMetadataMemoryComponentNumPages()
+ : storageProperties.getMemoryComponentNumPages();
+ List<IVirtualBufferCache> vbcs = new ArrayList<>();
+ for (int i = 0; i <
storageProperties.getMemoryComponentsNum(); i++) {
+ MultitenantVirtualBufferCache vbc = new
MultitenantVirtualBufferCache(
+ new VirtualBufferCache(new
ResourceHeapBufferAllocator(DatasetLifecycleManager.this,
+ Integer.toString(datasetID)),
storageProperties.getMemoryComponentPageSize(),
+ numPages /
storageProperties.getMemoryComponentsNum() / numPartitions));
+ vbcs.add(vbc);
+ }
+ partitionVirtualBufferCaches.put(partition, vbcs);
+ return vbcs;
+ }
+ }
+
+ public List<IVirtualBufferCache> getVirtualBufferCaches(int partition)
{
+ synchronized (partitionVirtualBufferCaches) {
+ List<IVirtualBufferCache> vbcs =
partitionVirtualBufferCaches.get(partition);
+ if (vbcs == null) {
+ vbcs = initializeVirtualBufferCaches(partition);
+ }
+ return vbcs;
+ }
+ }
+
+ public long getTotalSize() {
+ int numPages = datasetID < firstAvilableUserDatasetID
+ ? storageProperties.getMetadataMemoryComponentNumPages()
+ : storageProperties.getMemoryComponentNumPages();
+
+ return storageProperties.getMemoryComponentPageSize() * numPages;
+ }
+ }
+
}
diff --git
a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
index 6382af9..33a7a34 100644
---
a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
+++
b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
@@ -56,7 +56,7 @@
public IIOManager getIOManager();
- public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID);
+ public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID, int
partition);
public IAsterixAppRuntimeContext getAppContext();
}
diff --git
a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index ec0d21f..b3660af 100644
---
a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++
b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -350,7 +350,7 @@
FileReference file = ioManager.getAbsoluteFileRef(metadataDeviceId,
resourceName);
List<IVirtualBufferCache> virtualBufferCaches = runtimeContext
- .getVirtualBufferCaches(index.getDatasetId().getId());
+ .getVirtualBufferCaches(index.getDatasetId().getId(),
metadataPartition.getPartitionId());
ITypeTraits[] typeTraits = index.getTypeTraits();
IBinaryComparatorFactory[] comparatorFactories =
index.getKeyBinaryComparatorFactory();
int[] bloomFilterKeyFields = index.getBloomFilterKeyFields();
diff --git
a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
index 88e95dd..95a41c7 100644
---
a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
+++
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
@@ -64,7 +64,7 @@
public ILSMIndex createIndexInstance(IAsterixAppRuntimeContextProvider
runtimeContextProvider, String filePath,
int partition) {
FileReference file = new FileReference(new File(filePath));
- List<IVirtualBufferCache> virtualBufferCaches =
runtimeContextProvider.getVirtualBufferCaches(datasetID);
+ List<IVirtualBufferCache> virtualBufferCaches =
runtimeContextProvider.getVirtualBufferCaches(datasetID, partition);
LSMBTree lsmBTree = LSMBTreeUtils.createLSMTree(virtualBufferCaches,
file,
runtimeContextProvider.getBufferCache(),
runtimeContextProvider.getFileMapManager(), typeTraits,
cmpFactories, bloomFilterKeyFields,
runtimeContextProvider.getBloomFilterFalsePositiveRate(),
diff --git
a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
index c087b56..cbbabbc 100644
---
a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
+++
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
@@ -74,7 +74,7 @@
@Override
public ILSMIndex createIndexInstance(IAsterixAppRuntimeContextProvider
runtimeContextProvider, String filePath,
int partition) throws HyracksDataException {
- List<IVirtualBufferCache> virtualBufferCaches =
runtimeContextProvider.getVirtualBufferCaches(datasetID);
+ List<IVirtualBufferCache> virtualBufferCaches =
runtimeContextProvider.getVirtualBufferCaches(datasetID, partition);
try {
if (isPartitioned) {
return
InvertedIndexUtils.createPartitionedLSMInvertedIndex(virtualBufferCaches,
diff --git
a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
index 7a43849..07489ea 100644
---
a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
+++
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
@@ -76,7 +76,7 @@
public ILSMIndex createIndexInstance(IAsterixAppRuntimeContextProvider
runtimeContextProvider, String filePath,
int partition) throws HyracksDataException {
FileReference file = new FileReference(new File(filePath));
- List<IVirtualBufferCache> virtualBufferCaches =
runtimeContextProvider.getVirtualBufferCaches(datasetID);
+ List<IVirtualBufferCache> virtualBufferCaches =
runtimeContextProvider.getVirtualBufferCaches(datasetID, partition);
try {
return LSMRTreeUtils.createLSMTree(virtualBufferCaches, file,
runtimeContextProvider.getBufferCache(),
runtimeContextProvider.getFileMapManager(), typeTraits,
rtreeCmpFactories, btreeCmpFactories,
diff --git
a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
index 358913b..482d1e4 100644
---
a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
+++
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
@@ -18,8 +18,6 @@
*/
package org.apache.asterix.transaction.management.service.locking;
-import static org.mockito.Mockito.mock;
-
import java.util.List;
import java.util.concurrent.Executors;
@@ -36,6 +34,8 @@
import org.apache.hyracks.storage.common.file.IFileMapProvider;
import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
import org.apache.hyracks.storage.common.file.ResourceIdFactory;
+
+import static org.mockito.Mockito.mock;
class TestRuntimeContextProvider implements IAsterixAppRuntimeContextProvider {
@@ -98,7 +98,7 @@
}
@Override
- public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID) {
+ public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID, int
partition) {
throw new UnsupportedOperationException();
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/705
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ibbf08f532c1210c30be6a51c73570a789174213b
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <[email protected]>