Luo Chen has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2971
Change subject: [ASTERIXDB-2453] Add Improved Constant Merge Policy
......................................................................
[ASTERIXDB-2453] Add Improved Constant Merge Policy
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- The current constant merge policy is unusable because of its high
merge cost, i.e., O(N*N) where N is the number of flushes. This patch
replaces the previous constant merge policy with a more efficient policy
that still enforces a maximum number of components but greatly lowers
the merge cost.
- Extend AbstractLSMIndex with a method to return the total number of
flushes, based on the file name sequencer. This is required by the new
policy.
Change-Id: Ie5f83a4d5fdd3f036b823c906df1760f5110ae0a
---
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFileManager.java
M
hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/component/TestLsmIndexFileManager.java
9 files changed, 142 insertions(+), 80 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/71/2971/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
index 2240fd9..5b56ced 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
@@ -212,4 +212,10 @@
}
return new LSMComponentFileReferences(bTreeFileRef, null,
bloomFilterFileRef);
}
+
+ @Override
+ public long getCurrentComponentSequence() throws HyracksDataException {
+ checkAndInitializeComponentSequence(btreeFilter);
+ return super.getCurrentComponentSequence();
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java
index 8fb3751..343ec5d 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java
@@ -214,4 +214,9 @@
return new LSMComponentFileReferences(bTreeFileRef, buddyBTreeFileRef,
bloomFilterFileRef);
}
+ @Override
+ public long getCurrentComponentSequence() throws HyracksDataException {
+ checkAndInitializeComponentSequence(btreeFilter);
+ return super.getCurrentComponentSequence();
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 3d928a4..05967ad 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -901,4 +901,8 @@
protected abstract ILSMDiskComponent doFlush(ILSMIOOperation operation)
throws HyracksDataException;
protected abstract ILSMDiskComponent doMerge(ILSMIOOperation operation)
throws HyracksDataException;
+
+ public long getNumFlushes() throws HyracksDataException {
+ return ((AbstractLSMIndexFileManager)
fileManager).getCurrentComponentSequence() + 1;
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
index 904029b..c0738c1 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
@@ -356,10 +356,14 @@
}
protected String getNextComponentSequence(FilenameFilter filenameFilter)
throws HyracksDataException {
+ checkAndInitializeComponentSequence(filenameFilter);
+ return
IndexComponentFileReference.getFlushSequence(++lastUsedComponentSeq);
+ }
+
+ protected void checkAndInitializeComponentSequence(FilenameFilter
filenameFilter) throws HyracksDataException {
if (lastUsedComponentSeq == UNINITALIZED_COMPONENT_SEQ) {
lastUsedComponentSeq =
getOnDiskLastUsedComponentSequence(filenameFilter);
}
- return
IndexComponentFileReference.getFlushSequence(++lastUsedComponentSeq);
}
private long getOnDiskLastUsedComponentSequence(FilenameFilter
filenameFilter) throws HyracksDataException {
@@ -370,4 +374,8 @@
}
return maxComponentSeq;
}
+
+ public long getCurrentComponentSequence() throws HyracksDataException {
+ return lastUsedComponentSeq;
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
index c642d82..6df28ce 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
@@ -19,41 +19,117 @@
package org.apache.hyracks.storage.am.lsm.common.impls;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.logging.Logger;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import
org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
-import org.apache.hyracks.storage.common.IIndexAccessParameters;
public class ConstantMergePolicy implements ILSMMergePolicy {
+ private static final Logger LOGGER =
Logger.getLogger(AbstractBinomialMergePolicy.class.getName());
+
private int numComponents;
+
+ private int[][] binomial;
@Override
public void diskComponentAdded(final ILSMIndex index, boolean
fullMergeIsRequested) throws HyracksDataException {
- List<ILSMDiskComponent> immutableComponents =
index.getDiskComponents();
-
- if (!areComponentsMergable(immutableComponents)) {
+ List<ILSMDiskComponent> immutableComponents = new
ArrayList<>(index.getDiskComponents());
+ if (!areComponentsReadableWritableState(immutableComponents)) {
return;
}
-
if (fullMergeIsRequested) {
- IIndexAccessParameters iap =
- new IndexAccessParameters(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
- ILSMIndexAccessor accessor = index.createAccessor(iap);
- accessor.scheduleFullMerge();
- } else if (immutableComponents.size() >= numComponents) {
ILSMIndexAccessor accessor =
index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- accessor.scheduleMerge(immutableComponents);
+ accessor.scheduleFullMerge();
+ return;
}
+ scheduleMerge(index);
+ }
+
+ private boolean scheduleMerge(final ILSMIndex index) throws
HyracksDataException {
+ int numFlushes = (int) ((AbstractLSMIndex) index).getNumFlushes();
+ List<ILSMDiskComponent> immutableComponents = new
ArrayList<>(index.getDiskComponents());
+ Collections.reverse(immutableComponents);
+ int size = immutableComponents.size();
+ int depth = 0;
+ while ((treeDepth(depth) < numFlushes)) {
+ depth++;
+ }
+ int mergedIndex =
+ binomialIndex(depth, Math.min(depth, numComponents) - 1,
numFlushes - treeDepth(depth - 1) - 1);
+ if (mergedIndex == size - 1) {
+ return false;
+ }
+ long mergeSize = 0;
+ List<ILSMDiskComponent> mergableComponents = new
ArrayList<ILSMDiskComponent>();
+ for (int i = mergedIndex; i < immutableComponents.size(); i++) {
+ mergeSize = mergeSize +
immutableComponents.get(i).getComponentSize();
+ mergableComponents.add(immutableComponents.get(i));
+ }
+ Collections.reverse(mergableComponents);
+ ILSMIndexAccessor accessor =
index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ accessor.scheduleMerge(mergableComponents);
+ return true;
+ }
+
+ private int treeDepth(int d) {
+ if (d < 0) {
+ return 0;
+ }
+ return treeDepth(d - 1) + binomialChoose(d + Math.min(d,
numComponents) - 1, d);
+ }
+
+ private int binomialIndex(int d, int h, int t) {
+ if (t < 0 || t > binomialChoose(d + h, h)) {
+ throw new IllegalStateException("Illegal binomial values");
+ }
+ if (t == 0) {
+ return 0;
+ } else if (t < binomialChoose(d + h - 1, h)) {
+ return binomialIndex(d - 1, h, t);
+ }
+ return binomialIndex(d, h - 1, t - binomialChoose(d + h - 1, h)) + 1;
+ }
+
+ private int binomialChoose(int n, int k) {
+ if (k < 0 || k > n) {
+ return 0;
+ }
+ if (k == 0 || k == n) {
+ return 1;
+ }
+ // For efficiency, binomial is persisted to avoid re-computations for
every merge
+ if (binomial == null || binomial.length <= n) {
+ binomial = new int[n + 1][n + 1];
+ for (int r = 0; r <= n; r++) {
+ for (int c = 0; c <= r; c++) {
+ if (c == 0 || c == r) {
+ binomial[r][c] = 1;
+ } else {
+ binomial[r][c] = binomial[r - 1][c - 1] + binomial[r -
1][c];
+ }
+ }
+ }
+ }
+ return binomial[n][k];
+ }
+
+ private boolean areComponentsReadableWritableState(List<ILSMDiskComponent>
immutableComponents) {
+ for (ILSMComponent c : immutableComponents) {
+ if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
+ return false;
+ }
+ }
+ return true;
}
@Override
@@ -63,77 +139,15 @@
@Override
public boolean isMergeLagging(ILSMIndex index) throws HyracksDataException
{
- // see PrefixMergePolicy.isMergeLagging() for the rationale behind
this code.
-
- /**
- * case 1.
- * if totalImmutableCommponentCount < threshold,
- * merge operation is not lagged ==> return false.
- * case 2.
- * if a) totalImmutableCommponentCount >= threshold && b) there is an
ongoing merge,
- * merge operation is lagged. ==> return true.
- * case 3. *SPECIAL CASE*
- * if a) totalImmutableCommponentCount >= threshold && b) there is
*NO* ongoing merge,
- * merge operation is lagged. ==> *schedule a merge operation* and
then return true.
- * This is a special case that requires to schedule a merge operation.
- * Otherwise, all flush operations will be hung.
- * This case can happen in a following situation:
- * The system may crash when
- * condition 1) the mergableImmutableCommponentCount >= threshold and
- * condition 2) merge operation is going on.
- * After the system is recovered, still condition 1) is true.
- * If there are flush operations in the same dataset partition after
the recovery,
- * all these flush operations may not proceed since there is no
ongoing merge and
- * there will be no new merge either in this situation.
- */
-
+ // TODO: for now, we simply block the ingestion when there is an
ongoing merge
List<ILSMDiskComponent> immutableComponents =
index.getDiskComponents();
- int totalImmutableComponentCount = immutableComponents.size();
-
- // [case 1]
- if (totalImmutableComponentCount < numComponents) {
- return false;
- }
-
boolean isMergeOngoing = isMergeOngoing(immutableComponents);
-
- // here, implicitly (totalImmutableComponentCount >= numComponents) is
true by passing case 1.
if (isMergeOngoing) {
- // [case 2]
- return true;
- } else {
- // [case 3]
- // schedule a merge operation after making sure that all
components are mergable
- if (!areComponentsMergable(immutableComponents)) {
- throw new IllegalStateException();
- }
- ILSMIndexAccessor accessor =
index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- accessor.scheduleMerge(immutableComponents);
return true;
}
+ return false;
}
- /**
- * checks whether all given components are mergable or not
- *
- * @param immutableComponents
- * @return true if all components are mergable, false otherwise.
- */
- private boolean areComponentsMergable(List<ILSMDiskComponent>
immutableComponents) {
- for (ILSMComponent c : immutableComponents) {
- if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * This method returns whether there is an ongoing merge operation or not
by checking
- * each component state of given components.
- *
- * @return true if there is an ongoing merge operation, false otherwise.
- */
private boolean isMergeOngoing(List<ILSMDiskComponent>
immutableComponents) {
int size = immutableComponents.size();
for (int i = 0; i < size; i++) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
index 4471102..5e82bd1 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
@@ -183,4 +183,10 @@
String file = dictBTreeFilePath.substring(0, index);
return file + DELIMITER + INVLISTS_SUFFIX;
}
+
+ @Override
+ public long getCurrentComponentSequence() throws HyracksDataException {
+ checkAndInitializeComponentSequence(deletedKeysBTreeFilter);
+ return super.getCurrentComponentSequence();
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
index 3348407..cbc2854 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
@@ -216,4 +216,10 @@
}
return new LSMComponentFileReferences(rTreeFileRef, bTreeFileRef,
bloomFilterFileRef);
}
+
+ @Override
+ public long getCurrentComponentSequence() throws HyracksDataException {
+ checkAndInitializeComponentSequence(btreeFilter);
+ return super.getCurrentComponentSequence();
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFileManager.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFileManager.java
index e39033f..e380a30 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFileManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFileManager.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.storage.am.lsm.rtree.impls;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
@@ -31,4 +32,10 @@
TreeIndexFactory<? extends ITreeIndex> rtreeFactory) {
super(ioManager, file, rtreeFactory);
}
+
+ @Override
+ public long getCurrentComponentSequence() throws HyracksDataException {
+ checkAndInitializeComponentSequence(COMPONENT_FILES_FILTER);
+ return super.getCurrentComponentSequence();
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/component/TestLsmIndexFileManager.java
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/component/TestLsmIndexFileManager.java
index c255ee5..c79ae19 100644
---
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/component/TestLsmIndexFileManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/component/TestLsmIndexFileManager.java
@@ -22,6 +22,7 @@
import java.io.FilenameFilter;
import java.util.ArrayList;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
@@ -33,7 +34,7 @@
public class TestLsmIndexFileManager extends AbstractLSMIndexFileManager {
- private long componentSeq = 0;
+ private long componentSeq = -1;
public TestLsmIndexFileManager(IIOManager ioManager, FileReference file,
TreeIndexFactory<? extends ITreeIndex> treeIndexFactory) {
@@ -53,7 +54,12 @@
@Override
public LSMComponentFileReferences getRelFlushFileReference() {
- String sequence =
IndexComponentFileReference.getFlushSequence(componentSeq++);
+ String sequence =
IndexComponentFileReference.getFlushSequence(++componentSeq);
return new LSMComponentFileReferences(baseDir.getChild(sequence),
null, null);
}
+
+ @Override
+ public long getCurrentComponentSequence() throws HyracksDataException {
+ return componentSeq;
+ }
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/2971
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie5f83a4d5fdd3f036b823c906df1760f5110ae0a
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Luo Chen <[email protected]>