Murtadha Hubail has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/408
Change subject: ASTERIXDB-1058: make Asterix compatible with lazy LSM memory
allocation
......................................................................
ASTERIXDB-1058: make Asterix compatible with lazy LSM memory allocation
Change-Id: I4ea1eb129fe3043d43b077473dc29d17a97dfcc2
---
M
asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
M
asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
M
asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
M pom.xml
4 files changed, 113 insertions(+), 45 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/08/408/1
diff --git
a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 27fb2c6..f6a796c 100644
---
a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++
b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -27,6 +27,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.apache.asterix.common.api.ILocalResourceMetadata;
import org.apache.asterix.common.config.AsterixStorageProperties;
@@ -51,6 +53,7 @@
import org.apache.hyracks.storage.common.file.LocalResource;
public class DatasetLifecycleManager implements IIndexLifecycleManager,
ILifeCycleComponent {
+ private static final Logger LOGGER =
Logger.getLogger(DatasetLifecycleManager.class.getName());
private final AsterixStorageProperties storageProperties;
private final Map<Integer, List<IVirtualBufferCache>>
datasetVirtualBufferCaches;
private final Map<Integer, ILSMOperationTracker> datasetOpTrackers;
@@ -157,11 +160,7 @@
dsInfo.indexes.remove(resourceID);
if (dsInfo.referenceCount == 0 && dsInfo.isOpen &&
dsInfo.indexes.isEmpty() && !dsInfo.isExternal) {
- List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(did);
- assert vbcs != null;
- for (IVirtualBufferCache vbc : vbcs) {
- used -= (vbc.getNumPages() * vbc.getPageSize());
- }
+ deallocateDatasetMemory(did);
datasetInfos.remove(did);
datasetVirtualBufferCaches.remove(did);
datasetOpTrackers.remove(did);
@@ -182,22 +181,7 @@
throw new HyracksDataException("Failed to open index with resource
ID " + resourceID
+ " since it does not exist.");
}
-
- // This is not needed for external datasets' indexes since they never
use the virtual buffer cache.
- if (!dsInfo.isOpen && !dsInfo.isExternal) {
- List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(did);
- assert vbcs != null;
- long additionalSize = 0;
- for (IVirtualBufferCache vbc : vbcs) {
- additionalSize += vbc.getNumPages() * vbc.getPageSize();
- }
- while (used + additionalSize > capacity) {
- if (!evictCandidateDataset()) {
- throw new HyracksDataException("Cannot activate index
since memory budget would be exceeded.");
- }
- }
- used += additionalSize;
- }
+ initializeDatasetVirtualBufferCache(did);
dsInfo.isOpen = true;
dsInfo.touch();
@@ -209,6 +193,65 @@
iInfo.isOpen = true;
}
iInfo.touch();
+ }
+
+    /**
+     * Allocates the memory budget of the dataset that owns the given index resource.
+     *
+     * @param resourceID resource ID of an index; translated to a dataset ID with getDIDfromRID
+     * @throws HyracksDataException if allocateDatasetMemory fails
+     */
+    @Override
+    public synchronized void allocateMemory(long resourceID) throws HyracksDataException {
+        allocateDatasetMemory(getDIDfromRID(resourceID));
+    }
+
+    /**
+     * Releases the memory budget of the dataset that owns the given index resource.
+     *
+     * @param resourceID resource ID of an index; translated to a dataset ID with getDIDfromRID
+     * @throws HyracksDataException if deallocateDatasetMemory fails
+     */
+    @Override
+    public synchronized void deallocateMemory(long resourceID) throws HyracksDataException {
+        deallocateDatasetMemory(getDIDfromRID(resourceID));
+    }
+
+    /**
+     * Charges the size of a dataset's virtual buffer caches against the memory budget, unless
+     * the dataset is external or its memory has already been charged.
+     * <p>
+     * While the budget would be exceeded, candidate datasets are evicted one at a time; if no
+     * candidate can be evicted the allocation fails.
+     *
+     * @param datasetId ID of the dataset whose memory should be allocated
+     * @throws HyracksDataException if no DatasetInfo is registered for {@code datasetId}, or if
+     *             the budget would still be exceeded after eviction is no longer possible
+     */
+    public synchronized void allocateDatasetMemory(int datasetId) throws HyracksDataException {
+        DatasetInfo dsInfo = datasetInfos.get(datasetId);
+        if (dsInfo == null) {
+            throw new HyracksDataException("Failed to allocate memory for dataset with ID " + datasetId
+                    + " since it is not open.");
+        }
+        synchronized (dsInfo) {
+            // This is not needed for external datasets' indexes since they never use the virtual buffer cache.
+            if (!dsInfo.memoryAllocated && !dsInfo.isExternal) {
+                List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(dsInfo.datasetID);
+                long additionalSize = 0;
+                for (IVirtualBufferCache vbc : vbcs) {
+                    // Widen before multiplying so that a large component cannot overflow int arithmetic.
+                    additionalSize += (long) vbc.getNumPages() * vbc.getPageSize();
+                }
+                while (used + additionalSize > capacity) {
+                    if (!evictCandidateDataset()) {
+                        throw new HyracksDataException("Cannot allocate dataset " + dsInfo.datasetID
+                                + " memory since memory budget would be exceeded.");
+                    }
+                }
+                used += additionalSize;
+                dsInfo.memoryAllocated = true;
+                LOGGER.log(Level.INFO, "Allocated memory (" + additionalSize + ") bytes for dataset( "
+                        + dsInfo.datasetID + "). Total Used (" + used + ") bytes.");
+            }
+        }
+    }
+
+    /**
+     * Returns the size of a dataset's virtual buffer caches to the memory budget. Nothing is
+     * done unless the dataset is open and its memory is currently marked as allocated.
+     *
+     * @param datasetId ID of the dataset whose memory should be released
+     * @throws HyracksDataException if no DatasetInfo is registered for {@code datasetId}
+     */
+    public synchronized void deallocateDatasetMemory(int datasetId) throws HyracksDataException {
+        DatasetInfo dsInfo = datasetInfos.get(datasetId);
+        if (dsInfo == null) {
+            throw new HyracksDataException("Failed to deallocate memory for dataset with ID " + datasetId
+                    + " since it is not open.");
+        }
+        synchronized (dsInfo) {
+            if (dsInfo.isOpen && dsInfo.memoryAllocated) {
+                List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(dsInfo.datasetID);
+                for (IVirtualBufferCache vbc : vbcs) {
+                    // Widen before multiplying so the amount released matches the amount charged
+                    // even when pages * pageSize does not fit in an int.
+                    used -= (long) vbc.getNumPages() * vbc.getPageSize();
+                }
+                dsInfo.memoryAllocated = false;
+                LOGGER.log(Level.INFO, "Deallocated memory for dataset( " + dsInfo.datasetID + "). Total Used (" + used
+                        + ") bytes.");
+            }
+        }
     }
