Luo Chen has submitted this change and it was merged. Change subject: Revert "[ASTERIXDB-2103][STO] Too many disk components for CorrelatedPolicy"
Revert "[ASTERIXDB-2103][STO] Too many disk components for CorrelatedPolicy" This reverts commit 21ed0f72681a20ccb6a654f9aa4d54b8d0ea9c5c. Change-Id: I670545acd09c678f21be25313353ab306be86202 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2063 Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Reviewed-by: Ian Maxon <[email protected]> Integration-Tests: Jenkins <[email protected]> --- M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java M asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFileManagerTest.java M 
hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFilterMergeTestDriver.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeScanDiskComponentsTest.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceScanDiskComponentsTest.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/PrefixMergePolicyTest.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java M 
hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java 55 files changed, 153 insertions(+), 435 deletions(-) Approvals: Jenkins: Verified; ; Verified Ian Maxon: Looks good to me, approved Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java index 5e0e072..76bec8c 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java @@ -132,9 +132,9 @@ public void createIndex() throws Exception { List<List<String>> partitioningKeys = new ArrayList<>(); partitioningKeys.add(Collections.singletonList("key")); - dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME, null, - null, new InternalDatasetDetails(null, PartitioningStrategy.HASH, partitioningKeys, null, null, null, - false, null, false), + dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, + NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH, + partitioningKeys, null, null, null, false, null, false), null, DatasetType.INTERNAL, 
DATASET_ID, 0); PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, null, storageManager, KEY_INDEXES, KEY_INDICATORS_LIST); @@ -448,7 +448,7 @@ for (int i = 0; i < numMergedComponents; i++) { mergedComponents.add(diskComponents.get(i)); } - mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents, null); + mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents); merger.waitUntilCount(1); // now that we enetered, we will rollback Rollerback rollerback = new Rollerback(lsmBtree, new DiskComponentLsnPredicate(lsn)); @@ -635,7 +635,7 @@ for (int i = 0; i < numMergedComponents; i++) { mergedComponents.add(diskComponents.get(i)); } - mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents, null); + mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents); merger.waitUntilCount(1); // we will block search lsmBtree.clearSearchCallbacks(); @@ -707,7 +707,7 @@ for (int i = 0; i < numMergedComponents; i++) { mergedComponents.add(diskComponents.get(i)); } - mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents, null); + mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents); merger.waitUntilCount(1); // we will block search lsmBtree.clearSearchCallbacks(); @@ -736,7 +736,7 @@ } private class Rollerback { - private final Thread task; + private Thread task; private Exception failure; public Rollerback(TestLsmBtree lsmBtree, Predicate<ILSMComponent> predicate) { @@ -766,7 +766,7 @@ } private class Searcher { - private final ExecutorService executor = Executors.newSingleThreadExecutor(); + private ExecutorService executor = Executors.newSingleThreadExecutor(); private Future<Boolean> task; private volatile boolean entered = false; diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java index e18181c..a20e660 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java @@ -31,7 +31,6 @@ import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicy; @@ -92,17 +91,14 @@ Set<IndexInfo> indexInfos = datasetLifecycleManager.getDatasetInfo(datasetId).getDatsetIndexInfos(); int partition = getIndexPartition(index, indexInfos); - List<ILSMIOOperation> dependingMerges = scheduleSecondaryIndexes(minID, maxID, + triggerScheduledMerge(minID, maxID, indexInfos.stream().filter(info -> info.getPartition() == partition).collect(Collectors.toSet())); - - schedulePrimaryIndex(minID, maxID, index, dependingMerges); - return true; } /** * Submit merge requests for all disk components within [minID, maxID] - * of all of secondary indexes of a given dataset in the given partition + * of all indexes of a given dataset in the given partition * * @param minID * @param maxID @@ -110,39 +106,17 @@ * @param indexInfos * @throws HyracksDataException */ - private List<ILSMIOOperation> scheduleSecondaryIndexes(long minID, long maxID, Set<IndexInfo> indexInfos) - throws HyracksDataException { - List<ILSMIOOperation> mergeOps = new ArrayList<>(); + private void triggerScheduledMerge(long minID, long maxID, Set<IndexInfo> indexInfos) throws HyracksDataException { for (IndexInfo info : 
indexInfos) { ILSMIndex lsmIndex = info.getIndex(); - List<ILSMDiskComponent> diskComponents = lsmIndex.getDiskComponents(); - if (lsmIndex.isPrimaryIndex() || isMergeOngoing(diskComponents)) { + + List<ILSMDiskComponent> immutableComponents = new ArrayList<>(lsmIndex.getDiskComponents()); + if (isMergeOngoing(immutableComponents)) { continue; } - List<ILSMDiskComponent> mergeableComponents = collectMergeableComponents(minID, maxID, diskComponents); - ILSMIndexAccessor accessor = - lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - mergeOps.add(accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergeableComponents, null)); - } - return mergeOps; - } - - private void schedulePrimaryIndex(long minID, long maxID, ILSMIndex primaryIndex, - List<ILSMIOOperation> dependingMerges) throws HyracksDataException { - assert primaryIndex.isPrimaryIndex(); - List<ILSMDiskComponent> diskComponents = primaryIndex.getDiskComponents(); - List<ILSMDiskComponent> mergeableComponents = collectMergeableComponents(minID, maxID, diskComponents); - ILSMIndexAccessor accessor = - primaryIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - accessor.scheduleMerge(primaryIndex.getIOOperationCallback(), mergeableComponents, dependingMerges); - } - - private List<ILSMDiskComponent> collectMergeableComponents(long minID, long maxID, - List<ILSMDiskComponent> diskComponents) throws HyracksDataException { - List<ILSMDiskComponent> mergableComponents = new ArrayList<>(); - for (ILSMDiskComponent component : diskComponents) { - ILSMDiskComponentId id = component.getComponentId(); - if (!id.notFound()) { + List<ILSMDiskComponent> mergableComponents = new ArrayList<>(); + for (ILSMDiskComponent component : immutableComponents) { + ILSMDiskComponentId id = component.getComponentId(); if (id.getMinId() >= minID && id.getMaxId() <= maxID) { mergableComponents.add(component); } @@ -152,8 +126,10 @@ break; } } + 
ILSMIndexAccessor accessor = + lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergableComponents); } - return mergableComponents; } private int getIndexPartition(ILSMIndex index, Set<IndexInfo> indexInfos) { diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java index 0df8dcc..71d4a96 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java @@ -34,7 +34,6 @@ private boolean isRegistered; private boolean memoryAllocated; private boolean durable; - private boolean correlated; public DatasetInfo(int datasetID) { this.indexes = new HashMap<>(); @@ -42,7 +41,6 @@ this.datasetID = datasetID; this.setRegistered(false); this.setMemoryAllocated(false); - this.setCorrelated(false); } @Override @@ -196,13 +194,5 @@ public void setLastAccess(long lastAccess) { this.lastAccess = lastAccess; - } - - public void setCorrelated(boolean correlated) { - this.correlated = correlated; - } - - public boolean isCorrelated() { - return correlated; } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index ad9f6a5..37bd789 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -223,7 +223,7 @@ if (iInfo.isOpen()) { ILSMIndexAccessor accessor = iInfo.getIndex().createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - 
accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback(), null); + accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback()); } // Wait for the above flush op. @@ -417,22 +417,16 @@ } if (asyncFlush) { - PrimaryIndexOperationTracker.flushDatasetIndexes(dsInfo.getDatsetIndexInfos(), dsInfo.isCorrelated()); - } else { - List<IndexInfo> primaryIndexes = new ArrayList<>(); for (IndexInfo iInfo : dsInfo.getIndexes().values()) { - if (iInfo.getIndex().isPrimaryIndex()) { - // primary indexes are flushed later to guarantee the correctness of the correlated merge policy - primaryIndexes.add(iInfo); - } else { - // TODO: This is not efficient since we flush the indexes sequentially. - // Think of a way to allow submitting the flush requests concurrently. - // We don't do them concurrently because this may lead to a deadlock scenario - // between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker. - flushAndWaitForIO(dsInfo, iInfo); - } + ILSMIndexAccessor accessor = + iInfo.getIndex().createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback()); } - for (IndexInfo iInfo : primaryIndexes) { + } else { + for (IndexInfo iInfo : dsInfo.getIndexes().values()) { + // TODO: This is not efficient since we flush the indexes sequentially. + // Think of a way to allow submitting the flush requests concurrently. We don't do them concurrently because this + // may lead to a deadlock scenario between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker. 
flushAndWaitForIO(dsInfo, iInfo); } } @@ -597,5 +591,4 @@ } } } - } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java index 5eb3c02..f2f3b93 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java @@ -23,7 +23,6 @@ import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; -import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; import org.apache.hyracks.storage.common.IIndex; import org.apache.hyracks.storage.common.LocalResource; @@ -96,10 +95,6 @@ datasetInfo.setExternal(!index.hasMemoryComponents()); datasetInfo.setRegistered(true); datasetInfo.setDurable(((ILSMIndex) index).isDurable()); - //TODO use a general mechanism to set correlated property when we have more - // correlated merge policies - datasetInfo.setCorrelated( - ((AbstractLSMIndex) index).getMergePolicy() instanceof CorrelatedPrefixMergePolicy); } } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java index 4eb1c3a..67b25b6 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java @@ -19,9 +19,6 @@ package org.apache.asterix.common.context; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -34,7 +31,6 @@ 
import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; @@ -145,16 +141,17 @@ //This method is called sequentially by LogPage.notifyFlushTerminator in the sequence flushes were scheduled. public synchronized void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException { - Set<IndexInfo> indexInfos = dsInfo.getDatsetIndexInfos(); - for (IndexInfo iInfo : indexInfos) { + for (ILSMIndex lsmIndex : dsInfo.getDatasetIndexes()) { + //get resource + ILSMIndexAccessor accessor = + lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); //update resource lsn AbstractLSMIOOperationCallback ioOpCallback = - (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback(); + (AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback(); ioOpCallback.updateLastLSN(logRecord.getLSN()); + //schedule flush after update + accessor.scheduleFlush(lsmIndex.getIOOperationCallback()); } - - flushDatasetIndexes(indexInfos, dsInfo.isCorrelated()); - flushLogCreated = false; } @@ -199,65 +196,6 @@ public boolean isFlushLogCreated() { return flushLogCreated; - } - - public static void flushDatasetIndexes(Set<IndexInfo> indexes, boolean correlated) throws HyracksDataException { - if (!correlated) { - // if not correlated, we simply schedule flushes of each index independently - for (IndexInfo iInfo : indexes) { - ILSMIndex lsmIndex = iInfo.getIndex(); - //get resource - ILSMIndexAccessor accessor = - lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - //schedule 
flush after update - accessor.scheduleFlush(lsmIndex.getIOOperationCallback(), null); - } - } else { - // otherwise, we need to schedule indexes properly s.t. the primary index would depend on - // all secondary indexes in the same partition - - // collect partitions - Set<Integer> partitions = new HashSet<>(); - indexes.forEach(iInfo -> partitions.add(iInfo.getPartition())); - for (Integer partition : partitions) { - flushCorrelatedDatasetIndexes(indexes, partition); - } - - } - } - - private static void flushCorrelatedDatasetIndexes(Set<IndexInfo> indexes, int partition) - throws HyracksDataException { - ILSMIndex primaryIndex = null; - List<ILSMIOOperation> flushOps = new ArrayList<>(); - for (IndexInfo iInfo : indexes) { - if (iInfo.getPartition() != partition) { - continue; - } - ILSMIndex lsmIndex = iInfo.getIndex(); - if (lsmIndex.isPrimaryIndex()) { - primaryIndex = lsmIndex; - } else { - //get resource - ILSMIndexAccessor accessor = - lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - //schedule flush - ILSMIOOperation flushOp = accessor.scheduleFlush(lsmIndex.getIOOperationCallback(), null); - if (flushOp != null) { - flushOps.add(flushOp); - } - } - } - - if (primaryIndex != null) { - //get resource - ILSMIndexAccessor accessor = - primaryIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - //schedule flush after update - accessor.scheduleFlush(primaryIndex.getIOOperationCallback(), flushOps); - - } - } } diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java index ee795d5..9f071bb 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java +++ 
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java @@ -236,7 +236,7 @@ return null; } }).when(accessor).scheduleMerge(Mockito.any(ILSMIOOperationCallback.class), - Mockito.anyListOf(ILSMDiskComponent.class), Mockito.any()); + Mockito.anyListOf(ILSMDiskComponent.class)); Mockito.when(index.createAccessor(Mockito.any(IModificationOperationCallback.class), Mockito.any(ISearchOperationCallback.class))).thenReturn(accessor); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java index f5c3013..3775985 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java @@ -56,7 +56,6 @@ import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences; import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor.ICursorFactory; -import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation; import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory; import org.apache.hyracks.storage.common.IIndexBulkLoader; import org.apache.hyracks.storage.common.IIndexCursor; @@ -171,7 +170,7 @@ // The only reason to override the following method is that it uses a different context object // in addition, determining whether or not to keep deleted tuples is different here @Override - public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback 
callback) throws HyracksDataException { ExternalBTreeOpContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, -1); opCtx.setOperation(IndexOperation.MERGE); @@ -196,11 +195,9 @@ LSMComponentFileReferences relMergeFileRefs = fileManager.getRelMergeFileReference(firstFile.getFile().getName(), lastFile.getFile().getName()); ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getLsmHarness(), opCtx, cursorFactory); - MergeOperation mergeOp = new LSMBTreeMergeOperation(accessor, cursor, + ioScheduler.scheduleOperation(new LSMBTreeMergeOperation(accessor, cursor, relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), - callback, fileManager.getBaseDir().getAbsolutePath(), ctx.getDependingOps()); - ioScheduler.scheduleOperation(mergeOp); - return mergeOp; + callback, fileManager.getBaseDir().getAbsolutePath())); } // This function should only be used when a transaction fail. it doesn't @@ -372,7 +369,7 @@ // Not supported @Override - public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { throw new UnsupportedOperationException("flush not supported in LSM-Disk-Only-BTree"); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java index 071a4bd..ff17905 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java @@ -319,7 +319,7 @@ } @Override - public ILSMIOOperation 
scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { throw HyracksDataException.create(ErrorCode.FLUSH_NOT_SUPPORTED_IN_EXTERNAL_INDEX); } @@ -342,7 +342,7 @@ } @Override - public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { ILSMIndexOperationContext bctx = createOpContext(NoOpOperationCallback.INSTANCE, 0); bctx.setOperation(IndexOperation.MERGE); @@ -363,12 +363,11 @@ keepDeleteTuples = mergingComponents.get(mergingComponents.size() - 1) != secondDiskComponents .get(secondDiskComponents.size() - 1); } - ILSMIOOperation mergeOp = + + ioScheduler.scheduleOperation( new LSMBTreeWithBuddyMergeOperation(accessor, cursor, relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getDeleteIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), - callback, fileManager.getBaseDir().getAbsolutePath(), keepDeleteTuples, ctx.getDependingOps()); - ioScheduler.scheduleOperation(mergeOp); - return mergeOp; + callback, fileManager.getBaseDir().getAbsolutePath(), keepDeleteTuples)); } // This method creates the appropriate opContext for the targeted version diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java index 00a489e..c7d45e1 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java @@ -462,8 +462,7 @@ 
LSMComponentFileReferences componentFileRefs, ILSMIOOperationCallback callback) { ILSMIndexAccessor accessor = createAccessor(opCtx); return new LSMBTreeFlushOperation(accessor, componentFileRefs.getInsertIndexFileReference(), - componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath(), - opCtx.getDependingOps()); + componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath()); } @Override @@ -612,7 +611,6 @@ } ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor(opCtx, returnDeletedTuples); return new LSMBTreeMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(), - mergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath(), - opCtx.getDependingOps()); + mergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath()); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java index 92b53de..e3424e5 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java @@ -18,10 +18,7 @@ */ package org.apache.hyracks.storage.am.lsm.btree.impls; -import java.util.List; - import org.apache.hyracks.api.io.FileReference; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation; @@ -30,9 +27,8 
@@ private final FileReference bloomFilterFlushTarget; public LSMBTreeFlushOperation(ILSMIndexAccessor accessor, FileReference flushTarget, - FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier, - List<ILSMIOOperation> dependingOps) { - super(accessor, flushTarget, callback, indexIdentifier, dependingOps); + FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier) { + super(accessor, flushTarget, callback, indexIdentifier); this.bloomFilterFlushTarget = bloomFilterFlushTarget; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java index 40ac7b1..ec96303 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java @@ -19,11 +19,8 @@ package org.apache.hyracks.storage.am.lsm.btree.impls; -import java.util.List; - import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation; @@ -33,9 +30,8 @@ private final FileReference bloomFilterMergeTarget; public LSMBTreeMergeOperation(ILSMIndexAccessor accessor, ITreeIndexCursor cursor, FileReference target, - FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, String indexIdentifier, - List<ILSMIOOperation> 
dependingOps) { - super(accessor, target, callback, indexIdentifier, cursor, dependingOps); + FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, String indexIdentifier) { + super(accessor, target, callback, indexIdentifier, cursor); this.bloomFilterMergeTarget = bloomFilterMergeTarget; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java index e0c1512..f682bde 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMergeOperation.java @@ -18,11 +18,8 @@ */ package org.apache.hyracks.storage.am.lsm.btree.impls; -import java.util.List; - import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation; @@ -35,8 +32,8 @@ public LSMBTreeWithBuddyMergeOperation(ILSMIndexAccessor accessor, ITreeIndexCursor cursor, FileReference target, FileReference buddyBtreeMergeTarget, FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, - String indexIdentifier, boolean keepDeletedTuples, List<ILSMIOOperation> dependingOps) { - super(accessor, target, callback, indexIdentifier, cursor, dependingOps); + String indexIdentifier, boolean keepDeletedTuples) { + super(accessor, target, callback, indexIdentifier, 
cursor); this.buddyBtreeMergeTarget = buddyBtreeMergeTarget; this.bloomFilterMergeTarget = bloomFilterMergeTarget; this.keepDeletedTuples = keepDeletedTuples; diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java index 380b2f2..89c8cb9 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java @@ -105,12 +105,10 @@ * * @param ctx * @param callback - * @return The scheduled merge operation, used for the caller to track its status * @throws HyracksDataException * @throws IndexException */ - ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) - throws HyracksDataException; + void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException; /** * Schedule full merge @@ -137,11 +135,9 @@ * * @param ctx * @param callback - * @return The scheduled flush operation, used for the caller to track its status * @throws HyracksDataException */ - ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) - throws HyracksDataException; + void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException; /** * Perform a flush diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java index ff1613b..c2ae786 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java @@ -18,7 +18,6 @@ */ package org.apache.hyracks.storage.am.lsm.common.api; -import java.util.List; import java.util.concurrent.Callable; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -64,19 +63,7 @@ FileReference getTarget(); /** - * <<<<<<< HEAD - * * @return the accessor of the operation */ ILSMIndexAccessor getAccessor(); - - /** - * @return whether this operation has finished - */ - boolean isFinished(); - - /** - * @return a list of operations that this operation depends on - */ - List<ILSMIOOperation> getDependingOps(); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java index 5f43bcd..addeb27 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java @@ -67,13 +67,11 @@ public void scanDiskComponents(ILSMIndexOperationContext ctx, IIndexCursor cursor) throws HyracksDataException; - ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) - throws HyracksDataException; + void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException; ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException; - ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) - throws HyracksDataException; + 
void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException; ILSMDiskComponent merge(ILSMIOOperation operation) throws HyracksDataException; diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java index b303a39..b8d64af 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java @@ -46,13 +46,9 @@ * * @param callback * the IO operation callback - * @param dependingOps - * other operations that this operation depends on - * @return The scheduled flush operation * @throws HyracksDataException */ - ILSMIOOperation scheduleFlush(ILSMIOOperationCallback callback, List<ILSMIOOperation> dependingOps) - throws HyracksDataException; + void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException; /** * Schedule a merge operation @@ -61,13 +57,11 @@ * the merge operation callback * @param components * the components to be merged - * @param dependingOps - * other operations that this operation depends on - * @return The scheduled merge operation * @throws HyracksDataException + * @throws IndexException */ - ILSMIOOperation scheduleMerge(ILSMIOOperationCallback callback, List<ILSMDiskComponent> components, - List<ILSMIOOperation> dependingOps) throws HyracksDataException; + void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMDiskComponent> components) + throws HyracksDataException; /** * Schedule a full merge diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java index 66af93f..5b0378a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java @@ -56,8 +56,4 @@ PermutingTupleReference getFilterTuple(); MultiComparator getFilterCmp(); - - List<ILSMIOOperation> getDependingOps(); - - void setDependingOps(List<ILSMIOOperation> dependingOps); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java index 1d13c94..aee46f0 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java @@ -18,10 +18,6 @@ */ package org.apache.hyracks.storage.am.lsm.common.impls; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.IODeviceHandle; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; @@ -34,32 +30,13 @@ protected final FileReference target; protected final ILSMIOOperationCallback callback; protected final String indexIdentifier; - 
protected final List<ILSMIOOperation> dependingOps; - - protected AtomicBoolean isFinished = new AtomicBoolean(false); public AbstractIoOperation(ILSMIndexAccessor accessor, FileReference target, ILSMIOOperationCallback callback, - String indexIdentifier, List<ILSMIOOperation> dependingOps) { + String indexIdentifier) { this.accessor = accessor; this.target = target; this.callback = callback; this.indexIdentifier = indexIdentifier; - this.dependingOps = dependingOps; - } - - protected abstract void callInternal() throws HyracksDataException; - - @Override - public Boolean call() throws HyracksDataException { - try { - callInternal(); - } finally { - synchronized (this) { - isFinished.set(true); - notifyAll(); - } - } - return true; } @Override @@ -85,15 +62,5 @@ @Override public String getIndexIdentifier() { return indexIdentifier; - } - - @Override - public List<ILSMIOOperation> getDependingOps() { - return dependingOps; - } - - @Override - public boolean isFinished() { - return isFinished.get(); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java index e7b21cb..dc64f9b 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java @@ -86,7 +86,6 @@ protected final int[] treeFields; protected final int[] filterFields; protected final boolean durable; - protected final ILSMMergePolicy mergePolicy; protected boolean isActive; protected final AtomicBoolean[] flushRequests; protected boolean memoryComponentsAllocated = false; @@ -114,7 +113,6 @@ this.inactiveDiskComponents = new LinkedList<>(); 
this.durable = durable; this.tracer = tracer; - this.mergePolicy = mergePolicy; lsmHarness = new LSMHarness(this, mergePolicy, opTracker, diskBufferCache.isReplicationEnabled(), tracer); isActive = false; diskComponents = new ArrayList<>(); @@ -137,7 +135,6 @@ this.ioScheduler = ioScheduler; this.ioOpCallback = ioOpCallback; this.durable = durable; - this.mergePolicy = mergePolicy; lsmHarness = new ExternalIndexHarness(this, mergePolicy, opTracker, diskBufferCache.isReplicationEnabled()); isActive = false; diskComponents = new LinkedList<>(); @@ -202,7 +199,7 @@ protected void flushMemoryComponent() throws HyracksDataException { BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(ioOpCallback); ILSMIndexAccessor accessor = createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - accessor.scheduleFlush(cb, null); + accessor.scheduleFlush(cb); try { cb.waitForIO(); } catch (InterruptedException e) { @@ -329,21 +326,19 @@ } @Override - public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference(); AbstractLSMIndexOperationContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); opCtx.setOperation(ctx.getOperation()); opCtx.getComponentHolder().addAll(ctx.getComponentHolder()); - opCtx.setDependingOps(ctx.getDependingOps()); ILSMIOOperation flushOp = createFlushOperation(opCtx, componentFileRefs, callback); ioScheduler.scheduleOperation(TracedIOOperation.wrap(flushOp, tracer)); - return flushOp; } @Override - public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { 
List<ILSMComponent> mergingComponents = ctx.getComponentHolder(); // merge must create a different op ctx @@ -351,13 +346,11 @@ createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); opCtx.setOperation(ctx.getOperation()); opCtx.getComponentHolder().addAll(mergingComponents); - opCtx.setDependingOps(ctx.getDependingOps()); ILSMDiskComponent firstComponent = (ILSMDiskComponent) mergingComponents.get(0); ILSMDiskComponent lastComponent = (ILSMDiskComponent) mergingComponents.get(mergingComponents.size() - 1); LSMComponentFileReferences mergeFileRefs = getMergeFileReferences(firstComponent, lastComponent); - MergeOperation mergeOp = (MergeOperation) createMergeOperation(opCtx, mergeFileRefs, callback); + ILSMIOOperation mergeOp = createMergeOperation(opCtx, mergeFileRefs, callback); ioScheduler.scheduleOperation(TracedIOOperation.wrap(mergeOp, tracer)); - return mergeOp; } private void addOperationalMutableComponents(List<ILSMComponent> operationalComponents) { @@ -633,10 +626,6 @@ ILSMIndexOperationContext opCtx = accessor.getOpContext(); return opCtx.getOperation() == IndexOperation.DELETE_DISK_COMPONENTS ? 
EmptyComponent.INSTANCE : doMerge(operation); - } - - public ILSMMergePolicy getMergePolicy() { - return mergePolicy; } public abstract Set<String> getLSMComponentPhysicalFiles(ILSMComponent newComponent); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java index 5fdeafd..065d465 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java @@ -27,7 +27,6 @@ import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; import org.apache.hyracks.storage.common.IModificationOperationCallback; import org.apache.hyracks.storage.common.ISearchOperationCallback; @@ -48,7 +47,6 @@ protected IndexOperation op; protected boolean accessingComponents = false; protected ISearchPredicate searchPredicate; - protected final List<ILSMIOOperation> dependingOps; public AbstractLSMIndexOperationContext(int[] treeFields, int[] filterFields, IBinaryComparatorFactory[] filterCmpFactories, ISearchOperationCallback searchCallback, @@ -58,7 +56,6 @@ this.componentHolder = new LinkedList<>(); this.componentsToBeMerged = new LinkedList<>(); this.componentsToBeReplicated = new LinkedList<>(); - this.dependingOps = new LinkedList<>(); if (filterFields != null) { 
indexTuple = new PermutingTupleReference(treeFields); filterCmp = MultiComparator.create(filterCmpFactories); @@ -155,18 +152,5 @@ @Override public ISearchPredicate getSearchPredicate() { return searchPredicate; - } - - @Override - public List<ILSMIOOperation> getDependingOps() { - return dependingOps; - } - - @Override - public void setDependingOps(List<ILSMIOOperation> dependingOps) { - this.dependingOps.clear(); - if (dependingOps != null) { - this.dependingOps.addAll(dependingOps); - } } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java index 7ac9bfb..847b882 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java @@ -49,7 +49,7 @@ } else if (immutableComponents.size() >= numComponents) { ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents, null); + accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents); } } @@ -106,7 +106,7 @@ } ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents, null); + accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents); return true; } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java index b93d943..2f65b18 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java @@ -203,13 +203,13 @@ } @Override - public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) { callback.afterFinalize(LSMOperationType.MERGE, null); - return null; + return; } - return lsmIndex.scheduleMerge(ctx, callback); + lsmIndex.scheduleMerge(ctx, callback); } @Override @@ -297,10 +297,9 @@ } @Override - public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { callback.afterFinalize(LSMOperationType.FLUSH, null); - return null; } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java index 750c690..7b7f950 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java @@ -18,7 +18,6 @@ */ package 
org.apache.hyracks.storage.am.lsm.common.impls; -import java.util.List; import java.util.Objects; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -31,13 +30,14 @@ public class FlushOperation extends AbstractIoOperation implements Comparable<ILSMIOOperation> { public FlushOperation(ILSMIndexAccessor accessor, FileReference target, ILSMIOOperationCallback callback, - String indexIdentifier, List<ILSMIOOperation> dependingOps) { - super(accessor, target, callback, indexIdentifier, dependingOps); + String indexIdentifier) { + super(accessor, target, callback, indexIdentifier); } @Override - protected void callInternal() throws HyracksDataException { + public Boolean call() throws HyracksDataException { accessor.flush(this); + return true; } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java index bc3a5a1..1069f8f 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@ -500,13 +500,13 @@ } @Override - public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { if (!getAndEnterComponents(ctx, LSMOperationType.FLUSH, true)) { callback.afterFinalize(LSMOperationType.FLUSH, null); - return null; + return; } - return lsmIndex.scheduleFlush(ctx, callback); + lsmIndex.scheduleFlush(ctx, callback); } @Override @@ -519,7 +519,6 @@ boolean failedOperation = false; try { newComponent = lsmIndex.flush(operation); - 
waitForDependingOps(operation); operation.getCallback().afterOperation(LSMOperationType.FLUSH, null, newComponent); newComponent.markAsValid(lsmIndex.isDurable()); } catch (Throwable e) { @@ -538,13 +537,13 @@ } @Override - public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) { callback.afterFinalize(LSMOperationType.MERGE, null); - return null; + return; } - return lsmIndex.scheduleMerge(ctx, callback); + lsmIndex.scheduleMerge(ctx, callback); } @Override @@ -571,7 +570,6 @@ boolean failedOperation = false; try { newComponent = lsmIndex.merge(operation); - waitForDependingOps(operation); operation.getCallback().afterOperation(LSMOperationType.MERGE, ctx.getComponentHolder(), newComponent); newComponent.markAsValid(lsmIndex.isDurable()); } catch (Throwable e) { @@ -750,32 +748,6 @@ if (LOGGER.isLoggable(Level.WARNING)) { LOGGER.log(Level.WARNING, "Ignoring interrupt while waiting for lagging merge on " + lsmIndex, e); - } - } - } - } - } - - /** - * Wait for depending operations to finish. 
- * - * @param op - */ - private void waitForDependingOps(ILSMIOOperation op) throws HyracksDataException { - List<ILSMIOOperation> dependingOps = op.getDependingOps(); - if (dependingOps == null) { - return; - } - for (ILSMIOOperation dependingOp : dependingOps) { - if (dependingOp != null && !dependingOp.isFinished()) { - synchronized (dependingOp) { - while (!dependingOp.isFinished()) { - try { - dependingOp.wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw HyracksDataException.create(e); - } } } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java index f008fde..c0fd443 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java @@ -135,21 +135,18 @@ } @Override - public ILSMIOOperation scheduleFlush(ILSMIOOperationCallback callback, List<ILSMIOOperation> dependingOps) - throws HyracksDataException { + public void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException { ctx.setOperation(IndexOperation.FLUSH); - ctx.setDependingOps(dependingOps); - return lsmHarness.scheduleFlush(ctx, callback); + lsmHarness.scheduleFlush(ctx, callback); } @Override - public ILSMIOOperation scheduleMerge(ILSMIOOperationCallback callback, List<ILSMDiskComponent> components, - List<ILSMIOOperation> dependingOps) throws HyracksDataException { + public void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMDiskComponent> components) + throws HyracksDataException { ctx.setOperation(IndexOperation.MERGE); 
ctx.getComponentsToBeMerged().clear(); ctx.getComponentsToBeMerged().addAll(components); - ctx.setDependingOps(dependingOps); - return lsmHarness.scheduleMerge(ctx, callback); + lsmHarness.scheduleMerge(ctx, callback); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java index 2210fd0..c83d534 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java @@ -23,7 +23,6 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.common.IIndexCursor; @@ -32,13 +31,19 @@ protected final IIndexCursor cursor; public MergeOperation(ILSMIndexAccessor accessor, FileReference target, ILSMIOOperationCallback callback, - String indexIdentifier, IIndexCursor cursor, List<ILSMIOOperation> dependingOps) { - super(accessor, target, callback, indexIdentifier, dependingOps); + String indexIdentifier, IIndexCursor cursor) { + super(accessor, target, callback, indexIdentifier); this.cursor = cursor; } public List<ILSMComponent> getMergingComponents() { return accessor.getOpContext().getComponentHolder(); + } + + @Override + public Boolean call() throws HyracksDataException { + accessor.merge(this); + return true; } @Override @@ -48,11 +53,5 @@ 
public IIndexCursor getCursor() { return cursor; - } - - @Override - protected void callInternal() throws HyracksDataException { - accessor.merge(this); - } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java index f159232..7d7266e 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java @@ -247,7 +247,7 @@ Collections.reverse(mergableComponents); ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - accessor.scheduleMerge(index.getIOOperationCallback(), mergableComponents, null); + accessor.scheduleMerge(index.getIOOperationCallback(), mergableComponents); } /** diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java index 7cdcc52..85081a1 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java @@ -59,7 +59,7 @@ && index.hasFlushRequestForCurrentMutableComponent()) { ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - 
accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), null); + accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback()); } } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java index 3e35e51..9cc8022 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java @@ -19,7 +19,6 @@ package org.apache.hyracks.storage.am.lsm.common.impls; -import java.util.List; import java.util.logging.Logger; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -100,16 +99,6 @@ @Override public ILSMIndexAccessor getAccessor() { return ioOp.getAccessor(); - } - - @Override - public boolean isFinished() { - return ioOp.isFinished(); - } - - @Override - public List<ILSMIOOperation> getDependingOps() { - return ioOp.getDependingOps(); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java index 60da50a..eb3924c 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java @@ -691,20 +691,17 @@ LSMComponentFileReferences 
componentFileRefs, ILSMIOOperationCallback callback) throws HyracksDataException { return new LSMInvertedIndexFlushOperation(new LSMInvertedIndexAccessor(getLsmHarness(), opCtx), - componentFileRefs.getInsertIndexFileReference(), - componentFileRefs.getDeleteIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), - callback, fileManager.getBaseDir().getAbsolutePath(), opCtx.getDependingOps()); + componentFileRefs.getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(), + componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath()); } @Override protected ILSMIOOperation createMergeOperation(AbstractLSMIndexOperationContext opCtx, - LSMComponentFileReferences mergeFileRefs, - ILSMIOOperationCallback callback) throws HyracksDataException { + LSMComponentFileReferences mergeFileRefs, ILSMIOOperationCallback callback) throws HyracksDataException { ILSMIndexAccessor accessor = new LSMInvertedIndexAccessor(getLsmHarness(), opCtx); IIndexCursor cursor = new LSMInvertedIndexRangeSearchCursor(opCtx); - return new LSMInvertedIndexMergeOperation(accessor, cursor, - mergeFileRefs.getInsertIndexFileReference(), mergeFileRefs.getDeleteIndexFileReference(), - mergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath(), - opCtx.getDependingOps()); + return new LSMInvertedIndexMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(), + mergeFileRefs.getDeleteIndexFileReference(), mergeFileRefs.getBloomFilterFileReference(), callback, + fileManager.getBaseDir().getAbsolutePath()); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java index 242bc83..61fc84e 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java @@ -85,11 +85,9 @@ } @Override - public ILSMIOOperation scheduleFlush(ILSMIOOperationCallback callback, List<ILSMIOOperation> dependingOps) - throws HyracksDataException { + public void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException { ctx.setOperation(IndexOperation.FLUSH); - ctx.setDependingOps(dependingOps); - return lsmHarness.scheduleFlush(ctx, callback); + lsmHarness.scheduleFlush(ctx, callback); } @Override @@ -98,13 +96,12 @@ } @Override - public ILSMIOOperation scheduleMerge(ILSMIOOperationCallback callback, List<ILSMDiskComponent> components, - List<ILSMIOOperation> dependingOps) throws HyracksDataException { + public void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMDiskComponent> components) + throws HyracksDataException { ctx.setOperation(IndexOperation.MERGE); ctx.getComponentsToBeMerged().clear(); ctx.getComponentsToBeMerged().addAll(components); - ctx.setDependingOps(dependingOps); - return lsmHarness.scheduleMerge(ctx, callback); + lsmHarness.scheduleMerge(ctx, callback); } @Override @@ -119,7 +116,6 @@ @Override public void scheduleFullMerge(ILSMIOOperationCallback callback) throws HyracksDataException { ctx.setOperation(IndexOperation.FULL_MERGE); - ctx.setDependingOps(null); lsmHarness.scheduleFullMerge(ctx, callback); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java index 
30e1cec..2106f6a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java @@ -19,10 +19,7 @@ package org.apache.hyracks.storage.am.lsm.invertedindex.impls; -import java.util.List; - import org.apache.hyracks.api.io.FileReference; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation; @@ -33,8 +30,8 @@ public LSMInvertedIndexFlushOperation(ILSMIndexAccessor accessor, FileReference flushTarget, FileReference deletedKeysBTreeFlushTarget, FileReference bloomFilterFlushTarget, - ILSMIOOperationCallback callback, String indexIdentifier, List<ILSMIOOperation> dependingOps) { - super(accessor, flushTarget, callback, indexIdentifier, dependingOps); + ILSMIOOperationCallback callback, String indexIdentifier) { + super(accessor, flushTarget, callback, indexIdentifier); this.deletedKeysBTreeFlushTarget = deletedKeysBTreeFlushTarget; this.bloomFilterFlushTarget = bloomFilterFlushTarget; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java index 8361f24..2c1db0f 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java +++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java @@ -19,10 +19,7 @@ package org.apache.hyracks.storage.am.lsm.invertedindex.impls; -import java.util.List; - import org.apache.hyracks.api.io.FileReference; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation; @@ -34,8 +31,8 @@ public LSMInvertedIndexMergeOperation(ILSMIndexAccessor accessor, IIndexCursor cursor, FileReference target, FileReference deletedKeysBTreeMergeTarget, FileReference bloomFilterMergeTarget, - ILSMIOOperationCallback callback, String indexIdentifier, List<ILSMIOOperation> dependingOps) { - super(accessor, target, callback, indexIdentifier, cursor, dependingOps); + ILSMIOOperationCallback callback, String indexIdentifier) { + super(accessor, target, callback, indexIdentifier, cursor); this.deletedKeysBTreeMergeTarget = deletedKeysBTreeMergeTarget; this.bloomFilterMergeTarget = bloomFilterMergeTarget; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java index 110d873..6595403 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java @@ -416,7 +416,7 @@ // Not supported @Override - public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public void 
scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { throw new UnsupportedOperationException("flush not supported in LSM-Disk-Only-RTree"); } @@ -623,7 +623,7 @@ // The only change the the schedule merge is the method used to create the // opCtx. first line <- in schedule merge, we-> @Override - public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { ILSMIndexOperationContext rctx = createOpContext(NoOpOperationCallback.INSTANCE, -1); rctx.setOperation(IndexOperation.MERGE); @@ -634,12 +634,10 @@ (ILSMDiskComponent) mergingComponents.get(mergingComponents.size() - 1)); ILSMIndexAccessor accessor = new LSMRTreeAccessor(getLsmHarness(), rctx, buddyBTreeFields); // create the merge operation. - ILSMIOOperation mergeOp = - new LSMRTreeMergeOperation(accessor, cursor, relMergeFileRefs.getInsertIndexFileReference(), - relMergeFileRefs.getDeleteIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), - callback, fileManager.getBaseDir().getAbsolutePath(), ctx.getDependingOps()); + LSMRTreeMergeOperation mergeOp = new LSMRTreeMergeOperation(accessor, cursor, + relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getDeleteIndexFileReference(), + relMergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir().getAbsolutePath()); ioScheduler.scheduleOperation(mergeOp); - return mergeOp; } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java index a4d6a65..ca0e4e1 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java @@ -420,7 +420,7 @@ LSMRTreeAccessor accessor = new LSMRTreeAccessor(getLsmHarness(), opCtx, buddyBTreeFields); return new LSMRTreeFlushOperation(accessor, componentFileRefs.getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), - callback, fileManager.getBaseDir().getAbsolutePath(), opCtx.getDependingOps()); + callback, fileManager.getBaseDir().getAbsolutePath()); } @Override @@ -430,6 +430,6 @@ ILSMIndexAccessor accessor = new LSMRTreeAccessor(getLsmHarness(), opCtx, buddyBTreeFields); return new LSMRTreeMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(), mergeFileRefs.getDeleteIndexFileReference(), mergeFileRefs.getBloomFilterFileReference(), callback, - fileManager.getBaseDir().getAbsolutePath(), opCtx.getDependingOps()); + fileManager.getBaseDir().getAbsolutePath()); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java index a08d854..6991c56 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java @@ -18,10 +18,7 @@ */ package org.apache.hyracks.storage.am.lsm.rtree.impls; -import java.util.List; - import org.apache.hyracks.api.io.FileReference; -import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation; @@ -32,9 +29,8 @@ private final FileReference bloomFilterFlushTarget; public LSMRTreeFlushOperation(ILSMIndexAccessor accessor, FileReference flushTarget, FileReference btreeFlushTarget, - FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier, - List<ILSMIOOperation> dependingOps) { - super(accessor, flushTarget, callback, indexIdentifier, dependingOps); + FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier) { + super(accessor, flushTarget, callback, indexIdentifier); this.btreeFlushTarget = btreeFlushTarget; this.bloomFilterFlushTarget = bloomFilterFlushTarget; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java index a07e57b..83872cf 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java @@ -18,11 +18,8 @@ */ package org.apache.hyracks.storage.am.lsm.rtree.impls; -import java.util.List; - import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; 
import org.apache.hyracks.storage.am.lsm.common.impls.MergeOperation; @@ -33,8 +30,8 @@ public LSMRTreeMergeOperation(ILSMIndexAccessor accessor, ITreeIndexCursor cursor, FileReference target, FileReference btreeMergeTarget, FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, - String indexIdentifier, List<ILSMIOOperation> dependingOps) { - super(accessor, target, callback, indexIdentifier, cursor, dependingOps); + String indexIdentifier) { + super(accessor, target, callback, indexIdentifier, cursor); this.btreeMergeTarget = btreeMergeTarget; this.bloomFilterMergeTarget = bloomFilterMergeTarget; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java index a18f10c..1e15455 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java @@ -332,7 +332,7 @@ throws HyracksDataException { ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getLsmHarness(), opCtx, cursorFactory); return new LSMRTreeFlushOperation(accessor, componentFileRefs.getInsertIndexFileReference(), null, null, - callback, fileManager.getBaseDir().getAbsolutePath(), opCtx.getDependingOps()); + callback, fileManager.getBaseDir().getAbsolutePath()); } @Override @@ -346,6 +346,6 @@ ITreeIndexCursor cursor = new LSMRTreeWithAntiMatterTuplesSearchCursor(opCtx, returnDeletedTuples); ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getLsmHarness(), opCtx, cursorFactory); return new MergeOperation(accessor, mergeFileRefs.getInsertIndexFileReference(), callback, - 
fileManager.getBaseDir().getAbsolutePath(), cursor, opCtx.getDependingOps()); + fileManager.getBaseDir().getAbsolutePath(), cursor); } } diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFileManagerTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFileManagerTest.java index 4f7515e..5d6d8de 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFileManagerTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFileManagerTest.java @@ -72,7 +72,7 @@ accessor.insert(tuple); // Flush to generate a disk component - accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), null); + accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback()); // Make sure the disk component was generated LSMBTree btree = (LSMBTree) ctx.getIndex(); diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFilterMergeTestDriver.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFilterMergeTestDriver.java index 98faf01..c5eb97c 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFilterMergeTestDriver.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeFilterMergeTestDriver.java @@ -117,7 +117,7 @@ StubIOOperationCallback stub = new StubIOOperationCallback(); BlockingIOOperationCallbackWrapper 
waiter = new BlockingIOOperationCallbackWrapper(stub); - accessor.scheduleFlush(waiter, null); + accessor.scheduleFlush(waiter); waiter.waitForIO(); if (minMax != null) { Pair<ITupleReference, ITupleReference> obsMinMax = @@ -146,7 +146,7 @@ } } accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), - ((LSMBTree) ctx.getIndex()).getDiskComponents(), null); + ((LSMBTree) ctx.getIndex()).getDiskComponents()); flushedComponents = ((LSMBTree) ctx.getIndex()).getDiskComponents(); Pair<ITupleReference, ITupleReference> mergedMinMax = diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java index 15dbfdb..7dac1e5 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java @@ -76,7 +76,7 @@ ILSMIndexAccessor accessor = (ILSMIndexAccessor) ctx.getIndexAccessor(); accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), - ((LSMBTree) ctx.getIndex()).getDiskComponents(), null); + ((LSMBTree) ctx.getIndex()).getDiskComponents()); orderedIndexTestUtils.checkPointSearches(ctx); orderedIndexTestUtils.checkScan(ctx); diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java index 
11ecf0b..b633614 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java @@ -81,7 +81,7 @@ } if (j == 1) { - accessor.scheduleFlush(ioOpCallback, null); + accessor.scheduleFlush(ioOpCallback); ioOpCallback.waitForIO(); isFoundNull = true; } else { @@ -94,7 +94,7 @@ } if (j == 1) { - accessor.scheduleFlush(ioOpCallback, null); + accessor.scheduleFlush(ioOpCallback); ioOpCallback.waitForIO(); isFoundNull = true; } else { @@ -106,7 +106,7 @@ accessor.delete(tuple); } - accessor.scheduleFlush(ioOpCallback, null); + accessor.scheduleFlush(ioOpCallback); ioOpCallback.waitForIO(); } } diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeScanDiskComponentsTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeScanDiskComponentsTest.java index f3cf9de..acbeaef 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeScanDiskComponentsTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeScanDiskComponentsTest.java @@ -19,7 +19,7 @@ package org.apache.hyracks.storage.am.lsm.btree; -import static org.junit.Assert.*; +import static org.junit.Assert.fail; import java.io.ByteArrayInputStream; import java.io.DataInput; @@ -116,17 +116,17 @@ //component 2 contains 1 and 2 upsertTuple(ctx, fieldSerdes, getValue(1, fieldSerdes)); upsertTuple(ctx, fieldSerdes, getValue(2, fieldSerdes)); - 
accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), null); + accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback()); //component 1 contains 1 and -2 upsertTuple(ctx, fieldSerdes, getValue(1, fieldSerdes)); deleteTuple(ctx, fieldSerdes, getValue(2, fieldSerdes)); - accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), null); + accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback()); //component 0 contains 2 and 3 upsertTuple(ctx, fieldSerdes, getValue(3, fieldSerdes)); upsertTuple(ctx, fieldSerdes, getValue(2, fieldSerdes)); - accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), null); + accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback()); LSMBTree btree = (LSMBTree) ctx.getIndex(); Assert.assertEquals("Check disk components", 3, btree.getDiskComponents().size()); diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceScanDiskComponentsTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceScanDiskComponentsTest.java index a7efd6d..fbcbcc2 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceScanDiskComponentsTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceScanDiskComponentsTest.java @@ -19,7 +19,7 @@ package org.apache.hyracks.storage.am.lsm.btree; -import static org.junit.Assert.*; +import static org.junit.Assert.fail; import java.io.ByteArrayInputStream; import java.io.DataInput; @@ -352,7 +352,7 @@ op1.performOperation(ctx, 
AccessMethodTestsConfig.BTREE_NUM_TUPLES_TO_INSERT); op2.performOperation(ctx, AccessMethodTestsConfig.BTREE_NUM_TUPLES_TO_INSERT / AccessMethodTestsConfig.BTREE_NUM_INSERT_ROUNDS); - accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), null); + accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback()); LSMBTree btree = (LSMBTree) ctx.getIndex(); Assert.assertEquals("Check disk components", 1, btree.getDiskComponents().size()); diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java index adcb3d9..e059faa 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java @@ -109,7 +109,7 @@ } if (j == 1) { - lsmAccessor.scheduleFlush(ioOpCallback, null); + lsmAccessor.scheduleFlush(ioOpCallback); ioOpCallback.waitForIO(); isFoundNull = true; isUpdated = false; @@ -124,7 +124,7 @@ } if (j == 1) { - lsmAccessor.scheduleFlush(ioOpCallback, null); + lsmAccessor.scheduleFlush(ioOpCallback); ioOpCallback.waitForIO(); } else { isFoundNull = false; diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java index 0e16280..6c1a406 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java @@ -103,19 +103,17 @@ } @Override - public ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { - ILSMIOOperation flushOp = super.scheduleFlush(ctx, callback); + super.scheduleFlush(ctx, callback); numScheduledFlushes++; - return flushOp; } @Override - public ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) + public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { - ILSMIOOperation mergeOp = super.scheduleMerge(ctx, callback); + super.scheduleMerge(ctx, callback); numScheduledMerges++; - return mergeOp; } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java index 377ce05..1667e47 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java @@ -119,7 +119,7 @@ case MERGE: accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), - lsmBTree.getDiskComponents(), null); + 
lsmBTree.getDiskComponents()); break; default: diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/PrefixMergePolicyTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/PrefixMergePolicyTest.java index eaa0321..e521b4b 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/PrefixMergePolicyTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/PrefixMergePolicyTest.java @@ -221,7 +221,7 @@ return null; } }).when(accessor).scheduleMerge(Mockito.any(ILSMIOOperationCallback.class), - Mockito.anyListOf(ILSMDiskComponent.class), Mockito.any()); + Mockito.anyListOf(ILSMDiskComponent.class)); Mockito.when(index.createAccessor(Mockito.any(IModificationOperationCallback.class), Mockito.any(ISearchOperationCallback.class))).thenReturn(accessor); diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java index fd744b0..d093aac 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java @@ -58,7 +58,7 @@ } // Perform merge. 
invIndexAccessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), - ((LSMInvertedIndex) invIndex).getDiskComponents(), null); + ((LSMInvertedIndex) invIndex).getDiskComponents()); validateAndCheckIndex(testCtx); runTinySearchWorkload(testCtx, tupleGen); } diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java index cfca8f1..3dc7262 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java @@ -60,7 +60,7 @@ } // Perform merge. 
invIndexAccessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), - ((LSMInvertedIndex) invIndex).getDiskComponents(), null); + ((LSMInvertedIndex) invIndex).getDiskComponents()); validateAndCheckIndex(testCtx); runTinySearchWorkload(testCtx, tupleGen); } diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java index 594e019..2345698 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java @@ -116,7 +116,7 @@ case MERGE: { accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), - invIndex.getDiskComponents(), null); + invIndex.getDiskComponents()); break; } diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java index 58c71dd..9209a3e 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java @@ -78,7 +78,7 @@ 
ILSMIndexAccessor accessor = (ILSMIndexAccessor) ctx.getIndexAccessor(); accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), - ((AbstractLSMRTree) ctx.getIndex()).getDiskComponents(), null); + ((AbstractLSMRTree) ctx.getIndex()).getDiskComponents()); rTreeTestUtils.checkScan(ctx); rTreeTestUtils.checkDiskOrderScan(ctx); diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java index 4630c28..fe4870b 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java @@ -79,7 +79,7 @@ case MERGE: accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), - lsmRTree.getDiskComponents(), null); + lsmRTree.getDiskComponents()); break; default: diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java index bbada89..2855f2e 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java +++ 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/org/apache/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java @@ -68,7 +68,7 @@ case MERGE: accessor.scheduleMerge(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(), - ((AbstractLSMRTree) lsmRTree).getDiskComponents(), null); + ((AbstractLSMRTree) lsmRTree).getDiskComponents()); break; default: -- To view, visit https://asterix-gerrit.ics.uci.edu/2063 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I670545acd09c678f21be25313353ab306be86202 Gerrit-PatchSet: 3 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Luo Chen <[email protected]> Gerrit-Reviewer: Ian Maxon <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Luo Chen <[email protected]>
