Repository: asterixdb Updated Branches: refs/heads/master 8bbf08131 -> 1aeb8b6ce
[ASTERIXDB-2453] Add Improved Constant Merge Policy - user model changes: no - storage format changes: no - interface changes: no Details: - The current constant merge policy is unusable because of its high merge cost, i.e., O(N*N) where N is the number of flushes. This patch replaces the previous constant merge policy with a more efficient policy that still enforces a maximum number of components but greatly lowers the merge cost. - Extend AbstractLSMIndex with a method to return the total number of flushes, based on the file name sequencer. This is required by the new policy. Change-Id: Ie5f83a4d5fdd3f036b823c906df1760f5110ae0a Reviewed-on: https://asterix-gerrit.ics.uci.edu/2971 Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Murtadha Hubail <mhub...@apache.org> Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/1aeb8b6c Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/1aeb8b6c Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/1aeb8b6c Branch: refs/heads/master Commit: 1aeb8b6cef3a50d7639749c96538ec2cecaaad9c Parents: 8bbf081 Author: luochen01 <cl...@uci.edu> Authored: Thu Sep 27 14:36:06 2018 -0700 Committer: Luo Chen <cl...@uci.edu> Committed: Thu Sep 27 16:16:57 2018 -0700 ---------------------------------------------------------------------- .../am/lsm/common/impls/AbstractLSMIndex.java | 15 ++ .../lsm/common/impls/ConstantMergePolicy.java | 161 ++++++++++--------- 2 files changed, 102 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1aeb8b6c/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java index 9199fbb..d3133ce 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java @@ -20,12 +20,14 @@ package org.apache.hyracks.storage.am.lsm.common.impls; import java.io.IOException; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -901,4 +903,17 @@ public abstract class AbstractLSMIndex implements ILSMIndex { protected abstract ILSMDiskComponent doFlush(ILSMIOOperation operation) throws HyracksDataException; protected abstract ILSMDiskComponent doMerge(ILSMIOOperation operation) throws HyracksDataException; + + public Optional<Long> getLatestDiskComponentSequence() { + if (diskComponents.isEmpty()) { + return Optional.empty(); + } + final ILSMDiskComponent latestDiskComponent = diskComponents.get(0); + final Set<String> diskComponentPhysicalFiles = latestDiskComponent.getLSMComponentPhysicalFiles(); + final String fileName = diskComponentPhysicalFiles.stream().findAny() + .orElseThrow(() -> 
new IllegalStateException("Disk component without any physical files")); + return Optional + .of(IndexComponentFileReference.of(Paths.get(fileName).getFileName().toString()).getSequenceEnd()); + } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1aeb8b6c/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java index c642d82..5b71770 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java @@ -19,107 +19,114 @@ package org.apache.hyracks.storage.am.lsm.common.impls; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters; import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters; -import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy; -import org.apache.hyracks.storage.common.IIndexAccessParameters; public class ConstantMergePolicy implements ILSMMergePolicy { private int numComponents; + private int[][] binomial; + @Override public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException { - List<ILSMDiskComponent> immutableComponents = index.getDiskComponents(); - - if (!areComponentsMergable(immutableComponents)) { + List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getDiskComponents()); + if (!areComponentsReadableWritableState(immutableComponents)) { return; } - if (fullMergeIsRequested) { - IIndexAccessParameters iap = - new IndexAccessParameters(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - ILSMIndexAccessor accessor = index.createAccessor(iap); - accessor.scheduleFullMerge(); - } else if (immutableComponents.size() >= numComponents) { ILSMIndexAccessor accessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE); - accessor.scheduleMerge(immutableComponents); + accessor.scheduleFullMerge(); + return; } + scheduleMerge(index); } - @Override - public void configure(Map<String, String> properties) { - numComponents = Integer.parseInt(properties.get(ConstantMergePolicyFactory.NUM_COMPONENTS)); + private boolean scheduleMerge(final ILSMIndex index) throws HyracksDataException { + Optional<Long> latestSeq = ((AbstractLSMIndex) index).getLatestDiskComponentSequence(); + if (!latestSeq.isPresent()) { + return false; + } + // sequence number starts from 0, and thus latestSeq + 1 gives the number of flushes + int numFlushes = latestSeq.get().intValue() + 1; + List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getDiskComponents()); + Collections.reverse(immutableComponents); + int size = immutableComponents.size(); + int depth = 0; + while (treeDepth(depth) < numFlushes) { + depth++; + } + int mergedIndex = + 
binomialIndex(depth, Math.min(depth, numComponents) - 1, numFlushes - treeDepth(depth - 1) - 1); + if (mergedIndex == size - 1) { + return false; + } + long mergeSize = 0; + List<ILSMDiskComponent> mergableComponents = new ArrayList<ILSMDiskComponent>(); + for (int i = mergedIndex; i < immutableComponents.size(); i++) { + mergeSize = mergeSize + immutableComponents.get(i).getComponentSize(); + mergableComponents.add(immutableComponents.get(i)); + } + Collections.reverse(mergableComponents); + ILSMIndexAccessor accessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE); + accessor.scheduleMerge(mergableComponents); + return true; } - @Override - public boolean isMergeLagging(ILSMIndex index) throws HyracksDataException { - // see PrefixMergePolicy.isMergeLagging() for the rationale behind this code. - - /** - * case 1. - * if totalImmutableCommponentCount < threshold, - * merge operation is not lagged ==> return false. - * case 2. - * if a) totalImmutableCommponentCount >= threshold && b) there is an ongoing merge, - * merge operation is lagged. ==> return true. - * case 3. *SPECIAL CASE* - * if a) totalImmutableCommponentCount >= threshold && b) there is *NO* ongoing merge, - * merge operation is lagged. ==> *schedule a merge operation* and then return true. - * This is a special case that requires to schedule a merge operation. - * Otherwise, all flush operations will be hung. - * This case can happen in a following situation: - * The system may crash when - * condition 1) the mergableImmutableCommponentCount >= threshold and - * condition 2) merge operation is going on. - * After the system is recovered, still condition 1) is true. - * If there are flush operations in the same dataset partition after the recovery, - * all these flush operations may not proceed since there is no ongoing merge and - * there will be no new merge either in this situation. 
- */ - - List<ILSMDiskComponent> immutableComponents = index.getDiskComponents(); - int totalImmutableComponentCount = immutableComponents.size(); - - // [case 1] - if (totalImmutableComponentCount < numComponents) { - return false; + private int treeDepth(int d) { + if (d < 0) { + return 0; } + return treeDepth(d - 1) + binomialChoose(d + Math.min(d, numComponents) - 1, d); + } - boolean isMergeOngoing = isMergeOngoing(immutableComponents); + private int binomialIndex(int d, int h, int t) { + if (t < 0 || t > binomialChoose(d + h, h)) { + throw new IllegalStateException("Illegal binomial values"); + } + if (t == 0) { + return 0; + } else if (t < binomialChoose(d + h - 1, h)) { + return binomialIndex(d - 1, h, t); + } + return binomialIndex(d, h - 1, t - binomialChoose(d + h - 1, h)) + 1; + } - // here, implicitly (totalImmutableComponentCount >= numComponents) is true by passing case 1. - if (isMergeOngoing) { - // [case 2] - return true; - } else { - // [case 3] - // schedule a merge operation after making sure that all components are mergable - if (!areComponentsMergable(immutableComponents)) { - throw new IllegalStateException(); + private int binomialChoose(int n, int k) { + if (k < 0 || k > n) { + return 0; + } + if (k == 0 || k == n) { + return 1; + } + // For efficiency, binomial is persisted to avoid re-computations for every merge + if (binomial == null || binomial.length <= n) { + binomial = new int[n + 1][n + 1]; + for (int r = 0; r <= n; r++) { + for (int c = 0; c <= r; c++) { + if (c == 0 || c == r) { + binomial[r][c] = 1; + } else { + binomial[r][c] = binomial[r - 1][c - 1] + binomial[r - 1][c]; + } + } } - ILSMIndexAccessor accessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE); - accessor.scheduleMerge(immutableComponents); - return true; } + return binomial[n][k]; } - /** - * checks whether all given components are mergable or not - * - * @param immutableComponents - * @return true if all components are mergable, false otherwise. 
- */ - private boolean areComponentsMergable(List<ILSMDiskComponent> immutableComponents) { + private boolean areComponentsReadableWritableState(List<ILSMDiskComponent> immutableComponents) { for (ILSMComponent c : immutableComponents) { if (c.getState() != ComponentState.READABLE_UNWRITABLE) { return false; @@ -128,12 +135,18 @@ public class ConstantMergePolicy implements ILSMMergePolicy { return true; } - /** - * This method returns whether there is an ongoing merge operation or not by checking - * each component state of given components. - * - * @return true if there is an ongoing merge operation, false otherwise. - */ + @Override + public void configure(Map<String, String> properties) { + numComponents = Integer.parseInt(properties.get(ConstantMergePolicyFactory.NUM_COMPONENTS)); + } + + @Override + public boolean isMergeLagging(ILSMIndex index) throws HyracksDataException { + // TODO: for now, we simply block the ingestion when there is an ongoing merge + List<ILSMDiskComponent> immutableComponents = index.getDiskComponents(); + return isMergeOngoing(immutableComponents); + } + private boolean isMergeOngoing(List<ILSMDiskComponent> immutableComponents) { int size = immutableComponents.size(); for (int i = 0; i < size; i++) {