Luo Chen has submitted this change and it was merged. Change subject: [ASTERIXDB-2103][STO] Too many disk components for CorrelatedPolicy ......................................................................
[ASTERIXDB-2103][STO] Too many disk components for CorrelatedPolicy

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
Currently CorrelatedMergePolicy uses component IDs to ensure that disk components of the primary and secondary indexes are merged together, but it does so without synchronization. As a result, a secondary InvertedIndex can end up with too many disk components. The reason is that a secondary index can miss a round of merges if the merge policy finds that the corresponding secondary components are not available (because they are being merged or flushed). Even though flow control on secondary indexes guarantees that the secondary index catches up the next time, it is still possible that the primary component has already been finalized, in which case the secondary components that missed this round of merges are never merged again.

This patch fixes the bug as follows:
- Add a mechanism of depending operations to LSM IO operations. An operation finishes only after all of its depending operations have finished.
- For the correlated merge policy, the flush/merge of the primary index depends on all flushes/merges of the secondary indexes. This ensures that when the correlated policy schedules a merge, all related components of all indexes are available to merge.

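The depending-operations idea described in the commit message can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code in this patch: `DependingOpsSketch`, `SketchOp`, `completeOwnWork`, `isFinished`, and `scheduleCorrelated` are hypothetical names, and the real `ILSMIOOperation` in Hyracks may track completion differently. The sketch only shows the ordering rule: the primary operation does not count as finished until every secondary operation it depends on has finished.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class DependingOpsSketch {

    /** Hypothetical stand-in for a flush or merge; NOT the Hyracks ILSMIOOperation interface. */
    public static final class SketchOp {
        private final String name;
        private final List<SketchOp> dependingOps;
        private volatile boolean ownWorkDone;

        public SketchOp(String name, List<SketchOp> dependingOps) {
            this.name = name;
            // Assumption: null means "no dependencies", like the null argument seen in the diff.
            this.dependingOps = dependingOps == null
                    ? Collections.<SketchOp>emptyList()
                    : Collections.unmodifiableList(new ArrayList<>(dependingOps));
        }

        public String getName() {
            return name;
        }

        /** Marks this operation's own I/O work as done. */
        public void completeOwnWork() {
            ownWorkDone = true;
        }

        /** Finished only when its own work and all depending operations are finished. */
        public boolean isFinished() {
            if (!ownWorkDone) {
                return false;
            }
            for (SketchOp op : dependingOps) {
                if (!op.isFinished()) {
                    return false;
                }
            }
            return true;
        }
    }

    /** Builds a primary operation that depends on all given secondary operations. */
    public static SketchOp scheduleCorrelated(List<SketchOp> secondaryOps) {
        return new SketchOp("primary", secondaryOps);
    }

    public static void main(String[] args) {
        SketchOp invertedIndexOp = new SketchOp("secondary-inverted-index", null);
        SketchOp btreeOp = new SketchOp("secondary-btree", null);
        SketchOp primaryOp = scheduleCorrelated(Arrays.asList(invertedIndexOp, btreeOp));

        primaryOp.completeOwnWork();
        System.out.println(primaryOp.getName() + " finished: " + primaryOp.isFinished());

        invertedIndexOp.completeOwnWork();
        btreeOp.completeOwnWork();
        System.out.println(primaryOp.getName() + " finished: " + primaryOp.isFinished());
    }
}
```

In this sketch the secondary operations are created first and handed to the primary operation, which mirrors the order in which the patch schedules secondary flushes/merges before the primary one.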
Change-Id: Ib6c06ee23f3bfd16b758802388389c00e29780b1 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2018 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Jianfeng Jia <[email protected]> --- M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java M asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFileManagerTest.java M 
hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFilterMergeTestDriver.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeScanDiskComponentsTest.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceScanDiskComponentsTest.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/PrefixMergePolicyTest.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java M 
hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java 55 files changed, 435 insertions(+), 153 deletions(-) Approvals: Jenkins: Verified; No violations found; ; Verified Jianfeng Jia: Looks good to me, approved Objections: Anon. E. Moose #1000171: diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java index 76bec8c..5e0e072 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java @@ -132,9 +132,9 @@ public void createIndex() throws Exception { List<List<String>> partitioningKeys = new ArrayList<>(); partitioningKeys.add(Collections.singletonList("key")); - dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, - NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH, - partitioningKeys, null, null, null, false, null, false), + dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME, null, + null, new InternalDatasetDetails(null, PartitioningStrategy.HASH, partitioningKeys, null, null, null, + false, null, false), null, 
DatasetType.INTERNAL, DATASET_ID, 0); PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, null, storageManager, KEY_INDEXES, KEY_INDICATORS_LIST); @@ -448,7 +448,7 @@ for (int i = 0; i < numMergedComponents; i++) { mergedComponents.add(diskComponents.get(i)); } - mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents); + mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents, null); merger.waitUntilCount(1); // now that we enetered, we will rollback Rollerback rollerback = new Rollerback(lsmBtree, new DiskComponentLsnPredicate(lsn)); @@ -635,7 +635,7 @@ for (int i = 0; i < numMergedComponents; i++) { mergedComponents.add(diskComponents.get(i)); } - mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents); + mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents, null); merger.waitUntilCount(1); // we will block search lsmBtree.clearSearchCallbacks(); @@ -707,7 +707,7 @@ for (int i = 0; i < numMergedComponents; i++) { mergedComponents.add(diskComponents.get(i)); } - mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents); + mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents, null); merger.waitUntilCount(1); // we will block search lsmBtree.clearSearchCallbacks(); @@ -736,7 +736,7 @@ } private class Rollerback { - private Thread task; + private final Thread task; private Exception failure; public Rollerback(TestLsmBtree lsmBtree, Predicate<ILSMComponent> predicate) { @@ -766,7 +766,7 @@ } private class Searcher { - private ExecutorService executor = Executors.newSingleThreadExecutor(); + private final ExecutorService executor = Executors.newSingleThreadExecutor(); private Future<Boolean> task; private volatile boolean entered = false; diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java index a20e660..e18181c 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java @@ -31,6 +31,7 @@ import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicy; @@ -91,14 +92,17 @@ Set<IndexInfo> indexInfos = datasetLifecycleManager.getDatasetInfo(datasetId).getDatsetIndexInfos(); int partition = getIndexPartition(index, indexInfos); - triggerScheduledMerge(minID, maxID, + List<ILSMIOOperation> dependingMerges = scheduleSecondaryIndexes(minID, maxID, indexInfos.stream().filter(info -> info.getPartition() == partition).collect(Collectors.toSet())); + + schedulePrimaryIndex(minID, maxID, index, dependingMerges); + return true; } /** * Submit merge requests for all disk components within [minID, maxID] - * of all indexes of a given dataset in the given partition + * of all of secondary indexes of a given dataset in the given partition * * @param minID * @param maxID @@ -106,17 +110,39 @@ * @param indexInfos * @throws HyracksDataException */ - private void triggerScheduledMerge(long minID, long maxID, Set<IndexInfo> indexInfos) throws HyracksDataException { + private List<ILSMIOOperation> scheduleSecondaryIndexes(long minID, long maxID, Set<IndexInfo> indexInfos) 
+ throws HyracksDataException { + List<ILSMIOOperation> mergeOps = new ArrayList<>(); for (IndexInfo info : indexInfos) { ILSMIndex lsmIndex = info.getIndex(); - - List<ILSMDiskComponent> immutableComponents = new ArrayList<>(lsmIndex.getDiskComponents()); - if (isMergeOngoing(immutableComponents)) { + List<ILSMDiskComponent> diskComponents = lsmIndex.getDiskComponents(); + if (lsmIndex.isPrimaryIndex() || isMergeOngoing(diskComponents)) { continue; } - List<ILSMDiskComponent> mergableComponents = new ArrayList<>(); - for (ILSMDiskComponent component : immutableComponents) { - ILSMDiskComponentId id = component.getComponentId(); + List<ILSMDiskComponent> mergeableComponents = collectMergeableComponents(minID, maxID, diskComponents); + ILSMIndexAccessor accessor = + lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + mergeOps.add(accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergeableComponents, null)); + } + return mergeOps; + } + + private void schedulePrimaryIndex(long minID, long maxID, ILSMIndex primaryIndex, + List<ILSMIOOperation> dependingMerges) throws HyracksDataException { + assert primaryIndex.isPrimaryIndex(); + List<ILSMDiskComponent> diskComponents = primaryIndex.getDiskComponents(); + List<ILSMDiskComponent> mergeableComponents = collectMergeableComponents(minID, maxID, diskComponents); + ILSMIndexAccessor accessor = + primaryIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + accessor.scheduleMerge(primaryIndex.getIOOperationCallback(), mergeableComponents, dependingMerges); + } + + private List<ILSMDiskComponent> collectMergeableComponents(long minID, long maxID, + List<ILSMDiskComponent> diskComponents) throws HyracksDataException { + List<ILSMDiskComponent> mergableComponents = new ArrayList<>(); + for (ILSMDiskComponent component : diskComponents) { + ILSMDiskComponentId id = component.getComponentId(); + if (!id.notFound()) { if (id.getMinId() >= minID && 
id.getMaxId() <= maxID) { mergableComponents.add(component); } @@ -126,10 +152,8 @@ break; } } - ILSMIndexAccessor accessor = - lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergableComponents); } + return mergableComponents; } private int getIndexPartition(ILSMIndex index, Set<IndexInfo> indexInfos) { diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java index 71d4a96..0df8dcc 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java @@ -34,6 +34,7 @@ private boolean isRegistered; private boolean memoryAllocated; private boolean durable; + private boolean correlated; public DatasetInfo(int datasetID) { this.indexes = new HashMap<>(); @@ -41,6 +42,7 @@ this.datasetID = datasetID; this.setRegistered(false); this.setMemoryAllocated(false); + this.setCorrelated(false); } @Override @@ -195,4 +197,12 @@ public void setLastAccess(long lastAccess) { this.lastAccess = lastAccess; } + + public void setCorrelated(boolean correlated) { + this.correlated = correlated; + } + + public boolean isCorrelated() { + return correlated; + } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index 37bd789..ad9f6a5 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -223,7 +223,7 @@ if (iInfo.isOpen()) { ILSMIndexAccessor accessor = 
iInfo.getIndex().createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback()); + accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback(), null); } // Wait for the above flush op. @@ -417,16 +417,22 @@ } if (asyncFlush) { - for (IndexInfo iInfo : dsInfo.getIndexes().values()) { - ILSMIndexAccessor accessor = - iInfo.getIndex().createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback()); - } + PrimaryIndexOperationTracker.flushDatasetIndexes(dsInfo.getDatsetIndexInfos(), dsInfo.isCorrelated()); } else { + List<IndexInfo> primaryIndexes = new ArrayList<>(); for (IndexInfo iInfo : dsInfo.getIndexes().values()) { - // TODO: This is not efficient since we flush the indexes sequentially. - // Think of a way to allow submitting the flush requests concurrently. We don't do them concurrently because this - // may lead to a deadlock scenario between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker. + if (iInfo.getIndex().isPrimaryIndex()) { + // primary indexes are flushed later to guarantee the correctness of the correlated merge policy + primaryIndexes.add(iInfo); + } else { + // TODO: This is not efficient since we flush the indexes sequentially. + // Think of a way to allow submitting the flush requests concurrently. + // We don't do them concurrently because this may lead to a deadlock scenario + // between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker. 
+ flushAndWaitForIO(dsInfo, iInfo); + } + } + for (IndexInfo iInfo : primaryIndexes) { flushAndWaitForIO(dsInfo, iInfo); } } @@ -591,4 +597,5 @@ } } } + } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java index f2f3b93..5eb3c02 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java @@ -23,6 +23,7 @@ import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; import org.apache.hyracks.storage.common.IIndex; import org.apache.hyracks.storage.common.LocalResource; @@ -95,6 +96,10 @@ datasetInfo.setExternal(!index.hasMemoryComponents()); datasetInfo.setRegistered(true); datasetInfo.setDurable(((ILSMIndex) index).isDurable()); + //TODO use a general mechanism to set correlated property when we have more + // correlated merge policies + datasetInfo.setCorrelated( + ((AbstractLSMIndex) index).getMergePolicy() instanceof CorrelatedPrefixMergePolicy); } } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java index 67b25b6..4eb1c3a 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java @@ -19,6 +19,9 @@ package org.apache.asterix.common.context; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; 
import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -31,6 +34,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; @@ -141,17 +145,16 @@ //This method is called sequentially by LogPage.notifyFlushTerminator in the sequence flushes were scheduled. public synchronized void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException { - for (ILSMIndex lsmIndex : dsInfo.getDatasetIndexes()) { - //get resource - ILSMIndexAccessor accessor = - lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + Set<IndexInfo> indexInfos = dsInfo.getDatsetIndexInfos(); + for (IndexInfo iInfo : indexInfos) { //update resource lsn AbstractLSMIOOperationCallback ioOpCallback = - (AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback(); + (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback(); ioOpCallback.updateLastLSN(logRecord.getLSN()); - //schedule flush after update - accessor.scheduleFlush(lsmIndex.getIOOperationCallback()); } + + flushDatasetIndexes(indexInfos, dsInfo.isCorrelated()); + flushLogCreated = false; } @@ -198,4 +201,63 @@ return flushLogCreated; } + public static void flushDatasetIndexes(Set<IndexInfo> indexes, boolean correlated) throws HyracksDataException { + if (!correlated) { + // if not correlated, we simply schedule flushes of each index independently + for (IndexInfo iInfo : indexes) { + ILSMIndex lsmIndex = iInfo.getIndex(); + //get resource + ILSMIndexAccessor accessor = + 
lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + //schedule flush after update + accessor.scheduleFlush(lsmIndex.getIOOperationCallback(), null); + } + } else { + // otherwise, we need to schedule indexes properly s.t. the primary index would depend on + // all secondary indexes in the same partition + + // collect partitions + Set<Integer> partitions = new HashSet<>(); + indexes.forEach(iInfo -> partitions.add(iInfo.getPartition())); + for (Integer partition : partitions) { + flushCorrelatedDatasetIndexes(indexes, partition); + } + + } + } + + private static void flushCorrelatedDatasetIndexes(Set<IndexInfo> indexes, int partition) + throws HyracksDataException { + ILSMIndex primaryIndex = null; + List<ILSMIOOperation> flushOps = new ArrayList<>(); + for (IndexInfo iInfo : indexes) { + if (iInfo.getPartition() != partition) { + continue; + } + ILSMIndex lsmIndex = iInfo.getIndex(); + if (lsmIndex.isPrimaryIndex()) { + primaryIndex = lsmIndex; + } else { + //get resource + ILSMIndexAccessor accessor = + lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + //schedule flush + ILSMIOOperation flushOp = accessor.scheduleFlush(lsmIndex.getIOOperationCallback(), null); + if (flushOp != null) { + flushOps.add(flushOp); + } + } + } + + if (primaryIndex != null) { + //get resource + ILSMIndexAccessor accessor = + primaryIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + //schedule flush after update + accessor.scheduleFlush(primaryIndex.getIOOperationCallback(), flushOps); + + } + + } + } diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java index 9f071bb..ee795d5 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java +++ 
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java @@ -236,7 +236,7 @@ return null; } }).when(accessor).scheduleMerge(Mockito.any(ILSMIOOperationCallback.class), - Mockito.anyListOf(ILSMDiskComponent.class)); + Mockito.anyListOf(ILSMDiskComponent.class), Mockito.any()); Mockito.when(index.createAccessor(Mockito.any(IModificationOperationCallback.class), Mockito.any(ISearchOperationCallback.class))).thenReturn(accessor); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java index 3775985..f5c3013 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java @@ -56,6 +56,7 @@ import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences; import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor.ICursorFactory; +import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation; import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory; import org.apache.hyracks.storage.common.IIndexBulkLoader; import org.apache.hyracks.storage.common.IIndexCursor; @@ -170,7 +171,7 @@ // The only reason to override the following method is that it uses a different context object // in addition, determining whether or not to keep deleted tuples is different here @Override - public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback 
callback)
             throws HyracksDataException {
         ExternalBTreeOpContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, -1);
         opCtx.setOperation(IndexOperation.MERGE);
@@ -195,9 +196,11 @@
         LSMComponentFileReferences relMergeFileRefs =
                 fileManager.getRelMergeFileReference(firstFile.getFile().getName(), lastFile.getFile().getName());
         ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getLsmHarness(), opCtx, cursorFactory);
-        ioScheduler.scheduleOperation(new LSMBTreeMergeOperation(accessor, cursor,
+        MergeOperation mergeOp = new LSMBTreeMergeOperation(accessor, cursor,
                 relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(),
-                callback, fileManager.getBaseDir().getAbsolutePath()));
+                callback, fileManager.getBaseDir().getAbsolutePath(), ctx.getDependingOps());
+        ioScheduler.scheduleOperation(mergeOp);
+        return mergeOp;
     }
     // This function should only be used when a transaction fail. it doesn't
@@ -369,7 +372,7 @@
     // Not supported
     @Override
-    public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+    public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException {
         throw new UnsupportedOperationException("flush not supported in LSM-Disk-Only-BTree");
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
index ff17905..071a4bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
@@ -319,7 +319,7 @@
     }
     @Override
-    public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+    public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException {
         throw HyracksDataException.create(ErrorCode.FLUSH_NOT_SUPPORTED_IN_EXTERNAL_INDEX);
     }
@@ -342,7 +342,7 @@
     }
     @Override
-    public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+    public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException {
         ILSMIndexOperationContext bctx = createOpContext(NoOpOperationCallback.INSTANCE, 0);
         bctx.setOperation(IndexOperation.MERGE);
@@ -363,11 +363,12 @@
             keepDeleteTuples = mergingComponents.get(mergingComponents.size() - 1) != secondDiskComponents
                     .get(secondDiskComponents.size() - 1);
         }
-
-        ioScheduler.scheduleOperation(
+        ILSMIOOperation mergeOp =
                 new LSMBTreeWithBuddyMergeOperation(accessor, cursor, relMergeFileRefs.getInsertIndexFileReference(),
                         relMergeFileRefs.getDeleteIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(),
-                        callback, fileManager.getBaseDir().getAbsolutePath(), keepDeleteTuples));
+                        callback, fileManager.getBaseDir().getAbsolutePath(), keepDeleteTuples, ctx.getDependingOps());
+        ioScheduler.scheduleOperation(mergeOp);
+        return mergeOp;
     }
     // This method creates the appropriate opContext for the targeted version
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index c7d45e1..00a489e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -462,7 +462,8 @@
             LSMComponentFileReferences componentFileRefs, ILSMIOOperationCallback callback) {
         ILSMIndexAccessor accessor = createAccessor(opCtx);
         return new LSMBTreeFlushOperation(accessor, componentFileRefs.getInsertIndexFileReference(),
-                componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath());
+                componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath(),
+                opCtx.getDependingOps());
     }
     @Override
@@ -611,6 +612,7 @@
         }
         ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor(opCtx, returnDeletedTuples);
         return new LSMBTreeMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(),
-                mergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath());
+                mergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath(),
+                opCtx.getDependingOps());
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
index e3424e5..92b53de 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
@@ -18,7 +18,10 @@
  */
 package org.apache.hyracks.storage.am.lsm.btree.impls;
+import java.util.List;
+
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
@@ -27,8 +30,9 @@
     private final FileReference bloomFilterFlushTarget;
     public LSMBTreeFlushOperation(ILSMIndexAccessor accessor, FileReference flushTarget,
-            FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier) {
-        super(accessor, flushTarget, callback, indexIdentifier);
+            FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier,
+            List<ILSMIOOperation> dependingOps) {
+        super(accessor, flushTarget, callback, indexIdentifier, dependingOps);
         this.bloomFilterFlushTarget = bloomFilterFlushTarget;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
index ec96303..40ac7b1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
@@ -19,8 +19,11 @@
 package org.apache.hyracks.storage.am.lsm.btree.impls;
+import java.util.List;
+
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation;
@@ -30,8 +33,9 @@
     private final FileReference bloomFilterMergeTarget;
     public LSMBTreeMergeOperation(ILSMIndexAccessor accessor, ITreeIndexCursor cursor, FileReference target,
-            FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, String indexIdentifier) {
-        super(accessor, target, callback, indexIdentifier, cursor);
+            FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, String indexIdentifier,
+            List<ILSMIOOperation> dependingOps) {
+        super(accessor, target, callback, indexIdentifier, cursor, dependingOps);
         this.bloomFilterMergeTarget = bloomFilterMergeTarget;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java
index f682bde..e0c1512 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java
@@ -18,8 +18,11 @@
  */
 package org.apache.hyracks.storage.am.lsm.btree.impls;
+import java.util.List;
+
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation;
@@ -32,8 +35,8 @@
     public LSMBTreeWithBuddyMergeOperation(ILSMIndexAccessor accessor, ITreeIndexCursor cursor, FileReference target,
             FileReference buddyBtreeMergeTarget, FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback,
-            String indexIdentifier, boolean keepDeletedTuples) {
-        super(accessor, target, callback, indexIdentifier, cursor);
+            String indexIdentifier, boolean keepDeletedTuples, List<ILSMIOOperation> dependingOps) {
+        super(accessor, target, callback, indexIdentifier, cursor, dependingOps);
         this.buddyBtreeMergeTarget = buddyBtreeMergeTarget;
         this.bloomFilterMergeTarget = bloomFilterMergeTarget;
         this.keepDeletedTuples = keepDeletedTuples;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 89c8cb9..380b2f2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -105,10 +105,12 @@
      *
      * @param ctx
      * @param callback
+     * @return The scheduled merge operation, used for the caller to track its status
      * @throws HyracksDataException
      * @throws IndexException
      */
-    void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException;
+    ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+            throws HyracksDataException;
     /**
      * Schedule full merge
@@ -135,9 +137,11 @@
      *
      * @param ctx
      * @param callback
+     * @return The scheduled flush operation, used for the caller to track its status
      * @throws HyracksDataException
      */
-    void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException;
+    ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+            throws HyracksDataException;
     /**
      * Perform a flush
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
index c2ae786..ff1613b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.storage.am.lsm.common.api;
+import java.util.List;
 import java.util.concurrent.Callable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -63,7 +64,19 @@
     FileReference getTarget();
     /**
+     * <<<<<<< HEAD
+     *
      * @return the accessor of the operation
      */
     ILSMIndexAccessor getAccessor();
+
+    /**
+     * @return whether this operation has finished
+     */
+    boolean isFinished();
+
+    /**
+     * @return a list of operations that this operation depends on
+     */
+    List<ILSMIOOperation> getDependingOps();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
index addeb27..5f43bcd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
@@ -67,11 +67,13 @@
     public void scanDiskComponents(ILSMIndexOperationContext ctx, IIndexCursor cursor) throws HyracksDataException;
-    void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException;
+    ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+            throws HyracksDataException;
     ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException;
-    void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException;
+    ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+            throws HyracksDataException;
     ILSMDiskComponent merge(ILSMIOOperation operation) throws HyracksDataException;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index b8d64af..b303a39 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -46,9 +46,13 @@
      *
      * @param callback
      *            the IO operation callback
+     * @param dependingOps
+     *            other operations that this operation depends on
+     * @return The scheduled flush operation
      * @throws HyracksDataException
      */
-    void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException;
+    ILSMIOOperation scheduleFlush(ILSMIOOperationCallback callback, List<ILSMIOOperation> dependingOps)
+            throws HyracksDataException;
     /**
      * Schedule a merge operation
@@ -57,11 +61,13 @@
      *            the merge operation callback
      * @param components
      *            the components to be merged
+     * @param dependingOps
+     *            other operations that this operation depends on
+     * @return The scheduled merge operation
      * @throws HyracksDataException
-     * @throws IndexException
      */
-    void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMDiskComponent> components)
-            throws HyracksDataException;
+    ILSMIOOperation scheduleMerge(ILSMIOOperationCallback callback, List<ILSMDiskComponent> components,
+            List<ILSMIOOperation> dependingOps) throws HyracksDataException;
     /**
      * Schedule a full merge
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
index 5b0378a..66af93f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
@@ -56,4 +56,8 @@
     PermutingTupleReference getFilterTuple();
     MultiComparator getFilterCmp();
+
+    List<ILSMIOOperation> getDependingOps();
+
+    void setDependingOps(List<ILSMIOOperation> dependingOps);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java
index aee46f0..1d13c94 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java
@@ -18,6 +18,10 @@
  */
 package org.apache.hyracks.storage.am.lsm.common.impls;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
@@ -30,13 +34,32 @@
     protected final FileReference target;
     protected final ILSMIOOperationCallback callback;
     protected final String indexIdentifier;
+    protected final List<ILSMIOOperation> dependingOps;
+
+    protected AtomicBoolean isFinished = new AtomicBoolean(false);
     public AbstractIoOperation(ILSMIndexAccessor accessor, FileReference target, ILSMIOOperationCallback callback,
-            String indexIdentifier) {
+            String indexIdentifier, List<ILSMIOOperation> dependingOps) {
         this.accessor = accessor;
         this.target = target;
         this.callback = callback;
         this.indexIdentifier = indexIdentifier;
+        this.dependingOps = dependingOps;
+    }
+
+    protected abstract void callInternal() throws HyracksDataException;
+
+    @Override
+    public Boolean call() throws HyracksDataException {
+        try {
+            callInternal();
+        } finally {
+            synchronized (this) {
+                isFinished.set(true);
+                notifyAll();
+            }
+        }
+        return true;
     }
     @Override
@@ -63,4 +86,14 @@
     public String getIndexIdentifier() {
         return indexIdentifier;
     }
+
+    @Override
+    public List<ILSMIOOperation> getDependingOps() {
+        return dependingOps;
+    }
+
+    @Override
+    public boolean isFinished() {
+        return isFinished.get();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index dc64f9b..e7b21cb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -86,6 +86,7 @@
     protected final int[] treeFields;
     protected final int[] filterFields;
     protected final boolean durable;
+    protected final ILSMMergePolicy mergePolicy;
     protected boolean isActive;
     protected final AtomicBoolean[] flushRequests;
     protected boolean memoryComponentsAllocated = false;
@@ -113,6 +114,7 @@
         this.inactiveDiskComponents = new LinkedList<>();
         this.durable = durable;
         this.tracer = tracer;
+        this.mergePolicy = mergePolicy;
         lsmHarness = new LSMHarness(this, mergePolicy, opTracker, diskBufferCache.isReplicationEnabled(), tracer);
         isActive = false;
         diskComponents = new ArrayList<>();
@@ -135,6 +137,7 @@
         this.ioScheduler = ioScheduler;
         this.ioOpCallback = ioOpCallback;
         this.durable = durable;
+        this.mergePolicy = mergePolicy;
         lsmHarness = new ExternalIndexHarness(this, mergePolicy, opTracker, diskBufferCache.isReplicationEnabled());
         isActive = false;
         diskComponents = new LinkedList<>();
@@ -199,7 +202,7 @@
     protected void flushMemoryComponent() throws HyracksDataException {
         BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(ioOpCallback);
         ILSMIndexAccessor accessor = createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-        accessor.scheduleFlush(cb);
+        accessor.scheduleFlush(cb, null);
         try {
             cb.waitForIO();
         } catch (InterruptedException e) {
@@ -326,19 +329,21 @@
     }
     @Override
-    public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+    public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException {
         LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference();
         AbstractLSMIndexOperationContext opCtx =
                 createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
         opCtx.setOperation(ctx.getOperation());
         opCtx.getComponentHolder().addAll(ctx.getComponentHolder());
+        opCtx.setDependingOps(ctx.getDependingOps());
         ILSMIOOperation flushOp = createFlushOperation(opCtx, componentFileRefs, callback);
         ioScheduler.scheduleOperation(TracedIOOperation.wrap(flushOp, tracer));
+        return flushOp;
     }
     @Override
-    public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+    public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException {
         List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
         // merge must create a different op ctx
@@ -346,11 +351,13 @@
                 createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
         opCtx.setOperation(ctx.getOperation());
         opCtx.getComponentHolder().addAll(mergingComponents);
+        opCtx.setDependingOps(ctx.getDependingOps());
         ILSMDiskComponent firstComponent = (ILSMDiskComponent) mergingComponents.get(0);
         ILSMDiskComponent lastComponent = (ILSMDiskComponent) mergingComponents.get(mergingComponents.size() - 1);
         LSMComponentFileReferences mergeFileRefs = getMergeFileReferences(firstComponent, lastComponent);
-        ILSMIOOperation mergeOp = createMergeOperation(opCtx, mergeFileRefs, callback);
+        MergeOperation mergeOp = (MergeOperation) createMergeOperation(opCtx, mergeFileRefs, callback);
         ioScheduler.scheduleOperation(TracedIOOperation.wrap(mergeOp, tracer));
+        return mergeOp;
     }
     private void addOperationalMutableComponents(List<ILSMComponent> operationalComponents) {
@@ -628,6 +635,10 @@
                 : doMerge(operation);
     }
+    public ILSMMergePolicy getMergePolicy() {
+        return mergePolicy;
+    }
+
     public abstract Set<String> getLSMComponentPhysicalFiles(ILSMComponent newComponent);
     protected abstract void allocateMemoryComponent(ILSMMemoryComponent c) throws HyracksDataException;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java
index 065d465..5fdeafd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java
@@ -27,6 +27,7 @@
 import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
@@ -47,6 +48,7 @@
     protected IndexOperation op;
     protected boolean accessingComponents = false;
     protected ISearchPredicate searchPredicate;
+    protected final List<ILSMIOOperation> dependingOps;
     public AbstractLSMIndexOperationContext(int[] treeFields, int[] filterFields,
             IBinaryComparatorFactory[] filterCmpFactories, ISearchOperationCallback searchCallback,
@@ -56,6 +58,7 @@
         this.componentHolder = new LinkedList<>();
         this.componentsToBeMerged = new LinkedList<>();
         this.componentsToBeReplicated = new LinkedList<>();
+        this.dependingOps = new LinkedList<>();
         if (filterFields != null) {
             indexTuple = new PermutingTupleReference(treeFields);
             filterCmp = MultiComparator.create(filterCmpFactories);
@@ -153,4 +156,17 @@
     public ISearchPredicate getSearchPredicate() {
         return searchPredicate;
     }
+
+    @Override
+    public List<ILSMIOOperation> getDependingOps() {
+        return dependingOps;
+    }
+
+    @Override
+    public void setDependingOps(List<ILSMIOOperation> dependingOps) {
+        this.dependingOps.clear();
+        if (dependingOps != null) {
+            this.dependingOps.addAll(dependingOps);
+        }
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
index 847b882..7ac9bfb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
@@ -49,7 +49,7 @@
         } else if (immutableComponents.size() >= numComponents) {
             ILSMIndexAccessor accessor =
                     index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-            accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents);
+            accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents, null);
         }
     }
@@ -106,7 +106,7 @@
         }
         ILSMIndexAccessor accessor =
                 index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-        accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents);
+        accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents, null);
         return true;
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
index 2f65b18..b93d943 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
@@ -203,13 +203,13 @@
     }
     @Override
-    public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+    public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException {
         if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
             callback.afterFinalize(LSMOperationType.MERGE, null);
-            return;
+            return null;
         }
-        lsmIndex.scheduleMerge(ctx, callback);
+        return lsmIndex.scheduleMerge(ctx, callback);
     }
     @Override
@@ -297,9 +297,10 @@
     }
     @Override
-    public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+    public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException {
         callback.afterFinalize(LSMOperationType.FLUSH, null);
+        return null;
     }
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java
index 7b7f950..750c690 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.storage.am.lsm.common.impls;
+import java.util.List;
 import java.util.Objects;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -30,14 +31,13 @@
 public class FlushOperation extends AbstractIoOperation implements Comparable<ILSMIOOperation> {
     public FlushOperation(ILSMIndexAccessor accessor, FileReference target, ILSMIOOperationCallback callback,
-            String indexIdentifier) {
-        super(accessor, target, callback, indexIdentifier);
+            String indexIdentifier, List<ILSMIOOperation> dependingOps) {
+        super(accessor, target, callback, indexIdentifier, dependingOps);
     }
     @Override
-    public Boolean call() throws HyracksDataException {
+    protected void callInternal() throws HyracksDataException {
         accessor.flush(this);
-        return true;
     }
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 1069f8f..bc3a5a1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -500,13 +500,13 @@
     }
     @Override
-    public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+    public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException {
         if (!getAndEnterComponents(ctx, LSMOperationType.FLUSH, true)) {
             callback.afterFinalize(LSMOperationType.FLUSH, null);
-            return;
+            return null;
         }
-        lsmIndex.scheduleFlush(ctx, callback);
+        return lsmIndex.scheduleFlush(ctx, callback);
     }
     @Override
@@ -519,6 +519,7 @@
         boolean failedOperation = false;
         try {
             newComponent = lsmIndex.flush(operation);
+            waitForDependingOps(operation);
             operation.getCallback().afterOperation(LSMOperationType.FLUSH, null, newComponent);
             newComponent.markAsValid(lsmIndex.isDurable());
         } catch (Throwable e) {
@@ -537,13 +538,13 @@
     }
     @Override
-    public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+    public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException {
         if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
             callback.afterFinalize(LSMOperationType.MERGE, null);
-            return;
+            return null;
         }
-        lsmIndex.scheduleMerge(ctx, callback);
+        return lsmIndex.scheduleMerge(ctx, callback);
     }
     @Override
@@ -570,6 +571,7 @@
         boolean failedOperation = false;
         try {
             newComponent = lsmIndex.merge(operation);
+            waitForDependingOps(operation);
             operation.getCallback().afterOperation(LSMOperationType.MERGE, ctx.getComponentHolder(), newComponent);
             newComponent.markAsValid(lsmIndex.isDurable());
         } catch (Throwable e) {
@@ -754,6 +756,32 @@
         }
     }
+    /**
+     * Wait for depending operations to finish.
+     *
+     * @param op
+     */
+    private void waitForDependingOps(ILSMIOOperation op) throws HyracksDataException {
+        List<ILSMIOOperation> dependingOps = op.getDependingOps();
+        if (dependingOps == null) {
+            return;
+        }
+        for (ILSMIOOperation dependingOp : dependingOps) {
+            if (dependingOp != null && !dependingOp.isFinished()) {
+                synchronized (dependingOp) {
+                    while (!dependingOp.isFinished()) {
+                        try {
+                            dependingOp.wait();
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            throw HyracksDataException.create(e);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
     @Override
     public void deleteComponents(ILSMIndexOperationContext ctx, Predicate<ILSMComponent> predicate)
             throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index c0fd443..f008fde 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -135,18 +135,21 @@
     }
     @Override
-    public void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException {
+    public ILSMIOOperation scheduleFlush(ILSMIOOperationCallback callback, List<ILSMIOOperation> dependingOps)
+            throws HyracksDataException {
         ctx.setOperation(IndexOperation.FLUSH);
-        lsmHarness.scheduleFlush(ctx, callback);
+        ctx.setDependingOps(dependingOps);
+        return lsmHarness.scheduleFlush(ctx, callback);
     }
     @Override
-    public void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMDiskComponent> components)
-            throws HyracksDataException {
+    public ILSMIOOperation scheduleMerge(ILSMIOOperationCallback callback, List<ILSMDiskComponent> components,
+            List<ILSMIOOperation> dependingOps) throws HyracksDataException {
         ctx.setOperation(IndexOperation.MERGE);
         ctx.getComponentsToBeMerged().clear();
         ctx.getComponentsToBeMerged().addAll(components);
-        lsmHarness.scheduleMerge(ctx, callback);
+        ctx.setDependingOps(dependingOps);
+        return lsmHarness.scheduleMerge(ctx, callback);
     }
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java
index c83d534..2210fd0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java
@@ -23,6 +23,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.common.IIndexCursor;
@@ -31,19 +32,13 @@
     protected final IIndexCursor cursor;
     public MergeOperation(ILSMIndexAccessor accessor, FileReference target, ILSMIOOperationCallback callback,
-            String indexIdentifier, IIndexCursor cursor) {
-        super(accessor, target, callback, indexIdentifier);
+            String indexIdentifier, IIndexCursor cursor, List<ILSMIOOperation> dependingOps) {
+        super(accessor, target, callback, indexIdentifier, dependingOps);
         this.cursor = cursor;
     }
     public List<ILSMComponent> getMergingComponents() {
         return accessor.getOpContext().getComponentHolder();
-    }
-
-    @Override
-    public Boolean call() throws HyracksDataException {
-        accessor.merge(this);
-        return true;
     }
     @Override
@@ -54,4 +49,10 @@
     public IIndexCursor getCursor() {
         return cursor;
     }
+
+    @Override
+    protected void callInternal() throws HyracksDataException {
+        accessor.merge(this);
+
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
index 7d7266e..f159232 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
@@ -247,7 +247,7 @@
         Collections.reverse(mergableComponents);
         ILSMIndexAccessor accessor =
                 index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-        accessor.scheduleMerge(index.getIOOperationCallback(), mergableComponents);
+        accessor.scheduleMerge(index.getIOOperationCallback(), mergableComponents, null);
     }
     /**
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java
index 85081a1..7cdcc52 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java
@@ -59,7 +59,7 @@
                 && index.hasFlushRequestForCurrentMutableComponent()) {
             ILSMIndexAccessor accessor =
                     index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-            accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback());
+            accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), null);
         }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
index 9cc8022..3e35e51 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.storage.am.lsm.common.impls;
+import java.util.List;
 import java.util.logging.Logger;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -100,6 +101,16 @@
     public ILSMIndexAccessor getAccessor() {
         return ioOp.getAccessor();
     }
+
+    @Override
+    public boolean isFinished() {
+        return ioOp.isFinished();
+    }
+
+    @Override
+    public List<ILSMIOOperation> getDependingOps() {
+        return ioOp.getDependingOps();
+    }
 }
 class ComparableTracedIOOperation extends TracedIOOperation implements Comparable<ILSMIOOperation> {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index eb3924c..60da50a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -691,17 +691,20 @@
             LSMComponentFileReferences componentFileRefs, ILSMIOOperationCallback callback)
             throws HyracksDataException {
         return new LSMInvertedIndexFlushOperation(new LSMInvertedIndexAccessor(getLsmHarness(), opCtx),
-                componentFileRefs.getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(),
-                componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath());
+                componentFileRefs.getInsertIndexFileReference(),
+                componentFileRefs.getDeleteIndexFileReference(), componentFileRefs.getBloomFilterFileReference(),
+                callback, fileManager.getBaseDir().getAbsolutePath(), opCtx.getDependingOps());
     }
     @Override
     protected ILSMIOOperation createMergeOperation(AbstractLSMIndexOperationContext opCtx,
-            LSMComponentFileReferences mergeFileRefs, ILSMIOOperationCallback callback) throws HyracksDataException {
+            LSMComponentFileReferences mergeFileRefs,
+            ILSMIOOperationCallback callback) throws HyracksDataException {
         ILSMIndexAccessor accessor = new LSMInvertedIndexAccessor(getLsmHarness(), opCtx);
         IIndexCursor cursor = new LSMInvertedIndexRangeSearchCursor(opCtx);
-        return new LSMInvertedIndexMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(),
-                mergeFileRefs.getDeleteIndexFileReference(), mergeFileRefs.getBloomFilterFileReference(),
callback, - fileManager.getBaseDir().getAbsolutePath()); + return new LSMInvertedIndexMergeOperation(accessor, cursor, + mergeFileRefs.getInsertIndexFileReference(), mergeFileRefs.getDeleteIndexFileReference(), + mergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath(), + opCtx.getDependingOps()); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java index 61fc84e..242bc83 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java @@ -85,9 +85,11 @@ } @Override - public void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException { + public ILSMIOOperation scheduleFlush(ILSMIOOperationCallback callback, List<ILSMIOOperation> dependingOps) + throws HyracksDataException { ctx.setOperation(IndexOperation.FLUSH); - lsmHarness.scheduleFlush(ctx, callback); + ctx.setDependingOps(dependingOps); + return lsmHarness.scheduleFlush(ctx, callback); } @Override @@ -96,12 +98,13 @@ } @Override - public void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMDiskComponent> components) - throws HyracksDataException { + public ILSMIOOperation scheduleMerge(ILSMIOOperationCallback callback, List<ILSMDiskComponent> components, + List<ILSMIOOperation> dependingOps) throws HyracksDataException { ctx.setOperation(IndexOperation.MERGE); ctx.getComponentsToBeMerged().clear(); ctx.getComponentsToBeMerged().addAll(components); - lsmHarness.scheduleMerge(ctx, 
callback); + ctx.setDependingOps(dependingOps); + return lsmHarness.scheduleMerge(ctx, callback); } @Override @@ -116,6 +119,7 @@ @Override public void scheduleFullMerge(ILSMIOOperationCallback callback) throws HyracksDataException { ctx.setOperation(IndexOperation.FULL_MERGE); + ctx.setDependingOps(null); lsmHarness.scheduleFullMerge(ctx, callback); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java index 2106f6a..30e1cec 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java @@ -19,7 +19,10 @@ package org.apache.hyracks.storage.am.lsm.invertedindex.impls; +import java.util.List; + import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation; @@ -30,8 +33,8 @@ public LSMInvertedIndexFlushOperation(ILSMIndexAccessor accessor, FileReference flushTarget, FileReference deletedKeysBTreeFlushTarget, FileReference bloomFilterFlushTarget, - ILSMIOOperationCallback callback, String indexIdentifier) { - super(accessor, flushTarget, callback, indexIdentifier); + ILSMIOOperationCallback callback, String indexIdentifier, List<ILSMIOOperation> dependingOps) { + super(accessor, flushTarget, callback, indexIdentifier, 
dependingOps); this.deletedKeysBTreeFlushTarget = deletedKeysBTreeFlushTarget; this.bloomFilterFlushTarget = bloomFilterFlushTarget; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java index 2c1db0f..8361f24 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java @@ -19,7 +19,10 @@ package org.apache.hyracks.storage.am.lsm.invertedindex.impls; +import java.util.List; + import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation; @@ -31,8 +34,8 @@ public LSMInvertedIndexMergeOperation(ILSMIndexAccessor accessor, IIndexCursor cursor, FileReference target, FileReference deletedKeysBTreeMergeTarget, FileReference bloomFilterMergeTarget, - ILSMIOOperationCallback callback, String indexIdentifier) { - super(accessor, target, callback, indexIdentifier, cursor); + ILSMIOOperationCallback callback, String indexIdentifier, List<ILSMIOOperation> dependingOps) { + super(accessor, target, callback, indexIdentifier, cursor, dependingOps); this.deletedKeysBTreeMergeTarget = deletedKeysBTreeMergeTarget; this.bloomFilterMergeTarget = bloomFilterMergeTarget; } diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java index 6595403..110d873 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java @@ -416,7 +416,7 @@ // Not supported @Override - public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { throw new UnsupportedOperationException("flush not supported in LSM-Disk-Only-RTree"); } @@ -623,7 +623,7 @@ // The only change the the schedule merge is the method used to create the // opCtx. first line <- in schedule merge, we-> @Override - public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { ILSMIndexOperationContext rctx = createOpContext(NoOpOperationCallback.INSTANCE, -1); rctx.setOperation(IndexOperation.MERGE); @@ -634,10 +634,12 @@ (ILSMDiskComponent) mergingComponents.get(mergingComponents.size() - 1)); ILSMIndexAccessor accessor = new LSMRTreeAccessor(getLsmHarness(), rctx, buddyBTreeFields); // create the merge operation. 
- LSMRTreeMergeOperation mergeOp = new LSMRTreeMergeOperation(accessor, cursor, - relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getDeleteIndexFileReference(), - relMergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath()); + ILSMIOOperation mergeOp = + new LSMRTreeMergeOperation(accessor, cursor, relMergeFileRefs.getInsertIndexFileReference(), + relMergeFileRefs.getDeleteIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), + callback, fileManager.getBaseDir().getAbsolutePath(), ctx.getDependingOps()); ioScheduler.scheduleOperation(mergeOp); + return mergeOp; } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java index ca0e4e1..a4d6a65 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java @@ -420,7 +420,7 @@ LSMRTreeAccessor accessor = new LSMRTreeAccessor(getLsmHarness(), opCtx, buddyBTreeFields); return new LSMRTreeFlushOperation(accessor, componentFileRefs.getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), - callback, fileManager.getBaseDir().getAbsolutePath()); + callback, fileManager.getBaseDir().getAbsolutePath(), opCtx.getDependingOps()); } @Override @@ -430,6 +430,6 @@ ILSMIndexAccessor accessor = new LSMRTreeAccessor(getLsmHarness(), opCtx, buddyBTreeFields); return new LSMRTreeMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(), mergeFileRefs.getDeleteIndexFileReference(), mergeFileRefs.getBloomFilterFileReference(), callback, - 
fileManager.getBaseDir().getAbsolutePath()); + fileManager.getBaseDir().getAbsolutePath(), opCtx.getDependingOps()); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java index 6991c56..a08d854 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java @@ -18,7 +18,10 @@ */ package org.apache.hyracks.storage.am.lsm.rtree.impls; +import java.util.List; + import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation; @@ -29,8 +32,9 @@ private final FileReference bloomFilterFlushTarget; public LSMRTreeFlushOperation(ILSMIndexAccessor accessor, FileReference flushTarget, FileReference btreeFlushTarget, - FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier) { - super(accessor, flushTarget, callback, indexIdentifier); + FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier, + List<ILSMIOOperation> dependingOps) { + super(accessor, flushTarget, callback, indexIdentifier, dependingOps); this.btreeFlushTarget = btreeFlushTarget; this.bloomFilterFlushTarget = bloomFilterFlushTarget; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java index 83872cf..a07e57b 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java @@ -18,8 +18,11 @@ */ package org.apache.hyracks.storage.am.lsm.rtree.impls; +import java.util.List; + import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation; @@ -30,8 +33,8 @@ public LSMRTreeMergeOperation(ILSMIndexAccessor accessor, ITreeIndexCursor cursor, FileReference target, FileReference btreeMergeTarget, FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, - String indexIdentifier) { - super(accessor, target, callback, indexIdentifier, cursor); + String indexIdentifier, List<ILSMIOOperation> dependingOps) { + super(accessor, target, callback, indexIdentifier, cursor, dependingOps); this.btreeMergeTarget = btreeMergeTarget; this.bloomFilterMergeTarget = bloomFilterMergeTarget; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java index 1e15455..a18f10c 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java @@ -332,7 +332,7 @@ throws HyracksDataException { ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getLsmHarness(), opCtx, cursorFactory); return new LSMRTreeFlushOperation(accessor, componentFileRefs.getInsertIndexFileReference(), null, null, - callback, fileManager.getBaseDir().getAbsolutePath()); + callback, fileManager.getBaseDir().getAbsolutePath(), opCtx.getDependingOps()); } @Override @@ -346,6 +346,6 @@ ITreeIndexCursor cursor = new LSMRTreeWithAntiMatterTuplesSearchCursor(opCtx, returnDeletedTuples); ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getLsmHarness(), opCtx, cursorFactory); return new MergeOperation(accessor, mergeFileRefs.getInsertIndexFileReference(), callback, - fileManager.getBaseDir().getAbsolutePath(), cursor); + fileManager.getBaseDir().getAbsolutePath(), cursor, opCtx.getDependingOps()); } } diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFileManagerTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFileManagerTest.java index 5d6d8de..4f7515e 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFileManagerTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFileManagerTest.java @@ -72,7 +72,7 @@ accessor.insert(tuple); // Flush to generate a disk component - 
accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback()); + accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), null); // Make sure the disk component was generated LSMBTree btree = (LSMBTree) ctx.getIndex(); diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFilterMergeTestDriver.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFilterMergeTestDriver.java index c5eb97c..98faf01 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFilterMergeTestDriver.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFilterMergeTestDriver.java @@ -117,7 +117,7 @@ StubIOOperationCallback stub = new StubIOOperationCallback(); BlockingIOOperationCallbackWrapper waiter = new BlockingIOOperationCallbackWrapper(stub); - accessor.scheduleFlush(waiter); + accessor.scheduleFlush(waiter, null); waiter.waitForIO(); if (minMax != null) { Pair<ITupleReference, ITupleReference> obsMinMax = @@ -146,7 +146,7 @@ } } accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), - ((LSMBTree) ctx.getIndex()).getDiskComponents()); + ((LSMBTree) ctx.getIndex()).getDiskComponents(), null); flushedComponents = ((LSMBTree) ctx.getIndex()).getDiskComponents(); Pair<ITupleReference, ITupleReference> mergedMinMax = diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java index 
7dac1e5..15dbfdb 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java @@ -76,7 +76,7 @@ ILSMIndexAccessor accessor = (ILSMIndexAccessor) ctx.getIndexAccessor(); accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), - ((LSMBTree) ctx.getIndex()).getDiskComponents()); + ((LSMBTree) ctx.getIndex()).getDiskComponents(), null); orderedIndexTestUtils.checkPointSearches(ctx); orderedIndexTestUtils.checkScan(ctx); diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java index b633614..11ecf0b 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java @@ -81,7 +81,7 @@ } if (j == 1) { - accessor.scheduleFlush(ioOpCallback); + accessor.scheduleFlush(ioOpCallback, null); ioOpCallback.waitForIO(); isFoundNull = true; } else { @@ -94,7 +94,7 @@ } if (j == 1) { - accessor.scheduleFlush(ioOpCallback); + accessor.scheduleFlush(ioOpCallback, null); ioOpCallback.waitForIO(); isFoundNull = true; } else { @@ -106,7 +106,7 @@ accessor.delete(tuple); } - accessor.scheduleFlush(ioOpCallback); + accessor.scheduleFlush(ioOpCallback, null); ioOpCallback.waitForIO(); } } diff 
--git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeScanDiskComponentsTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeScanDiskComponentsTest.java index acbeaef..f3cf9de 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeScanDiskComponentsTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeScanDiskComponentsTest.java @@ -19,7 +19,7 @@ package org.apache.hyracks.storage.am.lsm.btree; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; import java.io.ByteArrayInputStream; import java.io.DataInput; @@ -116,17 +116,17 @@ //component 2 contains 1 and 2 upsertTuple(ctx, fieldSerdes, getValue(1, fieldSerdes)); upsertTuple(ctx, fieldSerdes, getValue(2, fieldSerdes)); - accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback()); + accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), null); //component 1 contains 1 and -2 upsertTuple(ctx, fieldSerdes, getValue(1, fieldSerdes)); deleteTuple(ctx, fieldSerdes, getValue(2, fieldSerdes)); - accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback()); + accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), null); //component 0 contains 2 and 3 upsertTuple(ctx, fieldSerdes, getValue(3, fieldSerdes)); upsertTuple(ctx, fieldSerdes, getValue(2, fieldSerdes)); - accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback()); + accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), null); LSMBTree btree = (LSMBTree) ctx.getIndex(); Assert.assertEquals("Check disk 
components", 3, btree.getDiskComponents().size()); diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceScanDiskComponentsTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceScanDiskComponentsTest.java index fbcbcc2..a7efd6d 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceScanDiskComponentsTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceScanDiskComponentsTest.java @@ -19,7 +19,7 @@ package org.apache.hyracks.storage.am.lsm.btree; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; import java.io.ByteArrayInputStream; import java.io.DataInput; @@ -352,7 +352,7 @@ op1.performOperation(ctx, AccessMethodTestsConfig.BTREE_NUM_TUPLES_TO_INSERT); op2.performOperation(ctx, AccessMethodTestsConfig.BTREE_NUM_TUPLES_TO_INSERT / AccessMethodTestsConfig.BTREE_NUM_INSERT_ROUNDS); - accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback()); + accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), null); LSMBTree btree = (LSMBTree) ctx.getIndex(); Assert.assertEquals("Check disk components", 1, btree.getDiskComponents().size()); diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java index e059faa..adcb3d9 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java @@ -109,7 +109,7 @@ } if (j == 1) { - lsmAccessor.scheduleFlush(ioOpCallback); + lsmAccessor.scheduleFlush(ioOpCallback, null); ioOpCallback.waitForIO(); isFoundNull = true; isUpdated = false; @@ -124,7 +124,7 @@ } if (j == 1) { - lsmAccessor.scheduleFlush(ioOpCallback); + lsmAccessor.scheduleFlush(ioOpCallback, null); ioOpCallback.waitForIO(); } else { isFoundNull = false; diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java index 6c1a406..0e16280 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java @@ -103,17 +103,19 @@ } @Override - public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { - super.scheduleFlush(ctx, callback); + ILSMIOOperation flushOp = super.scheduleFlush(ctx, callback); numScheduledFlushes++; + return flushOp; } @Override - public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws 
HyracksDataException { - super.scheduleMerge(ctx, callback); + ILSMIOOperation mergeOp = super.scheduleMerge(ctx, callback); numScheduledMerges++; + return mergeOp; } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java index 1667e47..377ce05 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java @@ -119,7 +119,7 @@ case MERGE: accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), - lsmBTree.getDiskComponents()); + lsmBTree.getDiskComponents(), null); break; default: diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/PrefixMergePolicyTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/PrefixMergePolicyTest.java index e521b4b..eaa0321 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/PrefixMergePolicyTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/PrefixMergePolicyTest.java @@ -221,7 +221,7 @@ return null; } }).when(accessor).scheduleMerge(Mockito.any(ILSMIOOperationCallback.class), - Mockito.anyListOf(ILSMDiskComponent.class)); + 
Mockito.anyListOf(ILSMDiskComponent.class), Mockito.any()); Mockito.when(index.createAccessor(Mockito.any(IModificationOperationCallback.class), Mockito.any(ISearchOperationCallback.class))).thenReturn(accessor); diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java index d093aac..fd744b0 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java @@ -58,7 +58,7 @@ } // Perform merge. 
             invIndexAccessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
-                    ((LSMInvertedIndex) invIndex).getDiskComponents());
+                    ((LSMInvertedIndex) invIndex).getDiskComponents(), null);
             validateAndCheckIndex(testCtx);
             runTinySearchWorkload(testCtx, tupleGen);
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
index 3dc7262..cfca8f1 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
@@ -60,7 +60,7 @@
             }
             // Perform merge.
             invIndexAccessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
-                    ((LSMInvertedIndex) invIndex).getDiskComponents());
+                    ((LSMInvertedIndex) invIndex).getDiskComponents(), null);
             validateAndCheckIndex(testCtx);
             runTinySearchWorkload(testCtx, tupleGen);
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java
index 2345698..594e019 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java
@@ -116,7 +116,7 @@
                 case MERGE: {
                     accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
-                            invIndex.getDiskComponents());
+                            invIndex.getDiskComponents(), null);
                     break;
                 }
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
index 9209a3e..58c71dd 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
@@ -78,7 +78,7 @@
         ILSMIndexAccessor accessor = (ILSMIndexAccessor) ctx.getIndexAccessor();
         accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
-                ((AbstractLSMRTree) ctx.getIndex()).getDiskComponents());
+                ((AbstractLSMRTree) ctx.getIndex()).getDiskComponents(), null);
         rTreeTestUtils.checkScan(ctx);
         rTreeTestUtils.checkDiskOrderScan(ctx);
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
index fe4870b..4630c28 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
@@ -79,7 +79,7 @@
                 case MERGE:
                     accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
-                            lsmRTree.getDiskComponents());
+                            lsmRTree.getDiskComponents(), null);
                     break;
                 default:
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
index 2855f2e..bbada89 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
@@ -68,7 +68,7 @@
                 case MERGE:
                     accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(),
-                            ((AbstractLSMRTree) lsmRTree).getDiskComponents());
+                            ((AbstractLSMRTree) lsmRTree).getDiskComponents(), null);
                     break;
                 default:

--
To view, visit https://asterix-gerrit.ics.uci.edu/2018
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ib6c06ee23f3bfd16b758802388389c00e29780b1
Gerrit-PatchSet: 18
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Luo Chen <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Ian Maxon <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Jianfeng Jia <[email protected]>
Gerrit-Reviewer: Luo Chen <[email protected]>
Gerrit-Reviewer: Yingyi Bu <[email protected]>
Gerrit-Reviewer: abdullah alamoudi <[email protected]>
