Luo Chen has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1761
Change subject: Implemented Disk Components Alignment Based on IDs
......................................................................
Implemented Disk Components Alignment Based on IDs
- Added IDs (using LSN) for disk components. When a disk component is
flushed, its initial ID is set to the LSN. When components are merged,
the resulting ID is the [min, max] range spanning all merged IDs.
- Changed the correlated merge policy to correlate disk components of
the primary and secondaries using IDs.
Change-Id: I768ee9ac0a8d3c99c631086093a6b778b2e7588e
---
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
A
asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
M
asterixdb/asterix-common/src/test/java/org/apache/asterix/test/memory/ConcurrentFramePoolUnitTest.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
A
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentID.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
A
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentID.java
16 files changed, 738 insertions(+), 81 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/61/1761/1
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
index cd1d95e..d4a80ea 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
@@ -31,9 +31,11 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import
org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentID;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicy;
public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy {
@@ -57,66 +59,30 @@
// all such components for which the sum of their sizes exceeds
MaxMrgCompSz. Schedule a merge of those components into a new component.
// 2. If a merge from 1 doesn't happen, see if the set of candidate
components for merging exceeds MaxTolCompCnt. If so, schedule
// a merge all of the current candidates into a new single component.
- List<ILSMDiskComponent> immutableComponents = new
ArrayList<>(index.getImmutableComponents());
- // Reverse the components order so that we look at components from
oldest to newest.
- Collections.reverse(immutableComponents);
- for (ILSMDiskComponent c : immutableComponents) {
- if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
+ if (fullMergeIsRequested) {
+ //full merge request is handled by each index separately, since it
is possible that
+ //when a primary index wants to send full merge requests for all
secondaries,
+ //one secondary index is being merged and the request cannot be
scheduled
+ ArrayList<ILSMDiskComponent> immutableComponents = new
ArrayList<>(index.getImmutableComponents());
+ if (!areComponentsReadableUnwritableState(immutableComponents)) {
return;
}
- }
- if (fullMergeIsRequested) {
+
ILSMIndexAccessor accessor =
index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
accessor.scheduleFullMerge(index.getIOOperationCallback());
return;
}
+
if (!index.isPrimaryIndex()) {
return;
}
- long totalSize = 0;
- int startIndex = -1;
- int minNumComponents = Integer.MAX_VALUE;
-
- Set<ILSMIndex> indexes =
datasetLifecycleManager.getDatasetInfo(datasetId).getDatasetIndexes();
- for (ILSMIndex lsmIndex : indexes) {
- minNumComponents = Math.min(minNumComponents,
lsmIndex.getImmutableComponents().size());
+ ArrayList<ILSMDiskComponent> immutableComponents = new
ArrayList<>(index.getImmutableComponents());
+ if (!areComponentsReadableUnwritableState(immutableComponents)) {
+ return;
}
-
- for (int i = 0; i < minNumComponents; i++) {
- ILSMComponent c = immutableComponents.get(i);
- long componentSize = ((ILSMDiskComponent) c).getComponentSize();
- if (componentSize > maxMergableComponentSize) {
- startIndex = i;
- totalSize = 0;
- continue;
- }
- totalSize += componentSize;
- boolean isLastComponent = i + 1 == minNumComponents ? true : false;
- if (totalSize > maxMergableComponentSize
- || (isLastComponent && i - startIndex >=
maxToleranceComponentCount)) {
-
- for (ILSMIndex lsmIndex : indexes) {
- List<ILSMDiskComponent> reversedImmutableComponents =
- new ArrayList<>(lsmIndex.getImmutableComponents());
- // Reverse the components order so that we look at
components from oldest to newest.
- Collections.reverse(reversedImmutableComponents);
-
- List<ILSMDiskComponent> mergableComponents = new
ArrayList<>();
- for (int j = startIndex + 1; j <= i; j++) {
-
mergableComponents.add(reversedImmutableComponents.get(j));
- }
- // Reverse the components order back to its original order
- Collections.reverse(mergableComponents);
-
- ILSMIndexAccessor accessor =
-
lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
- accessor.scheduleMerge(lsmIndex.getIOOperationCallback(),
mergableComponents);
- }
- break;
- }
- }
+ scheduleMerge(index);
}
@Override
@@ -127,9 +93,184 @@
Integer.parseInt(properties.get(CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_COUNT));
}
+ /**
+ * Decides whether merging is lagging, using logic similar to {@link
PrefixMergePolicy}.
+ *
+ * @throws HyracksDataException
+ */
@Override
- public boolean isMergeLagging(ILSMIndex index) {
- //TODO implement properly according to the merge policy
+ public boolean isMergeLagging(ILSMIndex index) throws HyracksDataException
{
+ List<ILSMDiskComponent> immutableComponents =
index.getImmutableComponents();
+ int mergableImmutableComponentCount =
getMergableImmutableComponentCount(immutableComponents);
+
+ // [case 1]
+ if (mergableImmutableComponentCount < maxToleranceComponentCount) {
+ return false;
+ }
+
+ boolean isMergeOngoing = isMergeOngoing(immutableComponents);
+
+ if (isMergeOngoing) {
+ // [case 2]
+ return true;
+ }
+
+ if (index.isPrimaryIndex()) {
+ // [case 3]
+ // make sure that all components are of READABLE_UNWRITABLE state.
+ if (!areComponentsReadableUnwritableState(immutableComponents)) {
+ throw new IllegalStateException();
+ }
+ // schedule a merge operation
+ boolean isMergeTriggered = scheduleMerge(index);
+ if (!isMergeTriggered) {
+ throw new IllegalStateException();
+ }
+ return true;
+ } else {
+ //[case 4]
+ //if the index is secondary, then ignore the merge request (since
merge should be coordinated by the primary),
+ //and here we simply treat it as not lagged.
+ return false;
+ }
+ }
+
+ private boolean scheduleMerge(ILSMIndex index) throws HyracksDataException
{
+ List<ILSMDiskComponent> immutableComponents = new
ArrayList<>(index.getImmutableComponents());
+ Collections.reverse(immutableComponents);
+
+ long totalSize = 0;
+ int startIndex = -1;
+
+ int numComponents = immutableComponents.size();
+
+ for (int i = 0; i < numComponents; i++) {
+ ILSMComponent c = immutableComponents.get(i);
+ long componentSize = ((ILSMDiskComponent) c).getComponentSize();
+ if (componentSize > maxMergableComponentSize ||
((ILSMDiskComponent) c).getComponentID().notFound()) {
+ startIndex = i;
+ totalSize = 0;
+ continue;
+ }
+ totalSize += componentSize;
+ boolean isLastComponent = i + 1 == numComponents ? true : false;
+ if (totalSize > maxMergableComponentSize
+ || (isLastComponent && i - startIndex >=
maxToleranceComponentCount)) {
+ //merge disk components from startIndex+1 to i
+ long minID = immutableComponents.get(startIndex +
1).getComponentID().getMinID();
+ long maxID =
immutableComponents.get(i).getComponentID().getMaxID();
+ Set<IndexInfo> indexInfos =
datasetLifecycleManager.getDatasetInfo(datasetId).getDatsetIndexInfos();
+ triggerScheduledMerge(minID, maxID, getIndexPartition(index,
indexInfos), indexInfos);
+ return true;
+ }
+ }
return false;
}
+
+ /**
+ * Submit merge requests for all disk components within [minID, maxID] of
all indexes of a given dataset in the given partition
+ * @param minID
+ * @param maxID
+ * @param partition
+ * @param indexInfos
+ * @throws HyracksDataException
+ */
+ private void triggerScheduledMerge(long minID, long maxID, int partition,
Set<IndexInfo> indexInfos)
+ throws HyracksDataException {
+ for (IndexInfo info : indexInfos) {
+ if (info.getPartition() == partition) {
+ ILSMIndex lsmIndex = info.getIndex();
+
+ List<ILSMDiskComponent> immutableComponents = new
ArrayList<>(lsmIndex.getImmutableComponents());
+ if (isMergeOngoing(immutableComponents)) {
+ continue;
+ }
+ List<ILSMDiskComponent> mergableComponents = new ArrayList<>();
+
+ for (ILSMDiskComponent component : immutableComponents) {
+ ILSMDiskComponentID id = component.getComponentID();
+ if (id.notFound()) {
+ //ignore invalid IDs
+ continue;
+ }
+ if (id.getMinID() >= minID && id.getMaxID() <= maxID) {
+ mergableComponents.add(component);
+ }
+ if (id.getMaxID() < minID) {
+ //disk components are ordered from latest (with
largest IDs) to oldest (with smallest IDs)
+ //if the component.maxID < minID, then we can safely
skip the remaining (older) disk components in the list
+ break;
+ }
+ }
+ ILSMIndexAccessor accessor =
+
lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
+ accessor.scheduleMerge(lsmIndex.getIOOperationCallback(),
mergableComponents);
+ }
+ }
+ }
+
+ /**
+ * This method returns the number of mergable components among the given
list
+ * of immutable components that are ordered from the latest component to
older ones. The caller
+ * needs to ensure that the list is in this order.
+ *
+ * @param immutableComponents
+ * @return the number of mergable component
+ * @throws HyracksDataException
+ */
+ private int getMergableImmutableComponentCount(List<ILSMDiskComponent>
immutableComponents)
+ throws HyracksDataException {
+ int count = 0;
+ for (ILSMComponent c : immutableComponents) {
+ long componentSize = ((ILSMDiskComponent) c).getComponentSize();
+ //stop when the first non-mergable component is found.
+ if (c.getState() != ComponentState.READABLE_UNWRITABLE ||
componentSize > maxMergableComponentSize
+ || ((ILSMDiskComponent) c).getComponentID().notFound()) {
+ break;
+ }
+ ++count;
+ }
+ return count;
+ }
+
+ /**
+ * This method returns whether there is an ongoing merge operation or not
by checking
+ * each component state of given components.
+ *
+ * @param immutableComponents
+ * @return true if there is an ongoing merge operation, false otherwise.
+ */
+ private boolean isMergeOngoing(List<ILSMDiskComponent>
immutableComponents) {
+ int size = immutableComponents.size();
+ for (int i = 0; i < size; i++) {
+ if (immutableComponents.get(i).getState() ==
ComponentState.READABLE_MERGING) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * checks whether all given components are of READABLE_UNWRITABLE state
+ *
+ * @param immutableComponents
+ * @return true if all components are of READABLE_UNWRITABLE state, false
otherwise.
+ */
+ private boolean
areComponentsReadableUnwritableState(List<ILSMDiskComponent>
immutableComponents) {
+ for (ILSMComponent c : immutableComponents) {
+ if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private int getIndexPartition(ILSMIndex index, Set<IndexInfo> indexInfos) {
+ for (IndexInfo info : indexInfos) {
+ if (info.getIndex() == index) {
+ return info.getPartition();
+ }
+ }
+ return -1;
+ }
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index 6a2cc56..71d4a96 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -76,6 +76,17 @@
return datasetIndexes;
}
+ public synchronized Set<IndexInfo> getDatsetIndexInfos() {
+ Set<IndexInfo> infos = new HashSet<>();
+ for (IndexInfo iInfo : getIndexes().values()) {
+ if (iInfo.isOpen()) {
+ infos.add(iInfo);
+ }
+ }
+
+ return infos;
+ }
+
@Override
public int compareTo(DatasetInfo i) {
// sort by (isOpen, referenceCount, lastAccess) ascending, where true
< false
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 9529366..51a535a 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -93,12 +93,12 @@
public synchronized void register(String resourcePath, IIndex index)
throws HyracksDataException {
validateDatasetLifecycleManagerState();
int did = getDIDfromResourcePath(resourcePath);
- long resourceID = getResourceIDfromResourcePath(resourcePath);
+ LocalResource resource = resourceRepository.get(resourcePath);
DatasetResource datasetResource = datasets.get(did);
if (datasetResource == null) {
datasetResource = getDatasetLifecycle(did);
}
- datasetResource.register(resourceID, index);
+ datasetResource.register(resource, index);
}
public int getDIDfromResourcePath(String resourcePath) throws
HyracksDataException {
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index a880ce2..f2f3b93 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -20,9 +20,11 @@
import java.util.Map;
+import org.apache.asterix.common.dataflow.DatasetLocalResource;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.LocalResource;
/**
* A dataset can be in one of two states { EVICTED , LOADED }.
@@ -41,8 +43,7 @@
private final PrimaryIndexOperationTracker datasetPrimaryOpTracker;
private final DatasetVirtualBufferCaches datasetVirtualBufferCaches;
- public DatasetResource(DatasetInfo datasetInfo,
- PrimaryIndexOperationTracker datasetPrimaryOpTracker,
+ public DatasetResource(DatasetInfo datasetInfo,
PrimaryIndexOperationTracker datasetPrimaryOpTracker,
DatasetVirtualBufferCaches datasetVirtualBufferCaches) {
this.datasetInfo = datasetInfo;
this.datasetPrimaryOpTracker = datasetPrimaryOpTracker;
@@ -86,7 +87,8 @@
return (iInfo == null) ? null : iInfo.getIndex();
}
- public void register(long resourceID, IIndex index) throws
HyracksDataException {
+ public void register(LocalResource resource, IIndex index) throws
HyracksDataException {
+ long resourceID = resource.getId();
if (!datasetInfo.isRegistered()) {
synchronized (datasetInfo) {
if (!datasetInfo.isRegistered()) {
@@ -102,8 +104,8 @@
if (index == null) {
throw new HyracksDataException("Attempt to register a null index");
}
- datasetInfo.getIndexes().put(resourceID,
- new IndexInfo((ILSMIndex) index, datasetInfo.getDatasetID(),
resourceID));
+ datasetInfo.getIndexes().put(resourceID, new IndexInfo((ILSMIndex)
index, datasetInfo.getDatasetID(),
+ resourceID, ((DatasetLocalResource)
resource.getResource()).getPartition()));
}
public DatasetInfo getDatasetInfo() {
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
index 34ccce0..56f15e8 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
@@ -22,13 +22,15 @@
public class IndexInfo extends Info {
private final ILSMIndex index;
- private final long resourceId;
private final int datasetId;
+ private final long resourceID;
+ private final int partition;
- public IndexInfo(ILSMIndex index, int datasetId, long resourceId) {
+ public IndexInfo(ILSMIndex index, int datasetId, long resourceID, int
partition) {
this.index = index;
this.datasetId = datasetId;
- this.resourceId = resourceId;
+ this.resourceID = resourceID;
+ this.partition = partition;
}
public ILSMIndex getIndex() {
@@ -36,7 +38,11 @@
}
public long getResourceId() {
- return resourceId;
+ return resourceID;
+ }
+
+ public int getPartition() {
+ return partition;
}
public int getDatasetId() {
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index f903b65..8f5bad5 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -20,6 +20,8 @@
package org.apache.asterix.common.ioopcallbacks;
import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.primitive.LongPointable;
@@ -28,11 +30,16 @@
import
org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentID;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMDiskComponentID;
// A single LSMIOOperationCallback per LSM index used to perform actions
around Flush and Merge operations
public abstract class AbstractLSMIOOperationCallback implements
ILSMIOOperationCallback {
+ private static final Logger logger =
Logger.getLogger(AbstractLSMIOOperationCallback.class.getName());
+
public static final MutableArrayValueReference LSN_KEY = new
MutableArrayValueReference("LSN".getBytes());
public static final long INVALID = -1L;
@@ -100,6 +107,44 @@
index.getMetadata().put(LSN_KEY,
LongPointable.FACTORY.createPointable(getComponentLSN(oldComponents)));
}
+ public ILSMDiskComponentID getComponentID(ILSMDiskComponent component,
List<ILSMComponent> oldComponents)
+ throws HyracksDataException {
+ if (oldComponents == null) {
+ //a flush operation
+ synchronized (this) {
+ long id = getComponentLSN(oldComponents);
+ if (id == 0) {
+ logger.log(Level.WARNING, "Flushing a memory component
without setting the LSN");
+ id = ILSMDiskComponentID.NOT_FOUND;
+ }
+ return new LSMDiskComponentID(id, id);
+ }
+ } else {
+ long minID = Long.MAX_VALUE;
+ long maxID = Long.MIN_VALUE;
+ for (ILSMComponent oldComponent : oldComponents) {
+ ILSMDiskComponentID oldComponentID = ((ILSMDiskComponent)
oldComponent).getComponentID();
+ if (oldComponentID.getMinID() < minID) {
+ minID = oldComponentID.getMinID();
+ }
+ if (oldComponentID.getMaxID() > maxID) {
+ maxID = oldComponentID.getMaxID();
+ }
+ }
+ return new LSMDiskComponentID(minID, maxID);
+ }
+ }
+
+ public void putComponentIDIntoMetadata(ILSMDiskComponent component,
List<ILSMComponent> oldComponents)
+ throws HyracksDataException {
+ DiskComponentMetadata metadata = component.getMetadata();
+ ILSMDiskComponentID componentID = getComponentID(component,
oldComponents);
+ metadata.put(ILSMDiskComponentID.COMPONENT_ID_MIN_KEY,
+ LongPointable.FACTORY.createPointable(componentID.getMinID()));
+ metadata.put(ILSMDiskComponentID.COMPONENT_ID_MAX_KEY,
+ LongPointable.FACTORY.createPointable(componentID.getMaxID()));
+ }
+
public static long getTreeIndexLSN(ITreeIndex treeIndex) throws
HyracksDataException {
LongPointable pointable = new LongPointable();
IMetadataPageManager metadataPageManager = (IMetadataPageManager)
treeIndex.getPageManager();
@@ -111,6 +156,7 @@
mutableLastLSNs[writeIndex] = lastLSN;
}
+
public void setFirstLSN(long firstLSN) {
// We make sure that this method is only called on an empty component
so the first LSN is not set incorrectly
firstLSNs[writeIndex] = firstLSN;
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
index 173c962..e8934fd 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
@@ -45,6 +45,7 @@
if (newComponent != null) {
LSMBTreeDiskComponent btreeComponent = (LSMBTreeDiskComponent)
newComponent;
putLSNIntoMetadata(btreeComponent, oldComponents);
+ putComponentIDIntoMetadata(newComponent, oldComponents);
if (opType == LSMOperationType.MERGE) {
LongPointable markerLsn = LongPointable.FACTORY
.createPointable(ComponentMetadataUtil.getLong(oldComponents.get(0).getMetadata(),
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
index 6c987d6..e186569 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
@@ -36,6 +36,7 @@
if (newComponent != null) {
LSMBTreeWithBuddyDiskComponent btreeComponent =
(LSMBTreeWithBuddyDiskComponent) newComponent;
putLSNIntoMetadata(btreeComponent, oldComponents);
+ putComponentIDIntoMetadata(newComponent, oldComponents);
}
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
index 657d908..1428fe0 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
@@ -41,6 +41,7 @@
if (newComponent != null) {
LSMInvertedIndexDiskComponent invIndexComponent =
(LSMInvertedIndexDiskComponent) newComponent;
putLSNIntoMetadata(invIndexComponent, oldComponents);
+ putComponentIDIntoMetadata(invIndexComponent, oldComponents);
}
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
index 2dc06f7..55f1c60 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
@@ -41,6 +41,7 @@
if (newComponent != null) {
LSMRTreeDiskComponent rtreeComponent = (LSMRTreeDiskComponent)
newComponent;
putLSNIntoMetadata(rtreeComponent, oldComponents);
+ putComponentIDIntoMetadata(rtreeComponent, oldComponents);
}
}
diff --git
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
new file mode 100644
index 0000000..bf8e8f4
--- /dev/null
+++
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.context;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.context.CorrelatedPrefixMergePolicy;
+import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
+import org.apache.asterix.common.context.DatasetInfo;
+import org.apache.asterix.common.context.IndexInfo;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import
org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentID;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMDiskComponentID;
+import org.apache.hyracks.storage.common.IModificationOperationCallback;
+import org.apache.hyracks.storage.common.ISearchOperationCallback;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import junit.framework.TestCase;
+
+public class CorrelatedPrefixMergePolicyTest extends TestCase {
+
+    /** Size reported by every mocked disk component. */
+    private static final long DEFAULT_COMPONENT_SIZE = 1L;
+
+    /** Value passed to the policy for its max-component-size property. */
+    private static final int MAX_COMPONENT_SIZE = 3;
+
+    /** Value passed to the policy for its max-component-count property. */
+    private static final int MAX_COMPONENT_COUNT = 3;
+
+    /** Dataset ID shared by the mocked lifecycle manager, the policy and every index info. */
+    private static final int DATASET_ID = 1;
+
+    /**
+     * Primary and secondary index (same partition) both hold components with IDs 5..1.
+     * A notification through the secondary index must not schedule anything; a notification
+     * through the primary index must schedule the components 4..1 on both indexes.
+     */
+    @Test
+    public void testBasic() {
+        try {
+            List<ILSMDiskComponentID> diskIDs = Arrays.asList(new LSMDiskComponentID(5, 5),
+                    new LSMDiskComponentID(4, 4), new LSMDiskComponentID(3, 3),
+                    new LSMDiskComponentID(2, 2), new LSMDiskComponentID(1, 1));
+            List<ILSMDiskComponentID> expectedMerge = Arrays.asList(new LSMDiskComponentID(4, 4),
+                    new LSMDiskComponentID(3, 3), new LSMDiskComponentID(2, 2), new LSMDiskComponentID(1, 1));
+
+            List<ILSMDiskComponentID> primaryMerged = new ArrayList<>();
+            IndexInfo primaryInfo = mockIndex(true, diskIDs, primaryMerged, 0);
+            List<ILSMDiskComponentID> secondaryMerged = new ArrayList<>();
+            IndexInfo secondaryInfo = mockIndex(false, diskIDs, secondaryMerged, 0);
+            ILSMMergePolicy mergePolicy = mockMergePolicy(primaryInfo, secondaryInfo);
+
+            // a component added to the secondary index alone schedules no merge
+            mergePolicy.diskComponentAdded(secondaryInfo.getIndex(), false);
+            Assert.assertTrue(primaryMerged.isEmpty());
+            Assert.assertTrue(secondaryMerged.isEmpty());
+
+            // the primary index drives the merge of both indexes
+            mergePolicy.diskComponentAdded(primaryInfo.getIndex(), false);
+            Assert.assertEquals(expectedMerge, primaryMerged);
+            Assert.assertEquals(expectedMerge, secondaryMerged);
+        } catch (HyracksDataException e) {
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    /**
+     * Both indexes hold the same five components whose IDs are ranges rather than single values.
+     * After the primary index is notified, the four components [30,35], [25,29], [20,24] and
+     * [10,19] must be scheduled on both indexes; the secondary notification alone schedules nothing.
+     */
+    @Test
+    public void testIDIntervals() {
+        try {
+            List<ILSMDiskComponentID> diskIDs = Arrays.asList(new LSMDiskComponentID(40, 50),
+                    new LSMDiskComponentID(30, 35), new LSMDiskComponentID(25, 29),
+                    new LSMDiskComponentID(20, 24), new LSMDiskComponentID(10, 19));
+            List<ILSMDiskComponentID> expectedMerge = Arrays.asList(new LSMDiskComponentID(30, 35),
+                    new LSMDiskComponentID(25, 29), new LSMDiskComponentID(20, 24),
+                    new LSMDiskComponentID(10, 19));
+
+            List<ILSMDiskComponentID> primaryMerged = new ArrayList<>();
+            IndexInfo primaryInfo = mockIndex(true, diskIDs, primaryMerged, 0);
+            List<ILSMDiskComponentID> secondaryMerged = new ArrayList<>();
+            IndexInfo secondaryInfo = mockIndex(false, diskIDs, secondaryMerged, 0);
+            ILSMMergePolicy mergePolicy = mockMergePolicy(primaryInfo, secondaryInfo);
+
+            mergePolicy.diskComponentAdded(secondaryInfo.getIndex(), false);
+            Assert.assertTrue(primaryMerged.isEmpty());
+            Assert.assertTrue(secondaryMerged.isEmpty());
+
+            mergePolicy.diskComponentAdded(primaryInfo.getIndex(), false);
+            Assert.assertEquals(expectedMerge, primaryMerged);
+            Assert.assertEquals(expectedMerge, secondaryMerged);
+        } catch (HyracksDataException e) {
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    /**
+     * The secondary index lacks the newest ([40,50]) and oldest ([10,19]) components of the primary.
+     * After the primary index is notified, the primary must schedule [30,35]..[10,19] while the
+     * secondary schedules only the three components it actually has.
+     */
+    @Test
+    public void testSecondaryMissing() {
+        try {
+            List<ILSMDiskComponentID> primaryIDs = Arrays.asList(new LSMDiskComponentID(40, 50),
+                    new LSMDiskComponentID(30, 35), new LSMDiskComponentID(25, 29),
+                    new LSMDiskComponentID(20, 24), new LSMDiskComponentID(10, 19));
+            List<ILSMDiskComponentID> secondaryIDs = Arrays.asList(new LSMDiskComponentID(30, 35),
+                    new LSMDiskComponentID(25, 29), new LSMDiskComponentID(20, 24));
+
+            List<ILSMDiskComponentID> expectedPrimary = Arrays.asList(new LSMDiskComponentID(30, 35),
+                    new LSMDiskComponentID(25, 29), new LSMDiskComponentID(20, 24),
+                    new LSMDiskComponentID(10, 19));
+            List<ILSMDiskComponentID> expectedSecondary = Arrays.asList(new LSMDiskComponentID(30, 35),
+                    new LSMDiskComponentID(25, 29), new LSMDiskComponentID(20, 24));
+
+            List<ILSMDiskComponentID> primaryMerged = new ArrayList<>();
+            IndexInfo primaryInfo = mockIndex(true, primaryIDs, primaryMerged, 0);
+            List<ILSMDiskComponentID> secondaryMerged = new ArrayList<>();
+            IndexInfo secondaryInfo = mockIndex(false, secondaryIDs, secondaryMerged, 0);
+            ILSMMergePolicy mergePolicy = mockMergePolicy(primaryInfo, secondaryInfo);
+
+            mergePolicy.diskComponentAdded(secondaryInfo.getIndex(), false);
+            Assert.assertTrue(primaryMerged.isEmpty());
+            Assert.assertTrue(secondaryMerged.isEmpty());
+
+            mergePolicy.diskComponentAdded(primaryInfo.getIndex(), false);
+            Assert.assertEquals(expectedPrimary, primaryMerged);
+            Assert.assertEquals(expectedSecondary, secondaryMerged);
+        } catch (HyracksDataException e) {
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    /**
+     * One primary component (fourth in the list) carries an ID whose bounds are both
+     * {@link ILSMDiskComponentID#NOT_FOUND}. After the primary index is notified, the primary must
+     * schedule [40,50], [30,35] and [25,29], and the secondary must schedule [30,35] and [25,29].
+     */
+    @Test
+    public void testPrimaryNotFound() {
+        try {
+            List<ILSMDiskComponentID> primaryIDs = Arrays.asList(new LSMDiskComponentID(40, 50),
+                    new LSMDiskComponentID(30, 35), new LSMDiskComponentID(25, 29),
+                    new LSMDiskComponentID(ILSMDiskComponentID.NOT_FOUND, ILSMDiskComponentID.NOT_FOUND),
+                    new LSMDiskComponentID(10, 19));
+            List<ILSMDiskComponentID> secondaryIDs = Arrays.asList(new LSMDiskComponentID(30, 35),
+                    new LSMDiskComponentID(25, 29), new LSMDiskComponentID(20, 24));
+
+            List<ILSMDiskComponentID> expectedPrimary = Arrays.asList(new LSMDiskComponentID(40, 50),
+                    new LSMDiskComponentID(30, 35), new LSMDiskComponentID(25, 29));
+            List<ILSMDiskComponentID> expectedSecondary =
+                    Arrays.asList(new LSMDiskComponentID(30, 35), new LSMDiskComponentID(25, 29));
+
+            List<ILSMDiskComponentID> primaryMerged = new ArrayList<>();
+            IndexInfo primaryInfo = mockIndex(true, primaryIDs, primaryMerged, 0);
+            List<ILSMDiskComponentID> secondaryMerged = new ArrayList<>();
+            IndexInfo secondaryInfo = mockIndex(false, secondaryIDs, secondaryMerged, 0);
+            ILSMMergePolicy mergePolicy = mockMergePolicy(primaryInfo, secondaryInfo);
+
+            mergePolicy.diskComponentAdded(secondaryInfo.getIndex(), false);
+            Assert.assertTrue(primaryMerged.isEmpty());
+            Assert.assertTrue(secondaryMerged.isEmpty());
+
+            mergePolicy.diskComponentAdded(primaryInfo.getIndex(), false);
+            Assert.assertEquals(expectedPrimary, primaryMerged);
+            Assert.assertEquals(expectedSecondary, secondaryMerged);
+        } catch (HyracksDataException e) {
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    /**
+     * The middle secondary component carries an ID whose bounds are both
+     * {@link ILSMDiskComponentID#NOT_FOUND}. After the primary index is notified, the primary must
+     * schedule [30,35]..[10,19], while the secondary schedules only [30,35] and [20,24].
+     */
+    @Test
+    public void testSecondaryNotFound() {
+        try {
+            List<ILSMDiskComponentID> primaryIDs = Arrays.asList(new LSMDiskComponentID(40, 50),
+                    new LSMDiskComponentID(30, 35), new LSMDiskComponentID(25, 29),
+                    new LSMDiskComponentID(20, 24), new LSMDiskComponentID(10, 19));
+            List<ILSMDiskComponentID> secondaryIDs = Arrays.asList(new LSMDiskComponentID(30, 35),
+                    new LSMDiskComponentID(ILSMDiskComponentID.NOT_FOUND, ILSMDiskComponentID.NOT_FOUND),
+                    new LSMDiskComponentID(20, 24));
+
+            List<ILSMDiskComponentID> expectedPrimary = Arrays.asList(new LSMDiskComponentID(30, 35),
+                    new LSMDiskComponentID(25, 29), new LSMDiskComponentID(20, 24),
+                    new LSMDiskComponentID(10, 19));
+            List<ILSMDiskComponentID> expectedSecondary =
+                    Arrays.asList(new LSMDiskComponentID(30, 35), new LSMDiskComponentID(20, 24));
+
+            List<ILSMDiskComponentID> primaryMerged = new ArrayList<>();
+            IndexInfo primaryInfo = mockIndex(true, primaryIDs, primaryMerged, 0);
+            List<ILSMDiskComponentID> secondaryMerged = new ArrayList<>();
+            IndexInfo secondaryInfo = mockIndex(false, secondaryIDs, secondaryMerged, 0);
+            ILSMMergePolicy mergePolicy = mockMergePolicy(primaryInfo, secondaryInfo);
+
+            mergePolicy.diskComponentAdded(secondaryInfo.getIndex(), false);
+            Assert.assertTrue(primaryMerged.isEmpty());
+            Assert.assertTrue(secondaryMerged.isEmpty());
+
+            mergePolicy.diskComponentAdded(primaryInfo.getIndex(), false);
+            Assert.assertEquals(expectedPrimary, primaryMerged);
+            Assert.assertEquals(expectedSecondary, secondaryMerged);
+        } catch (HyracksDataException e) {
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    /**
+     * A primary and a secondary index live in partition 0 and a second secondary index lives in
+     * partition 1; all three hold the same five components. After the primary index is notified,
+     * the two partition-0 indexes must schedule [30,35]..[10,19], and nothing may be scheduled on
+     * the partition-1 index.
+     *
+     * Each mocked index records into its own list, so the final emptiness assertion really
+     * observes the partition-1 index.
+     */
+    @Test
+    public void testMultiPartition() {
+        try {
+            List<ILSMDiskComponentID> componentIDs = Arrays.asList(new LSMDiskComponentID(40, 50),
+                    new LSMDiskComponentID(30, 35), new LSMDiskComponentID(25, 29),
+                    new LSMDiskComponentID(20, 24), new LSMDiskComponentID(10, 19));
+            List<ILSMDiskComponentID> expectedMerge = Arrays.asList(new LSMDiskComponentID(30, 35),
+                    new LSMDiskComponentID(25, 29), new LSMDiskComponentID(20, 24),
+                    new LSMDiskComponentID(10, 19));
+
+            List<ILSMDiskComponentID> resultPrimaryIDs = new ArrayList<>();
+            IndexInfo primary = mockIndex(true, componentIDs, resultPrimaryIDs, 0);
+
+            List<ILSMDiskComponentID> resultSecondaryIDs = new ArrayList<>();
+            IndexInfo secondary = mockIndex(false, componentIDs, resultSecondaryIDs, 0);
+
+            // the partition-1 index must record into its OWN list, not into resultSecondaryIDs
+            List<ILSMDiskComponentID> resultSecondaryIDs1 = new ArrayList<>();
+            IndexInfo secondary1 = mockIndex(false, componentIDs, resultSecondaryIDs1, 1);
+
+            ILSMMergePolicy policy = mockMergePolicy(primary, secondary, secondary1);
+            policy.diskComponentAdded(secondary.getIndex(), false);
+            Assert.assertTrue(resultPrimaryIDs.isEmpty());
+            Assert.assertTrue(resultSecondaryIDs.isEmpty());
+            Assert.assertTrue(resultSecondaryIDs1.isEmpty());
+
+            policy.diskComponentAdded(primary.getIndex(), false);
+
+            Assert.assertEquals(expectedMerge, resultPrimaryIDs);
+            Assert.assertEquals(expectedMerge, resultSecondaryIDs);
+            Assert.assertTrue(resultSecondaryIDs1.isEmpty());
+        } catch (HyracksDataException e) {
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    /**
+     * Creates a {@link CorrelatedPrefixMergePolicy} for {@code DATASET_ID} backed by a mocked
+     * lifecycle manager whose dataset info exposes exactly the given indexes, and configures it
+     * with {@code MAX_COMPONENT_COUNT} and {@code MAX_COMPONENT_SIZE}.
+     *
+     * @param indexes the index infos the mocked dataset should report
+     * @return the configured merge policy
+     */
+    private ILSMMergePolicy mockMergePolicy(IndexInfo... indexes) {
+        Set<IndexInfo> datasetIndexes = new HashSet<>(Arrays.asList(indexes));
+
+        DatasetInfo datasetInfo = Mockito.mock(DatasetInfo.class);
+        Mockito.when(datasetInfo.getDatsetIndexInfos()).thenReturn(datasetIndexes);
+
+        IDatasetLifecycleManager lifecycleManager = Mockito.mock(IDatasetLifecycleManager.class);
+        Mockito.when(lifecycleManager.getDatasetInfo(DATASET_ID)).thenReturn(datasetInfo);
+
+        Map<String, String> config = new HashMap<>();
+        config.put(CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_COUNT,
+                String.valueOf(MAX_COMPONENT_COUNT));
+        config.put(CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_SIZE,
+                String.valueOf(MAX_COMPONENT_SIZE));
+
+        ILSMMergePolicy mergePolicy = new CorrelatedPrefixMergePolicy(lifecycleManager, DATASET_ID);
+        mergePolicy.configure(config);
+        return mergePolicy;
+    }
+
+    /**
+     * Builds an {@link IndexInfo} around a mocked LSM index.
+     *
+     * The index reports one mocked disk component per entry of {@code componentIDs} (each with
+     * size {@code DEFAULT_COMPONENT_SIZE} and state {@code READABLE_UNWRITABLE}). Its accessor
+     * does not merge anything: every {@code scheduleMerge} call appends the IDs of the components
+     * it was asked to merge to {@code resultComponentIDs}, so a test can inspect what was scheduled.
+     *
+     * @param isPrimary          value the mocked index returns from {@code isPrimaryIndex()}
+     * @param componentIDs       IDs of the disk components the index should expose, in list order
+     * @param resultComponentIDs sink receiving the IDs of components passed to {@code scheduleMerge}
+     * @param partition          partition passed to the {@link IndexInfo} constructor
+     * @return the index info wrapping the mocked index
+     * @throws HyracksDataException declared because the stubbed component/index methods declare it
+     */
+    private IndexInfo mockIndex(boolean isPrimary, List<ILSMDiskComponentID> componentIDs,
+            List<ILSMDiskComponentID> resultComponentIDs, int partition) throws HyracksDataException {
+        List<ILSMDiskComponent> components = new ArrayList<>();
+        for (ILSMDiskComponentID id : componentIDs) {
+            ILSMDiskComponent component = Mockito.mock(ILSMDiskComponent.class);
+            Mockito.when(component.getComponentID()).thenReturn(id);
+            Mockito.when(component.getComponentSize()).thenReturn(DEFAULT_COMPONENT_SIZE);
+            Mockito.when(component.getState()).thenReturn(ComponentState.READABLE_UNWRITABLE);
+            components.add(component);
+        }
+
+        ILSMIndex index = Mockito.mock(ILSMIndex.class);
+        Mockito.when(index.getImmutableComponents()).thenReturn(components);
+        Mockito.when(index.isPrimaryIndex()).thenReturn(isPrimary);
+
+        ILSMIndexAccessor accessor = Mockito.mock(ILSMIndexAccessor.class);
+        Mockito.doAnswer(new Answer<Void>() {
+
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                // the second scheduleMerge argument is the list of components to merge
+                @SuppressWarnings("unchecked")
+                List<ILSMDiskComponent> mergedComponents = invocation.getArgumentAt(1, List.class);
+                // plain loop instead of forEach: a failure while reading an ID now propagates to
+                // the caller instead of being printed and leaving the result list incomplete
+                for (ILSMDiskComponent merged : mergedComponents) {
+                    resultComponentIDs.add(merged.getComponentID());
+                }
+                return null;
+            }
+
+        }).when(accessor).scheduleMerge(Mockito.any(ILSMIOOperationCallback.class),
+                Mockito.anyListOf(ILSMDiskComponent.class));
+
+        Mockito.when(index.createAccessor(Mockito.any(IModificationOperationCallback.class),
+                Mockito.any(ISearchOperationCallback.class))).thenReturn(accessor);
+
+        // NOTE(review): the third constructor argument is a literal 0 — presumably a resource ID; confirm
+        return new IndexInfo(index, DATASET_ID, 0, partition);
+    }
+
+}
diff --git
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/memory/ConcurrentFramePoolUnitTest.java
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/memory/ConcurrentFramePoolUnitTest.java
index 9be5837..0243a63 100644
---
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/memory/ConcurrentFramePoolUnitTest.java
+++
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/memory/ConcurrentFramePoolUnitTest.java
@@ -60,8 +60,8 @@
public void testMemoryManager() {
ActiveProperties afp = Mockito.mock(ActiveProperties.class);
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
- ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode",
afp.getMemoryComponentGlobalBudget(),
- DEFAULT_FRAME_SIZE);
+ ConcurrentFramePool fmm =
+ new ConcurrentFramePool("TestNode",
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
int i = 0;
while (fmm.get() != null) {
i++;
@@ -75,8 +75,8 @@
try {
ActiveProperties afp = Mockito.mock(ActiveProperties.class);
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
- ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode",
afp.getMemoryComponentGlobalBudget(),
- DEFAULT_FRAME_SIZE);
+ ConcurrentFramePool fmm =
+ new ConcurrentFramePool("TestNode",
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
FixedSizeAllocator[] runners = new FixedSizeAllocator[NUM_THREADS];
Thread[] threads = new Thread[NUM_THREADS];
Arrays.parallelSetAll(runners, (int i) -> new
FixedSizeAllocator(fmm));
@@ -106,8 +106,8 @@
try {
ActiveProperties afp = Mockito.mock(ActiveProperties.class);
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
- ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode",
afp.getMemoryComponentGlobalBudget(),
- DEFAULT_FRAME_SIZE);
+ ConcurrentFramePool fmm =
+ new ConcurrentFramePool("TestNode",
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
Random random = new Random();
int i = 0;
int req;
@@ -141,8 +141,8 @@
try {
ActiveProperties afp = Mockito.mock(ActiveProperties.class);
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
- ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode",
afp.getMemoryComponentGlobalBudget(),
- DEFAULT_FRAME_SIZE);
+ ConcurrentFramePool fmm =
+ new ConcurrentFramePool("TestNode",
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
VarSizeAllocator[] runners = new VarSizeAllocator[NUM_THREADS];
Thread[] threads = new Thread[NUM_THREADS];
@@ -180,8 +180,8 @@
public void testAcquireReleaseMemoryManager() throws HyracksDataException {
ActiveProperties afp = Mockito.mock(ActiveProperties.class);
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
- ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode",
afp.getMemoryComponentGlobalBudget(),
- DEFAULT_FRAME_SIZE);
+ ConcurrentFramePool fmm =
+ new ConcurrentFramePool("TestNode",
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
Random random = new Random();
ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
while (true) {
@@ -213,8 +213,8 @@
try {
ActiveProperties afp = Mockito.mock(ActiveProperties.class);
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
- ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode",
afp.getMemoryComponentGlobalBudget(),
- DEFAULT_FRAME_SIZE);
+ ConcurrentFramePool fmm =
+ new ConcurrentFramePool("TestNode",
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
FixedSizeGoodAllocator[] runners = new
FixedSizeGoodAllocator[NUM_THREADS];
Thread[] threads = new Thread[NUM_THREADS];
Arrays.parallelSetAll(runners, (int i) -> new
FixedSizeGoodAllocator(fmm));
@@ -244,8 +244,8 @@
try {
ActiveProperties afp = Mockito.mock(ActiveProperties.class);
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
- ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode",
afp.getMemoryComponentGlobalBudget(),
- DEFAULT_FRAME_SIZE);
+ ConcurrentFramePool fmm =
+ new ConcurrentFramePool("TestNode",
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
Random random = new Random();
ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
int i = 0;
@@ -297,8 +297,8 @@
try {
ActiveProperties afp = Mockito.mock(ActiveProperties.class);
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
- ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode",
afp.getMemoryComponentGlobalBudget(),
- DEFAULT_FRAME_SIZE);
+ ConcurrentFramePool fmm =
+ new ConcurrentFramePool("TestNode",
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
VarSizeGoodAllocator[] runners = new
VarSizeGoodAllocator[NUM_THREADS];
Thread[] threads = new Thread[NUM_THREADS];
Arrays.parallelSetAll(runners, (int i) -> new
VarSizeGoodAllocator(fmm));
@@ -333,8 +333,8 @@
try {
ActiveProperties afp = Mockito.mock(ActiveProperties.class);
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
- ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode",
afp.getMemoryComponentGlobalBudget(),
- DEFAULT_FRAME_SIZE);
+ ConcurrentFramePool fmm =
+ new ConcurrentFramePool("TestNode",
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
int i = 0;
ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
LinkedBlockingDeque<ByteBuffer> buffers = new
LinkedBlockingDeque<>();
@@ -399,8 +399,8 @@
try {
ActiveProperties afp = Mockito.mock(ActiveProperties.class);
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
- ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode",
afp.getMemoryComponentGlobalBudget(),
- DEFAULT_FRAME_SIZE);
+ ConcurrentFramePool fmm =
+ new ConcurrentFramePool("TestNode",
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
int i = 0;
ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
LinkedBlockingDeque<ByteBuffer> buffers = new
LinkedBlockingDeque<>();
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
index 335e84e..c863e5a 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
@@ -46,4 +46,7 @@
* @throws HyracksDataException
*/
void destroy() throws HyracksDataException;
+
+ ILSMDiskComponentID getComponentID() throws HyracksDataException;
+
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentID.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentID.java
new file mode 100644
index 0000000..3f6b516
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentID.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
+
+/**
+ * Identifier of an LSM disk component, made of a minimum and a maximum ID.
+ *
+ * Per the change description, a flushed component's ID is its LSN and a merged component's ID
+ * is the union of the IDs of the components that were merged.
+ */
+public interface ILSMDiskComponentID {
+
+    /** Bound value meaning that no ID is available. */
+    long NOT_FOUND = -1;
+
+    /**
+     * Metadata key of the minimum ID. The key bytes are encoded with a fixed charset so they
+     * do not depend on the JVM's platform default encoding.
+     */
+    MutableArrayValueReference COMPONENT_ID_MIN_KEY =
+            new MutableArrayValueReference("Component_ID_Min".getBytes(StandardCharsets.UTF_8));
+
+    /** Metadata key of the maximum ID; encoded the same way as {@link #COMPONENT_ID_MIN_KEY}. */
+    MutableArrayValueReference COMPONENT_ID_MAX_KEY =
+            new MutableArrayValueReference("Component_ID_Max".getBytes(StandardCharsets.UTF_8));
+
+    /**
+     * @return the minimum ID, or {@link #NOT_FOUND}
+     */
+    long getMinID();
+
+    /**
+     * @return the maximum ID, or {@link #NOT_FOUND}
+     */
+    long getMaxID();
+
+    /**
+     * @return {@code true} when either bound equals {@link #NOT_FOUND}
+     */
+    default boolean notFound() {
+        return getMinID() == NOT_FOUND || getMaxID() == NOT_FOUND;
+    }
+
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
index 508a6cc..9371437 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
@@ -18,13 +18,19 @@
*/
package org.apache.hyracks.storage.am.lsm.common.impls;
+import java.util.logging.Logger;
+
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentID;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.am.lsm.common.utils.ComponentMetadataUtil;
public abstract class AbstractLSMDiskComponent extends AbstractLSMComponent
implements ILSMDiskComponent {
+
+ private static final Logger LOGGER =
Logger.getLogger(AbstractLSMDiskComponent.class.getName());
private final DiskComponentMetadata metadata;
@@ -96,4 +102,14 @@
public DiskComponentMetadata getMetadata() {
return metadata;
}
+
+    /**
+     * Builds this component's ID from the two ID entries of its metadata.
+     *
+     * Both lookups pass {@link ILSMDiskComponentID#NOT_FOUND} as their last argument —
+     * presumably the value returned when the key is absent (confirm against
+     * {@code ComponentMetadataUtil.getLong}).
+     *
+     * @return a new ID holding the minimum and maximum values read from the metadata
+     * @throws HyracksDataException if a metadata lookup fails
+     */
+    @Override
+    public ILSMDiskComponentID getComponentID() throws HyracksDataException {
+        //TODO: do we need to throw an exception when ID is not found?
+        return new LSMDiskComponentID(
+                ComponentMetadataUtil.getLong(metadata, ILSMDiskComponentID.COMPONENT_ID_MIN_KEY,
+                        ILSMDiskComponentID.NOT_FOUND),
+                ComponentMetadataUtil.getLong(metadata, ILSMDiskComponentID.COMPONENT_ID_MAX_KEY,
+                        ILSMDiskComponentID.NOT_FOUND));
+    }
}
\ No newline at end of file
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentID.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentID.java
new file mode 100644
index 0000000..dedff15
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentID.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentID;
+
+/**
+ * Immutable {@link ILSMDiskComponentID} holding a minimum and a maximum ID.
+ * Two instances are equal when they are of the same class and both bounds match.
+ */
+public class LSMDiskComponentID implements ILSMDiskComponentID {
+
+    private final long minID;
+
+    private final long maxID;
+
+    /**
+     * @param minID the minimum ID
+     * @param maxID the maximum ID
+     */
+    public LSMDiskComponentID(long minID, long maxID) {
+        this.minID = minID;
+        this.maxID = maxID;
+    }
+
+    @Override
+    public long getMinID() {
+        return minID;
+    }
+
+    @Override
+    public long getMaxID() {
+        return maxID;
+    }
+
+    /**
+     * @return the bounds rendered as {@code [min,max]}
+     */
+    @Override
+    public String toString() {
+        return new StringBuilder().append("[").append(minID).append(",").append(maxID).append("]").toString();
+    }
+
+    @Override
+    public int hashCode() {
+        // same 31-based combination as before: max bound first, then min bound
+        int hash = 31 + Long.hashCode(maxID);
+        return 31 * hash + Long.hashCode(minID);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        LSMDiskComponentID that = (LSMDiskComponentID) obj;
+        return maxID == that.maxID && minID == that.minID;
+    }
+
+}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1761
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I768ee9ac0a8d3c99c631086093a6b778b2e7588e
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Luo Chen <[email protected]>