Luo Chen has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1818
Change subject: Avoid always merging old components in prefix policy ...................................................................... Avoid always merging old components in prefix policy Currently, the prefix policy always looks at the components from oldest to newest to schedule a merge. One negative consequence is that the oldest (largest) component gets merged over and over again until it reaches the size limit. This is undesirable since it takes O(n^2) disk IOs (n is the number of flushed components) to produce a final component. This patch is a temporary fix for this behavior, based on the idea behind the HBase compaction policy (https://www.ngdata.com/visualizing-hbase-flushes-and-compactions/). The basic idea is to introduce a size factor (for now it's 1.2) to control the merge behavior. When the prefix policy finds a sequence of components to merge, we also check that the oldest (largest) component in the sequence is smaller than 1.2 times the total size of all younger components in that sequence. By doing so, we can avoid merging the oldest components over and over again, reducing the disk IOs to O(n log n). 
Change-Id: I464da3fed38cded0aee7b319a35664eae069a2ba --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/pom.xml A hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/PrefixMergePolicyTest.java 4 files changed, 250 insertions(+), 40 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/18/1818/1 diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java index 2a0631c..03f156c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java @@ -18,8 +18,8 @@ */ package org.apache.asterix.api.common; -import static org.apache.asterix.api.common.AsterixHyracksIntegrationUtil.LoggerHolder.LOGGER; -import static org.apache.hyracks.util.file.FileUtil.joinPath; +import static org.apache.asterix.api.common.AsterixHyracksIntegrationUtil.LoggerHolder.*; +import static org.apache.hyracks.util.file.FileUtil.*; import java.io.File; import java.io.IOException; @@ -272,7 +272,7 @@ } } }); - System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, "asterix-build-configuration.xml"); + System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, "asterix-build-configuration-lsm.xml"); init(cleanupOnStart); while (true) { diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java index 23646b9..ae6b98d 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; @@ -38,10 +39,19 @@ private long maxMergableComponentSize; private int maxToleranceComponentCount; + /** + * This parameter is used to avoid merging a big component with a sequence of small components. + * If a component is larger than ratio * all younger disk components in the merge list, then + * this old (large) component is ignored in this round. + * Since it's a temporary fix, we don't set this parameter as configurable in order not to + * disturb the users + */ + private final double maxMergableComponentSizeRatio = 1.2; + @Override public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException { - ArrayList<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents()); + List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents()); if (!areComponentsReadableWritableState(immutableComponents)) { return; @@ -129,7 +139,8 @@ * there will be no new merge either in this situation. 
*/ - List<ILSMDiskComponent> immutableComponents = index.getImmutableComponents(); + List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents()); + Collections.reverse(immutableComponents); int mergableImmutableComponentCount = getMergableImmutableComponentCount(immutableComponents); // [case 1] @@ -177,23 +188,19 @@ /** * This method returns the number of mergable components among the given list - * of immutable components that are ordered from the latest component to order ones. A caller + * of immutable components that are ordered from the oldest component to newer ones. A caller * need to make sure the order in the list. * * @param immutableComponents * @return the number of mergable component */ private int getMergableImmutableComponentCount(List<ILSMDiskComponent> immutableComponents) { - int count = 0; - for (ILSMComponent c : immutableComponents) { - long componentSize = ((ILSMDiskComponent) c).getComponentSize(); - //stop when the first non-mergable component is found. - if (c.getState() != ComponentState.READABLE_UNWRITABLE || componentSize > maxMergableComponentSize) { - break; - } - ++count; + Pair<Integer, Integer> mergableIndexes = getMergableComponentsIndex(immutableComponents); + if (mergableIndexes != null) { + return mergableIndexes.getRight() - mergableIndexes.getLeft() + 1; + } else { + return 0; } - return count; } /** @@ -220,41 +227,71 @@ * @throws IndexException */ private boolean scheduleMerge(final ILSMIndex index) throws HyracksDataException { - // 1. Look at the candidate components for merging in oldest-first order. If one exists, identify the prefix of the sequence of - // all such components for which the sum of their sizes exceeds MaxMrgCompSz. Schedule a merge of those components into a new component. - // 2. If a merge from 1 doesn't happen, see if the set of candidate components for merging exceeds MaxTolCompCnt. 
If so, schedule - // a merge all of the current candidates into a new single component. List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents()); // Reverse the components order so that we look at components from oldest to newest. Collections.reverse(immutableComponents); - long totalSize = 0; - int startIndex = -1; + Pair<Integer, Integer> mergeableIndexes = getMergableComponentsIndex(immutableComponents); + if (mergeableIndexes != null) { + scheduleMerge(index, immutableComponents, mergeableIndexes.getLeft(), mergeableIndexes.getRight()); + return true; + } else { + return false; + } + } + + private void scheduleMerge(ILSMIndex index, List<ILSMDiskComponent> immutableComponents, int startIndex, + int endIndex) throws HyracksDataException { + List<ILSMDiskComponent> mergableComponents = new ArrayList<>(); + for (int i = startIndex; i <= endIndex; i++) { + mergableComponents.add(immutableComponents.get(i)); + } + // Reverse the components order back to its original order + Collections.reverse(mergableComponents); + ILSMIndexAccessor accessor = + index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + accessor.scheduleMerge(index.getIOOperationCallback(), mergableComponents); + } + + private Pair<Integer, Integer> getMergableComponentsIndex(List<ILSMDiskComponent> immutableComponents) { + // 1. Look at the candidate components for merging in oldest-first order. If one exists, identify the + // prefix of the sequence of all such components for which the sum of their sizes exceeds MaxMrgCompSz. + // 2. If a merge from 1 doesn't happen, see if the set of candidate components for merging exceeds + // MaxTolCompCnt. + // 3. If we find a sequence from 1 or 2, and the first (oldest) component in the sequence is smaller than + // ratio * total size of the younger components in the sequence, schedule a merge of all sequences. + // Otherwise, go back to step 1 with the next component. 
for (int i = 0; i < immutableComponents.size(); i++) { - ILSMComponent c = immutableComponents.get(i); - long componentSize = ((ILSMDiskComponent) c).getComponentSize(); - if (componentSize > maxMergableComponentSize) { - startIndex = i; - totalSize = 0; + if (immutableComponents.get(i).getComponentSize() > maxMergableComponentSize + || immutableComponents.get(i).getState() != ComponentState.READABLE_UNWRITABLE) { continue; } - totalSize += componentSize; - boolean isLastComponent = i + 1 == immutableComponents.size() ? true : false; - if (totalSize > maxMergableComponentSize - || (isLastComponent && i - startIndex >= maxToleranceComponentCount)) { - List<ILSMDiskComponent> mergableComponents = new ArrayList<>(); - for (int j = startIndex + 1; j <= i; j++) { - mergableComponents.add(immutableComponents.get(j)); + + long totalSize = 0; + long startComponentSize = 0; + for (int j = i; j < immutableComponents.size(); j++) { + long componentSize = immutableComponents.get(j).getComponentSize(); + if (immutableComponents.get(j).getComponentSize() > maxMergableComponentSize + || immutableComponents.get(i).getState() != ComponentState.READABLE_UNWRITABLE) { + i = j + 1; + break; } - // Reverse the components order back to its original order - Collections.reverse(mergableComponents); - ILSMIndexAccessor accessor = - index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - accessor.scheduleMerge(index.getIOOperationCallback(), mergableComponents); - return true; + totalSize += componentSize; + if (startComponentSize == 0) { + startComponentSize = componentSize; + } + + boolean isLastComponent = j + 1 == immutableComponents.size() ? 
true : false; + + if ((totalSize > maxMergableComponentSize + || (isLastComponent && j - i + 1 >= maxToleranceComponentCount)) + && startComponentSize < maxMergableComponentSizeRatio * (totalSize - startComponentSize)) { + return Pair.of(i, j); + } } } - return false; + + return null; } } diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/pom.xml b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/pom.xml index 9ce4237..06907d7 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/pom.xml @@ -88,5 +88,11 @@ <groupId>junit</groupId> <artifactId>junit</artifactId> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>2.0.2-beta</version> + <scope>test</scope> + </dependency> </dependencies> </project> diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/PrefixMergePolicyTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/PrefixMergePolicyTest.java new file mode 100644 index 0000000..3616efa --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/PrefixMergePolicyTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.storage.am.lsm.common.test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy; +import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicy; +import org.apache.hyracks.storage.common.IModificationOperationCallback; +import org.apache.hyracks.storage.common.ISearchOperationCallback; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import junit.framework.TestCase; + +public class PrefixMergePolicyTest extends TestCase { + + private final int MAX_COMPONENT_SIZE = 100; + + private final int MAX_COMPONENT_COUNT = 3; + + public void testBasic() { + try { + List<Long> sizes = Arrays.asList(1L, 2L, 3L); + List<Long> resultSizes = new ArrayList<>(); + ILSMIndex index = mockIndex(sizes, resultSizes); + ILSMMergePolicy policy = mockMergePolicy(); + policy.diskComponentAdded(index, false); + assertEquals(3, 
resultSizes.size()); + assertEquals(Arrays.asList(1L, 2L, 3L), resultSizes); + } catch (HyracksDataException e) { + Assert.fail(e.getMessage()); + } + } + + @Test + public void testSmallComponents() { + try { + List<Long> sizes = Arrays.asList(1L, 2L, 3L, 4L, 5L); + List<Long> resultSizes = new ArrayList<>(); + ILSMIndex index = mockIndex(sizes, resultSizes); + ILSMMergePolicy policy = mockMergePolicy(); + policy.diskComponentAdded(index, false); + assertEquals(5, resultSizes.size()); + assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L), resultSizes); + } catch (HyracksDataException e) { + Assert.fail(e.getMessage()); + } + } + + @Test + public void testSkipComponents() { + try { + List<Long> sizes = Arrays.asList(1L, 2L, 3L, 101L, 101L); + List<Long> resultSizes = new ArrayList<>(); + ILSMIndex index = mockIndex(sizes, resultSizes); + ILSMMergePolicy policy = mockMergePolicy(); + policy.diskComponentAdded(index, false); + assertEquals(3, resultSizes.size()); + assertEquals(Arrays.asList(1L, 2L, 3L), resultSizes); + } catch (HyracksDataException e) { + Assert.fail(e.getMessage()); + } + } + + @Test + public void testSkipLargeComponents() { + try { + List<Long> sizes = Arrays.asList(1L, 2L, 3L, 50L); + List<Long> resultSizes = new ArrayList<>(); + ILSMIndex index = mockIndex(sizes, resultSizes); + ILSMMergePolicy policy = mockMergePolicy(); + policy.diskComponentAdded(index, false); + assertEquals(3, resultSizes.size()); + assertEquals(Arrays.asList(1L, 2L, 3L), resultSizes); + } catch (HyracksDataException e) { + Assert.fail(e.getMessage()); + } + } + + @Test + public void testMergeLargeComponents() { + try { + List<Long> sizes = Arrays.asList(3L, 3L, 45L, 50L); + List<Long> resultSizes = new ArrayList<>(); + ILSMIndex index = mockIndex(sizes, resultSizes); + ILSMMergePolicy policy = mockMergePolicy(); + policy.diskComponentAdded(index, false); + assertEquals(4, resultSizes.size()); + assertEquals(Arrays.asList(3L, 3L, 45L, 50L), resultSizes); + } catch 
(HyracksDataException e) { + Assert.fail(e.getMessage()); + } + } + + private ILSMMergePolicy mockMergePolicy() { + Map<String, String> properties = new HashMap<>(); + properties.put("max-mergable-component-size", String.valueOf(MAX_COMPONENT_SIZE)); + properties.put("max-tolerance-component-count", String.valueOf(MAX_COMPONENT_COUNT)); + ILSMMergePolicy policy = new PrefixMergePolicy(); + policy.configure(properties); + return policy; + } + + private ILSMIndex mockIndex(List<Long> componentSizes, List<Long> mergedSizes) throws HyracksDataException { + List<ILSMDiskComponent> components = new ArrayList<>(); + for (Long size : componentSizes) { + ILSMDiskComponent component = Mockito.mock(ILSMDiskComponent.class); + Mockito.when(component.getComponentSize()).thenReturn(size); + Mockito.when(component.getState()).thenReturn(ComponentState.READABLE_UNWRITABLE); + components.add(component); + } + + ILSMIndex index = Mockito.mock(ILSMIndex.class); + Mockito.when(index.getImmutableComponents()).thenReturn(components); + + ILSMIndexAccessor accessor = Mockito.mock(ILSMIndexAccessor.class); + Mockito.doAnswer(new Answer<Void>() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + List<ILSMDiskComponent> mergedComponents = invocation.getArgumentAt(1, List.class); + mergedComponents.forEach(component -> { + mergedSizes.add(component.getComponentSize()); + }); + return null; + } + }).when(accessor).scheduleMerge(Mockito.any(ILSMIOOperationCallback.class), + Mockito.anyListOf(ILSMDiskComponent.class)); + + Mockito.when(index.createAccessor(Mockito.any(IModificationOperationCallback.class), + Mockito.any(ISearchOperationCallback.class))).thenReturn(accessor); + + return index; + } + +} -- To view, visit https://asterix-gerrit.ics.uci.edu/1818 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I464da3fed38cded0aee7b319a35664eae069a2ba Gerrit-PatchSet: 1 Gerrit-Project: asterixdb 
Gerrit-Branch: master Gerrit-Owner: Luo Chen <[email protected]>