private boolean evictCandidateDataset() throws HyracksDataException {
@@ -289,18 +332,26 @@
synchronized (datasetVirtualBufferCaches) {
List<IVirtualBufferCache> vbcs =
datasetVirtualBufferCaches.get(datasetID);
if (vbcs == null) {
- vbcs = new ArrayList<IVirtualBufferCache>();
- int numPages = datasetID < firstAvilableUserDatasetID ?
storageProperties
- .getMetadataMemoryComponentNumPages() :
storageProperties.getMemoryComponentNumPages();
- for (int i = 0; i <
storageProperties.getMemoryComponentsNum(); i++) {
- MultitenantVirtualBufferCache vbc = new
MultitenantVirtualBufferCache(new VirtualBufferCache(
- new HeapBufferAllocator(),
storageProperties.getMemoryComponentPageSize(), numPages
- /
storageProperties.getMemoryComponentsNum()));
- vbcs.add(vbc);
- }
- datasetVirtualBufferCaches.put(datasetID, vbcs);
+ initializeDatasetVirtualBufferCache(datasetID);
+ vbcs = datasetVirtualBufferCaches.get(datasetID);
+ assert vbcs != null;
}
return vbcs;
+ }
+ }
+
+    /**
+     * Creates the virtual buffer caches of a dataset and registers them in
+     * datasetVirtualBufferCaches, one cache per configured memory component.
+     * <p>
+     * The call is a no-op when caches are already registered for the dataset. This method is
+     * invoked every time an index is opened, and replacing an existing list would leave
+     * previously handed-out caches detached from the ones used for memory accounting.
+     *
+     * @param datasetID ID of the dataset; IDs below firstAvilableUserDatasetID use the
+     *            metadata page budget, all others the regular memory component page budget
+     */
+    private void initializeDatasetVirtualBufferCache(int datasetID) {
+        synchronized (datasetVirtualBufferCaches) {
+            if (datasetVirtualBufferCaches.containsKey(datasetID)) {
+                // Already initialized: keep the existing caches instead of replacing them.
+                return;
+            }
+            List<IVirtualBufferCache> vbcs = new ArrayList<IVirtualBufferCache>();
+            int numPages = datasetID < firstAvilableUserDatasetID ? storageProperties
+                    .getMetadataMemoryComponentNumPages() : storageProperties.getMemoryComponentNumPages();
+            for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) {
+                // The dataset's page budget is split evenly across its memory components.
+                MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(new VirtualBufferCache(
+                        new HeapBufferAllocator(), storageProperties.getMemoryComponentPageSize(), numPages
+                                / storageProperties.getMemoryComponentsNum()));
+                vbcs.add(vbc);
+            }
+            datasetVirtualBufferCaches.put(datasetID, vbcs);
         }
     }
