abdullah alamoudi has submitted this change and it was merged. Change subject: Implemented Disk Components Alignment Based on IDs ......................................................................
Implemented Disk Components Alignment Based on IDs - Added IDs (using LSN) for disk components. When a disk component is flushed, its initial ID is set as the LSN. When components are merged, the result ID is the union of all IDs to be merged. - Changed the correlated merge policy to correlate disk components of the primary and secondaries using IDs. Change-Id: I768ee9ac0a8d3c99c631086093a6b778b2e7588e Reviewed-on: https://asterix-gerrit.ics.uci.edu/1761 Reviewed-by: abdullah alamoudi <[email protected]> Tested-by: Jenkins <[email protected]> BAD: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> --- M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java A asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java A hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentId.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java A hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentId.java 11 files changed, 750 insertions(+), 59 deletions(-) Approvals: abdullah alamoudi: Looks good to me, approved Jenkins: Verified; No violations found; Verified Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java index cd1d95e..d28b991 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -31,9 +32,11 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy; +import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicy; public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy { @@ -57,66 +60,30 @@ // all such components for which the sum of their sizes exceeds MaxMrgCompSz. Schedule a merge of those components into a new component. // 2. If a merge from 1 doesn't happen, see if the set of candidate components for merging exceeds MaxTolCompCnt. If so, schedule // a merge all of the current candidates into a new single component. - List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents()); - // Reverse the components order so that we look at components from oldest to newest. - Collections.reverse(immutableComponents); - for (ILSMDiskComponent c : immutableComponents) { - if (c.getState() != ComponentState.READABLE_UNWRITABLE) { + if (fullMergeIsRequested) { + //full merge request is handled by each index separately, since it is possible that + //when a primary index wants to send full merge requests for all secondaries, + //one secondary index is being merged and the request cannot be scheduled + List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents()); + if (!areComponentsReadableUnwritableState(immutableComponents)) { return; } - } - if (fullMergeIsRequested) { + ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); accessor.scheduleFullMerge(index.getIOOperationCallback()); return; } + if (!index.isPrimaryIndex()) { return; } - long totalSize = 0; - int startIndex = -1; - int minNumComponents = Integer.MAX_VALUE; - - Set<ILSMIndex> indexes = datasetLifecycleManager.getDatasetInfo(datasetId).getDatasetIndexes(); - for (ILSMIndex lsmIndex : indexes) { - minNumComponents = Math.min(minNumComponents, lsmIndex.getImmutableComponents().size()); + List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents()); + if (!areComponentsReadableUnwritableState(immutableComponents)) { + return; } - - for (int i = 0; i < minNumComponents; i++) { - ILSMComponent c = immutableComponents.get(i); - long componentSize = ((ILSMDiskComponent) c).getComponentSize(); - if (componentSize > maxMergableComponentSize) { - startIndex = i; - totalSize = 0; - continue; - } - totalSize += componentSize; - boolean isLastComponent = i + 1 == minNumComponents ? true : false; - if (totalSize > maxMergableComponentSize - || (isLastComponent && i - startIndex >= maxToleranceComponentCount)) { - - for (ILSMIndex lsmIndex : indexes) { - List<ILSMDiskComponent> reversedImmutableComponents = - new ArrayList<>(lsmIndex.getImmutableComponents()); - // Reverse the components order so that we look at components from oldest to newest. - Collections.reverse(reversedImmutableComponents); - - List<ILSMDiskComponent> mergableComponents = new ArrayList<>(); - for (int j = startIndex + 1; j <= i; j++) { - mergableComponents.add(reversedImmutableComponents.get(j)); - } - // Reverse the components order back to its original order - Collections.reverse(mergableComponents); - - ILSMIndexAccessor accessor = - lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergableComponents); - } - break; - } - } + scheduleMerge(index); } @Override @@ -127,9 +94,215 @@ Integer.parseInt(properties.get(CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_COUNT)); } + /** + * Adopts the similar logic to decide merge lagging based on {@link PrefixMergePolicy} + * + * @throws HyracksDataException + */ @Override - public boolean isMergeLagging(ILSMIndex index) { - //TODO implement properly according to the merge policy + public boolean isMergeLagging(ILSMIndex index) throws HyracksDataException { + /** + * case 1. + * if mergableImmutableCommponentCount < threshold, + * merge operation is not lagged ==> return false. + * case 2. + * if a) mergableImmutableCommponentCount >= threshold && b) there is an ongoing merge, + * merge operation is lagged. ==> return true. + * case 3. *SPECIAL CASE* + * if a) mergableImmutableCommponentCount >= threshold && b) there is *NO* ongoing merge, + * merge operation is lagged. ==> *schedule a merge operation* and then return true. + * This is a special case that requires to schedule a merge operation. + * Otherwise, all flush operations will be hung. + * This case can happen in a following situation: + * The system may crash when + * condition 1) the mergableImmutableCommponentCount >= threshold and + * condition 2) merge operation is going on. + * After the system is recovered, still condition 1) is true. + * If there are flush operations in the same dataset partition after the recovery, + * all these flush operations may not proceed since there is no ongoing merge and + * there will be no new merge either in this situation. + * Note for case 3, we only let the primary index to schedule merge operations on behalf + * of all indexes. + */ + + List<ILSMDiskComponent> immutableComponents = index.getImmutableComponents(); + int mergableImmutableComponentCount = getMergableImmutableComponentCount(immutableComponents); + + // [case 1] + if (mergableImmutableComponentCount < maxToleranceComponentCount) { + return false; + } + + boolean isMergeOngoing = isMergeOngoing(immutableComponents); + + if (isMergeOngoing) { + // [case 2] + return true; + } + + if (index.isPrimaryIndex()) { + // [case 3] + // make sure that all components are of READABLE_UNWRITABLE state. + if (!areComponentsReadableUnwritableState(immutableComponents)) { + throw new IllegalStateException(); + } + // schedule a merge operation + boolean isMergeTriggered = scheduleMerge(index); + if (!isMergeTriggered) { + throw new IllegalStateException(); + } + return true; + } else { + //[case 3] + //if the index is secondary then ignore the merge request (since the merge should be + //triggered by the primary) and here we simply treat it as not lagged. + return false; + } + } + + private boolean scheduleMerge(ILSMIndex index) throws HyracksDataException { + List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents()); + Collections.reverse(immutableComponents); + + long totalSize = 0; + int startIndex = -1; + + int numComponents = immutableComponents.size(); + + for (int i = 0; i < numComponents; i++) { + ILSMComponent c = immutableComponents.get(i); + long componentSize = ((ILSMDiskComponent) c).getComponentSize(); + if (componentSize > maxMergableComponentSize || ((ILSMDiskComponent) c).getComponentId().notFound()) { + startIndex = i; + totalSize = 0; + continue; + } + totalSize += componentSize; + boolean isLastComponent = i + 1 == numComponents ? true : false; + if (totalSize > maxMergableComponentSize + || (isLastComponent && i - startIndex >= maxToleranceComponentCount)) { + //merge disk components from startIndex+1 to i + long minID = Long.MAX_VALUE; + long maxID = Long.MIN_VALUE; + for (int j = startIndex + 1; j <= i; j++) { + ILSMDiskComponentId id = immutableComponents.get(j).getComponentId(); + if (minID > id.getMinId()) { + minID = id.getMinId(); + } + if (maxID < id.getMaxId()) { + maxID = id.getMaxId(); + } + } + Set<IndexInfo> indexInfos = datasetLifecycleManager.getDatasetInfo(datasetId).getDatsetIndexInfos(); + int partition = getIndexPartition(index, indexInfos); + triggerScheduledMerge(minID, maxID, indexInfos.stream().filter(info -> info.getPartition() == partition) + .collect(Collectors.toSet())); + return true; + } + } return false; } + + /** + * Submit merge requests for all disk components within [minID, maxID] + * of all indexes of a given dataset in the given partition + * + * @param minID + * @param maxID + * @param partition + * @param indexInfos + * @throws HyracksDataException + */ + private void triggerScheduledMerge(long minID, long maxID, Set<IndexInfo> indexInfos) throws HyracksDataException { + for (IndexInfo info : indexInfos) { + ILSMIndex lsmIndex = info.getIndex(); + + List<ILSMDiskComponent> immutableComponents = new ArrayList<>(lsmIndex.getImmutableComponents()); + if (isMergeOngoing(immutableComponents)) { + continue; + } + List<ILSMDiskComponent> mergableComponents = new ArrayList<>(); + for (ILSMDiskComponent component : immutableComponents) { + ILSMDiskComponentId id = component.getComponentId(); + if (!id.notFound()) { + if (id.getMinId() >= minID && id.getMaxId() <= maxID) { + mergableComponents.add(component); + } + if (id.getMaxId() < minID) { + //disk components are ordered from latest (with largest IDs) to oldest (with smallest IDs) + //if the component.maxID < minID, we can safely skip the rest disk components in the list + break; + } + } + } + ILSMIndexAccessor accessor = + lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergableComponents); + } + } + + /** + * This method returns the number of mergable components among the given list + * of immutable components that are ordered from the latest component to order ones. A caller + * need to make sure the order in the list. + * + * @param immutableComponents + * @return the number of mergable component + * @throws HyracksDataException + */ + private int getMergableImmutableComponentCount(List<ILSMDiskComponent> immutableComponents) + throws HyracksDataException { + int count = 0; + for (ILSMComponent c : immutableComponents) { + long componentSize = ((ILSMDiskComponent) c).getComponentSize(); + //stop when the first non-mergable component is found. + if (c.getState() != ComponentState.READABLE_UNWRITABLE || componentSize > maxMergableComponentSize + || ((ILSMDiskComponent) c).getComponentId().notFound()) { + break; + } + ++count; + } + return count; + } + + /** + * This method returns whether there is an ongoing merge operation or not by checking + * each component state of given components. + * + * @param immutableComponents + * @return true if there is an ongoing merge operation, false otherwise. + */ + private boolean isMergeOngoing(List<ILSMDiskComponent> immutableComponents) { + int size = immutableComponents.size(); + for (int i = 0; i < size; i++) { + if (immutableComponents.get(i).getState() == ComponentState.READABLE_MERGING) { + return true; + } + } + return false; + } + + /** + * checks whether all given components are of READABLE_UNWRITABLE state + * + * @param immutableComponents + * @return true if all components are of READABLE_UNWRITABLE state, false otherwise. + */ + private boolean areComponentsReadableUnwritableState(List<ILSMDiskComponent> immutableComponents) { + for (ILSMComponent c : immutableComponents) { + if (c.getState() != ComponentState.READABLE_UNWRITABLE) { + return false; + } + } + return true; + } + + private int getIndexPartition(ILSMIndex index, Set<IndexInfo> indexInfos) { + for (IndexInfo info : indexInfos) { + if (info.getIndex() == index) { + return info.getPartition(); + } + } + return -1; + } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java index 6a2cc56..71d4a96 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java @@ -76,6 +76,17 @@ return datasetIndexes; } + public synchronized Set<IndexInfo> getDatsetIndexInfos() { + Set<IndexInfo> infos = new HashSet<>(); + for (IndexInfo iInfo : getIndexes().values()) { + if (iInfo.isOpen()) { + infos.add(iInfo); + } + } + + return infos; + } + @Override public int compareTo(DatasetInfo i) { // sort by (isOpen, referenceCount, lastAccess) ascending, where true < false diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index 9529366..51a535a 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -93,12 +93,12 @@ public synchronized void register(String resourcePath, IIndex index) throws HyracksDataException { validateDatasetLifecycleManagerState(); int did = getDIDfromResourcePath(resourcePath); - long resourceID = getResourceIDfromResourcePath(resourcePath); + LocalResource resource = resourceRepository.get(resourcePath); DatasetResource datasetResource = datasets.get(did); if (datasetResource == null) { datasetResource = getDatasetLifecycle(did); } - datasetResource.register(resourceID, index); + datasetResource.register(resource, index); } public int getDIDfromResourcePath(String resourcePath) throws HyracksDataException { diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java index a880ce2..f2f3b93 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java @@ -20,9 +20,11 @@ import java.util.Map; +import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.common.IIndex; +import org.apache.hyracks.storage.common.LocalResource; /** * A dataset can be in one of two states { EVICTED , LOADED }. @@ -41,8 +43,7 @@ private final PrimaryIndexOperationTracker datasetPrimaryOpTracker; private final DatasetVirtualBufferCaches datasetVirtualBufferCaches; - public DatasetResource(DatasetInfo datasetInfo, - PrimaryIndexOperationTracker datasetPrimaryOpTracker, + public DatasetResource(DatasetInfo datasetInfo, PrimaryIndexOperationTracker datasetPrimaryOpTracker, DatasetVirtualBufferCaches datasetVirtualBufferCaches) { this.datasetInfo = datasetInfo; this.datasetPrimaryOpTracker = datasetPrimaryOpTracker; @@ -86,7 +87,8 @@ return (iInfo == null) ? null : iInfo.getIndex(); } - public void register(long resourceID, IIndex index) throws HyracksDataException { + public void register(LocalResource resource, IIndex index) throws HyracksDataException { + long resourceID = resource.getId(); if (!datasetInfo.isRegistered()) { synchronized (datasetInfo) { if (!datasetInfo.isRegistered()) { @@ -102,8 +104,8 @@ if (index == null) { throw new HyracksDataException("Attempt to register a null index"); } - datasetInfo.getIndexes().put(resourceID, - new IndexInfo((ILSMIndex) index, datasetInfo.getDatasetID(), resourceID)); + datasetInfo.getIndexes().put(resourceID, new IndexInfo((ILSMIndex) index, datasetInfo.getDatasetID(), + resourceID, ((DatasetLocalResource) resource.getResource()).getPartition())); } public DatasetInfo getDatasetInfo() { diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java index 34ccce0..9eb5b6c 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java @@ -22,13 +22,15 @@ public class IndexInfo extends Info { private final ILSMIndex index; - private final long resourceId; private final int datasetId; + private final long resourceId; + private final int partition; - public IndexInfo(ILSMIndex index, int datasetId, long resourceId) { + public IndexInfo(ILSMIndex index, int datasetId, long resourceId, int partition) { this.index = index; this.datasetId = datasetId; this.resourceId = resourceId; + this.partition = partition; } public ILSMIndex getIndex() { @@ -39,6 +41,10 @@ return resourceId; } + public int getPartition() { + return partition; + } + public int getDatasetId() { return datasetId; } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java index ec7df85..f357aea 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java @@ -20,6 +20,8 @@ package org.apache.asterix.common.ioopcallbacks; import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.primitive.LongPointable; @@ -27,13 +29,17 @@ import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata; +import org.apache.hyracks.storage.am.lsm.common.impls.LSMDiskComponentId; import org.apache.hyracks.storage.am.lsm.common.utils.ComponentMetadataUtil; // A single LSMIOOperationCallback per LSM index used to perform actions around Flush and Merge operations public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationCallback { + private static final Logger logger = Logger.getLogger(AbstractLSMIOOperationCallback.class.getName()); + public static final MutableArrayValueReference LSN_KEY = new MutableArrayValueReference("LSN".getBytes()); public static final long INVALID = -1L; @@ -106,6 +112,42 @@ return pointable.getLength() == 0 ? INVALID : pointable.longValue(); } + private ILSMDiskComponentId getComponentId(List<ILSMComponent> oldComponents) throws HyracksDataException { + if (oldComponents == null) { + //if oldComponents == null, then getComponentLSN would treat it as a flush operation, + //and return the LSN for the flushed component + long id = getComponentLSN(null); + if (id == 0) { + logger.log(Level.WARNING, "Flushing a memory component without setting the LSN"); + id = ILSMDiskComponentId.NOT_FOUND; + } + return new LSMDiskComponentId(id, id); + } else { + long minId = Long.MAX_VALUE; + long maxId = Long.MIN_VALUE; + for (ILSMComponent oldComponent : oldComponents) { + ILSMDiskComponentId oldComponentId = ((ILSMDiskComponent) oldComponent).getComponentId(); + if (oldComponentId.getMinId() < minId) { + minId = oldComponentId.getMinId(); + } + if (oldComponentId.getMaxId() > maxId) { + maxId = oldComponentId.getMaxId(); + } + } + return new LSMDiskComponentId(minId, maxId); + } + } + + private void putComponentIdIntoMetadata(ILSMDiskComponent component, List<ILSMComponent> oldComponents) + throws HyracksDataException { + DiskComponentMetadata metadata = component.getMetadata(); + ILSMDiskComponentId componentId = getComponentId(oldComponents); + metadata.put(ILSMDiskComponentId.COMPONENT_ID_MIN_KEY, + LongPointable.FACTORY.createPointable(componentId.getMinId())); + metadata.put(ILSMDiskComponentId.COMPONENT_ID_MAX_KEY, + LongPointable.FACTORY.createPointable(componentId.getMaxId())); + } + public synchronized void updateLastLSN(long lastLSN) { if (!flushRequested[writeIndex]) { //if the memory component pointed by writeIndex is being flushed, we should ignore this update call @@ -144,6 +186,7 @@ //TODO: Copying Filters and all content of the metadata pages for flush operation should be done here if (newComponent != null) { putLSNIntoMetadata(newComponent, oldComponents); + putComponentIdIntoMetadata(newComponent, oldComponents); if (opType == LSMOperationType.MERGE) { LongPointable markerLsn = LongPointable.FACTORY .createPointable(ComponentMetadataUtil.getLong(oldComponents.get(0).getMetadata(), diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java new file mode 100644 index 0000000..e36f4a8 --- /dev/null +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.test.context; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.asterix.common.api.IDatasetLifecycleManager; +import org.apache.asterix.common.context.CorrelatedPrefixMergePolicy; +import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory; +import org.apache.asterix.common.context.DatasetInfo; +import org.apache.asterix.common.context.IndexInfo; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy; +import org.apache.hyracks.storage.am.lsm.common.impls.LSMDiskComponentId; +import org.apache.hyracks.storage.common.IModificationOperationCallback; +import org.apache.hyracks.storage.common.ISearchOperationCallback; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import junit.framework.TestCase; + +public class CorrelatedPrefixMergePolicyTest extends TestCase { + + private final long DEFAULT_COMPONENT_SIZE = 1L; + + private final int MAX_COMPONENT_SIZE = 3; + + private final int MAX_COMPONENT_COUNT = 3; + + private final int DATASET_ID = 1; + + @Test + public void testBasic() { + try { + List<ILSMDiskComponentId> componentIDs = + Arrays.asList(new LSMDiskComponentId(5, 5), new LSMDiskComponentId(4, 4), + new LSMDiskComponentId(3, 3), new LSMDiskComponentId(2, 2), new LSMDiskComponentId(1, 1)); + + List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>(); + IndexInfo primary = mockIndex(true, componentIDs, resultPrimaryIDs, 0); + + List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>(); + IndexInfo secondary = mockIndex(false, componentIDs, resultSecondaryIDs, 0); + + ILSMMergePolicy policy = mockMergePolicy(primary, secondary); + policy.diskComponentAdded(secondary.getIndex(), false); + Assert.assertTrue(resultPrimaryIDs.isEmpty()); + Assert.assertTrue(resultSecondaryIDs.isEmpty()); + + policy.diskComponentAdded(primary.getIndex(), false); + + Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(4, 4), new LSMDiskComponentId(3, 3), + new LSMDiskComponentId(2, 2), new LSMDiskComponentId(1, 1)), resultPrimaryIDs); + Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(4, 4), new LSMDiskComponentId(3, 3), + new LSMDiskComponentId(2, 2), new LSMDiskComponentId(1, 1)), resultSecondaryIDs); + + } catch (HyracksDataException e) { + Assert.fail(e.getMessage()); + } + + } + + @Test + public void testIDIntervals() { + try { + List<ILSMDiskComponentId> componentIDs = Arrays.asList(new LSMDiskComponentId(40, 50), + new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24), + new LSMDiskComponentId(10, 19)); + + List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>(); + IndexInfo primary = mockIndex(true, componentIDs, resultPrimaryIDs, 0); + + List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>(); + IndexInfo secondary = mockIndex(false, componentIDs, resultSecondaryIDs, 0); + + ILSMMergePolicy policy = mockMergePolicy(primary, secondary); + policy.diskComponentAdded(secondary.getIndex(), false); + Assert.assertTrue(resultPrimaryIDs.isEmpty()); + Assert.assertTrue(resultSecondaryIDs.isEmpty()); + + policy.diskComponentAdded(primary.getIndex(), false); + + Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), + new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultPrimaryIDs); + Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), + new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultSecondaryIDs); + + } catch (HyracksDataException e) { + Assert.fail(e.getMessage()); + } + } + + @Test + public void testSecondaryMissing() { + try { + List<ILSMDiskComponentId> primaryComponentIDs = Arrays.asList(new LSMDiskComponentId(40, 50), + new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24), + new LSMDiskComponentId(10, 19)); + List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>(); + IndexInfo primary = mockIndex(true, primaryComponentIDs, resultPrimaryIDs, 0); + + List<ILSMDiskComponentId> secondaryComponentIDs = Arrays.asList(new LSMDiskComponentId(30, 35), + new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24)); + List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>(); + IndexInfo secondary = mockIndex(false, secondaryComponentIDs, resultSecondaryIDs, 0); + + ILSMMergePolicy policy = mockMergePolicy(primary, secondary); + + policy.diskComponentAdded(secondary.getIndex(), false); + Assert.assertTrue(resultPrimaryIDs.isEmpty()); + Assert.assertTrue(resultSecondaryIDs.isEmpty()); + + policy.diskComponentAdded(primary.getIndex(), false); + Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), + new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultPrimaryIDs); + Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), + new LSMDiskComponentId(20, 24)), resultSecondaryIDs); + + } catch (HyracksDataException e) { + Assert.fail(e.getMessage()); + } + } + + @Test + public void testPrimaryNotFound() { + try { + List<ILSMDiskComponentId> primaryComponentIDs = Arrays.asList(new LSMDiskComponentId(40, 50), + new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), + new LSMDiskComponentId(ILSMDiskComponentId.NOT_FOUND, ILSMDiskComponentId.NOT_FOUND), + new LSMDiskComponentId(10, 19)); + List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>(); + IndexInfo primary = mockIndex(true, primaryComponentIDs, resultPrimaryIDs, 0); + + List<ILSMDiskComponentId> secondaryComponentIDs = Arrays.asList(new LSMDiskComponentId(30, 35), + new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24)); + List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>(); + IndexInfo secondary = mockIndex(false, secondaryComponentIDs, resultSecondaryIDs, 0); + + ILSMMergePolicy policy = mockMergePolicy(primary, secondary); + + policy.diskComponentAdded(secondary.getIndex(), false); + Assert.assertTrue(resultPrimaryIDs.isEmpty()); + Assert.assertTrue(resultSecondaryIDs.isEmpty()); + + policy.diskComponentAdded(primary.getIndex(), false); + Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(40, 50), new LSMDiskComponentId(30, 35), + new LSMDiskComponentId(25, 29)), resultPrimaryIDs); + Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29)), + resultSecondaryIDs); + } catch (HyracksDataException e) { + Assert.fail(e.getMessage()); + } + } + + @Test + public void testSecondaryNotFound() { + try { + List<ILSMDiskComponentId> primaryComponentIDs = Arrays.asList(new LSMDiskComponentId(40, 50), + new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24), + new LSMDiskComponentId(10, 19)); + List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>(); + IndexInfo primary = mockIndex(true, primaryComponentIDs, resultPrimaryIDs, 0); + + List<ILSMDiskComponentId> secondaryComponentIDs = Arrays.asList(new LSMDiskComponentId(30, 35), + new LSMDiskComponentId(ILSMDiskComponentId.NOT_FOUND, ILSMDiskComponentId.NOT_FOUND), + new LSMDiskComponentId(20, 24)); + List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>(); + IndexInfo secondary = mockIndex(false, secondaryComponentIDs, resultSecondaryIDs, 0); + + ILSMMergePolicy policy = mockMergePolicy(primary, secondary); + + policy.diskComponentAdded(secondary.getIndex(), false); + Assert.assertTrue(resultPrimaryIDs.isEmpty()); + Assert.assertTrue(resultSecondaryIDs.isEmpty()); + + policy.diskComponentAdded(primary.getIndex(), false); + Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), + new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultPrimaryIDs); + Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(20, 24)), + resultSecondaryIDs); + + } catch (HyracksDataException e) { + Assert.fail(e.getMessage()); + } + } + + @Test + public void testMultiPartition() { + try { + List<ILSMDiskComponentId> componentIDs = Arrays.asList(new LSMDiskComponentId(40, 50), + new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24), + new LSMDiskComponentId(10, 19)); + + List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>(); + IndexInfo primary = mockIndex(true, componentIDs, resultPrimaryIDs, 0); + + List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>(); + IndexInfo secondary = mockIndex(false, componentIDs, resultSecondaryIDs, 0); + + List<ILSMDiskComponentId> resultSecondaryIDs1 = new ArrayList<>(); + IndexInfo secondary1 = mockIndex(false, componentIDs, resultSecondaryIDs, 1); + + ILSMMergePolicy policy = mockMergePolicy(primary, secondary, secondary1); + policy.diskComponentAdded(secondary.getIndex(), false); + Assert.assertTrue(resultPrimaryIDs.isEmpty()); + Assert.assertTrue(resultSecondaryIDs.isEmpty()); + + policy.diskComponentAdded(primary.getIndex(), false); + + Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), + new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultPrimaryIDs); + Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), + new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultSecondaryIDs); + Assert.assertTrue(resultSecondaryIDs1.isEmpty()); + } catch (HyracksDataException e) { + Assert.fail(e.getMessage()); + } + } + + private ILSMMergePolicy mockMergePolicy(IndexInfo... indexes) { + Map<String, String> properties = new HashMap<>(); + properties.put(CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_COUNT, String.valueOf(MAX_COMPONENT_COUNT)); + properties.put(CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_SIZE, String.valueOf(MAX_COMPONENT_SIZE)); + + Set<IndexInfo> indexInfos = new HashSet<>(); + for (IndexInfo info : indexes) { + indexInfos.add(info); + } + + DatasetInfo dsInfo = Mockito.mock(DatasetInfo.class); + Mockito.when(dsInfo.getDatsetIndexInfos()).thenReturn(indexInfos); + + IDatasetLifecycleManager manager = Mockito.mock(IDatasetLifecycleManager.class); + Mockito.when(manager.getDatasetInfo(DATASET_ID)).thenReturn(dsInfo); + + ILSMMergePolicy policy = new CorrelatedPrefixMergePolicy(manager, DATASET_ID); + policy.configure(properties); + return policy; + } + + private IndexInfo mockIndex(boolean isPrimary, List<ILSMDiskComponentId> componentIDs, + List<ILSMDiskComponentId> resultComponentIDs, int partition) throws HyracksDataException { + List<ILSMDiskComponent> components = new ArrayList<>(); + for (ILSMDiskComponentId id : componentIDs) { + ILSMDiskComponent component = Mockito.mock(ILSMDiskComponent.class); + Mockito.when(component.getComponentId()).thenReturn(id); + Mockito.when(component.getComponentSize()).thenReturn(DEFAULT_COMPONENT_SIZE); + Mockito.when(component.getState()).thenReturn(ComponentState.READABLE_UNWRITABLE); + components.add(component); + } + + ILSMIndex index = Mockito.mock(ILSMIndex.class); + Mockito.when(index.getImmutableComponents()).thenReturn(components); + + ILSMIndexAccessor accessor = Mockito.mock(ILSMIndexAccessor.class); + Mockito.doAnswer(new Answer<Void>() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + List<ILSMDiskComponent> mergedComponents = invocation.getArgumentAt(1, List.class); + mergedComponents.forEach(component -> { + try { + resultComponentIDs.add(component.getComponentId()); + } catch (HyracksDataException e) { + e.printStackTrace(); + } + }); + return null; + } + }).when(accessor).scheduleMerge(Mockito.any(ILSMIOOperationCallback.class), + Mockito.anyListOf(ILSMDiskComponent.class)); + + Mockito.when(index.createAccessor(Mockito.any(IModificationOperationCallback.class), + Mockito.any(ISearchOperationCallback.class))).thenReturn(accessor); + Mockito.when(index.isPrimaryIndex()).thenReturn(isPrimary); + + return new IndexInfo(index, DATASET_ID, 0, partition); + } + +} diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java index 335e84e..aed641d 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java @@ -46,4 +46,12 @@ * @throws HyracksDataException */ void destroy() throws HyracksDataException; + + /** + * Return the component Id of this disk component from its metadata + * @return + * @throws HyracksDataException + */ + ILSMDiskComponentId getComponentId() throws HyracksDataException; + } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentId.java new file mode 100644 index 0000000..5d38ace --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentId.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.lsm.common.api; + +import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference; + +/** + * Stores the id of the disk component, which is a interval (minId, maxId). + * When a disk component is formed by the flush operation, its initial minId and maxId are the same, and + * currently are set as the flush LSN. + * When a disk component is formed by the merge operation, its [minId, maxId] is set as the union of + * all ids of merged disk components. + * + * @author luochen + * + */ +public interface ILSMDiskComponentId { + + public static final long NOT_FOUND = -1; + + public static final MutableArrayValueReference COMPONENT_ID_MIN_KEY = + new MutableArrayValueReference("Component_Id_Min".getBytes()); + + public static final MutableArrayValueReference COMPONENT_ID_MAX_KEY = + new MutableArrayValueReference("Component_Id_Max".getBytes()); + + long getMinId(); + + long getMaxId(); + + default boolean notFound() { + return getMinId() == NOT_FOUND || getMaxId() == NOT_FOUND; + } + +} diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java index 508a6cc..9e5a230 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java @@ -22,7 +22,9 @@ import org.apache.hyracks.storage.am.common.api.IMetadataPageManager; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; +import org.apache.hyracks.storage.am.lsm.common.utils.ComponentMetadataUtil; public abstract class AbstractLSMDiskComponent extends AbstractLSMComponent implements ILSMDiskComponent { @@ -96,4 +98,14 @@ public DiskComponentMetadata getMetadata() { return metadata; } + + @Override + public ILSMDiskComponentId getComponentId() throws HyracksDataException { + long minID = ComponentMetadataUtil.getLong(metadata, ILSMDiskComponentId.COMPONENT_ID_MIN_KEY, + ILSMDiskComponentId.NOT_FOUND); + long maxID = ComponentMetadataUtil.getLong(metadata, ILSMDiskComponentId.COMPONENT_ID_MAX_KEY, + ILSMDiskComponentId.NOT_FOUND); + //TODO: do we need to throw an exception when ID is not found? + return new LSMDiskComponentId(minID, maxID); + } } \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentId.java new file mode 100644 index 0000000..f448c84 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentId.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.storage.am.lsm.common.impls; + +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId; + +public class LSMDiskComponentId implements ILSMDiskComponentId { + + private final long minId; + + private final long maxId; + + public LSMDiskComponentId(long minId, long maxId) { + this.minId = minId; + this.maxId = maxId; + } + + @Override + public long getMinId() { + return this.minId; + } + + @Override + public long getMaxId() { + return this.maxId; + } + + @Override + public String toString() { + return "[" + minId + "," + maxId + "]"; + } + + @Override + public int hashCode() { + return 31 * Long.hashCode(minId) + Long.hashCode(maxId); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof LSMDiskComponentId)) { + return false; + } + LSMDiskComponentId other = (LSMDiskComponentId) obj; + if (maxId != other.maxId) { + return false; + } + if (minId != other.minId) { + return false; + } + return true; + } + +} -- To view, visit https://asterix-gerrit.ics.uci.edu/1761 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I768ee9ac0a8d3c99c631086093a6b778b2e7588e Gerrit-PatchSet: 12 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Luo Chen <[email protected]> Gerrit-Reviewer: Ian Maxon <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Jianfeng Jia <[email protected]> Gerrit-Reviewer: Luo Chen <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