@@ -347,12 +398,14 @@
private long lastAccess;
private int numActiveIOOps;
private final boolean isExternal;
+ private boolean memoryAllocated;
public DatasetInfo(int datasetID, boolean isExternal) {
this.indexes = new HashMap<Long, IndexInfo>();
this.lastAccess = -1;
this.datasetID = datasetID;
this.isExternal = isExternal;
+ this.memoryAllocated = false;
}
@Override
@@ -426,6 +479,14 @@
public String toString() {
return "DatasetID: " + datasetID + ", isOpen: " + isOpen + ",
refCount: " + referenceCount
+ ", lastAccess: " + lastAccess + "}";
+ }
+
+        /**
+         * @return the current value of the memoryAllocated flag of this dataset
+         */
+        public boolean isMemoryAllocated() {
+            return this.memoryAllocated;
+        }
+
+        /**
+         * @return the ID of the dataset described by this DatasetInfo
+         */
+        public int getDatasetID() {
+            return this.datasetID;
         }
}
@@ -537,7 +598,6 @@
} catch (Exception e) {
throw new HyracksDataException(e);
}
-
for (IndexInfo iInfo : dsInfo.indexes.values()) {
if (iInfo.isOpen) {
ILSMOperationTracker opTracker =
iInfo.index.getOperationTracker();
@@ -548,12 +608,8 @@
}
assert iInfo.referenceCount == 0;
}
+ deallocateDatasetMemory(dsInfo.datasetID);
dsInfo.isOpen = false;
-
- List<IVirtualBufferCache> vbcs =
getVirtualBufferCaches(dsInfo.datasetID);
- for (IVirtualBufferCache vbc : vbcs) {
- used -= vbc.getNumPages() * vbc.getPageSize();
- }
}
@Override
diff --git
a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 2603de2..ac33115 100644
---
a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++
b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -59,6 +59,9 @@
public void beforeOperation(ILSMIndex index, LSMOperationType opType,
ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws
HyracksDataException {
if (opType == LSMOperationType.MODIFICATION || opType ==
LSMOperationType.FORCE_MODIFICATION) {
+ if (!dsInfo.isMemoryAllocated()) {
+
datasetLifecycleManager.allocateDatasetMemory(dsInfo.getDatasetID());
+ }
incrementNumActiveOperations(modificationCallback);
} else if (opType == LSMOperationType.FLUSH || opType ==
LSMOperationType.MERGE) {
dsInfo.declareActiveIOOperation();
@@ -77,7 +80,7 @@
@Override
public synchronized void completeOperation(ILSMIndex index,
LSMOperationType opType,
ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback)
- throws HyracksDataException {
+ throws HyracksDataException {
if (opType == LSMOperationType.MODIFICATION || opType ==
LSMOperationType.FORCE_MODIFICATION) {
decrementNumActiveOperations(modificationCallback);
if (numActiveOperations.get() == 0) {
@@ -108,7 +111,7 @@
}
if (needsFlush || flushOnExit) {
- //Make the current mutable components READABLE_UNWRITABLE to stop
coming modify operations from entering them until the current flush is schedule.
+ //Make the current mutable components READABLE_UNWRITABLE to stop
incoming modify operations from entering them until the current flush is
scheduled.
for (ILSMIndex lsmIndex : indexes) {
AbstractLSMIndex abstractLSMIndex = ((AbstractLSMIndex)
lsmIndex);
ILSMOperationTracker opTracker =
abstractLSMIndex.getOperationTracker();
@@ -133,7 +136,7 @@
}
}
- //Since this method is called sequentially by
LogPage.notifyFlushTerminator in the sequence flush were scheduled.
+ //This method is called sequentially by LogPage.notifyFlushTerminator in
the sequence flushes were scheduled.
public synchronized void triggerScheduleFlush(LogRecord logRecord) throws
HyracksDataException {
for (ILSMIndex lsmIndex : dsInfo.getDatasetIndexes()) {
diff --git
a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
index 730116a..0df5761 100644
---
a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
+++
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
@@ -39,9 +39,9 @@
class TestRuntimeContextProvider implements IAsterixAppRuntimeContextProvider {
- AsterixThreadExecutor ate = new
AsterixThreadExecutor(Executors.defaultThreadFactory());
+ AsterixThreadExecutor ate = new
AsterixThreadExecutor(Executors.defaultThreadFactory());
IIndexLifecycleManager ilm = new IndexLifecycleManager();
-
+
@Override
public AsterixThreadExecutor getThreadExecutor() {
return ate;
@@ -138,6 +138,15 @@
public List<IIndex> getOpenIndexes() {
throw new UnsupportedOperationException();
}
-
+
+ @Override
+ public void allocateMemory(long resourceID) throws
HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void deallocateMemory(long resourceID) throws
HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
}
}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 94960e9..1245271 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,8 +56,8 @@
<global.test.includes>**/*TestSuite.java,**/*Test.java,${execution.tests}</global.test.includes>
<global.test.excludes>${optimizer.tests},${metadata.tests},${invalid.tests},${repeated.tests}</global.test.excludes>
<!-- Versions under dependencymanagement or used in many projects via
properties -->
- <algebricks.version>0.2.16-incubating</algebricks.version>
- <hyracks.version>0.2.16-incubating</hyracks.version>
+ <algebricks.version>0.2.17-SNAPSHOT</algebricks.version>
+ <hyracks.version>0.2.17-SNAPSHOT</hyracks.version>
<hadoop.version>2.2.0</hadoop.version>
<junit.version>4.11</junit.version>
<commons.io.version>2.4</commons.io.version>
--
To view, visit https://asterix-gerrit.ics.uci.edu/408
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I4ea1eb129fe3043d43b077473dc29d17a97dfcc2
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>