abdullah alamoudi has submitted this change and it was merged.

Change subject: Add LSMDiskComponentBulkLoader
......................................................................
Add LSMDiskComponentBulkLoader -Added LSMDiskComponentBulkLoader implementations, which are used to bulk load an LSMDiskComponent with anti-matters -Added LSMDiskComponentWithBuddyBTreeBulkLoader implementations, which are used to bulk load an LSMDiskComponent with deleted-keys btrees -Refactored LSM flush/merge/index bulk load operations to use the LSMDiskComponentBulkLoader Change-Id: I772a6d68761fcbb85982a1c9f72f2d186e1d1ffb Reviewed-on: https://asterix-gerrit.ics.uci.edu/1773 Reviewed-by: abdullah alamoudi <[email protected]> Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> BAD: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> --- M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeBulkLoader.java A hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentBulkLoader.java A hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponentBulkLoader.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java A hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentBulkLoader.java A 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentWithBuddyBulkLoader.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java A hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponentBulkLoader.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeBulkLoader.java A hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponentBulkLoader.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java A hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader.java 16 files changed, 952 insertions(+), 688 deletions(-) Approvals: abdullah alamoudi: Looks good to me, approved Jenkins: Verified; No violations found; No violations found; Verified diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java index 1d8de79..c641dc1 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java +++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java @@ -28,12 +28,9 @@ import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; -import org.apache.hyracks.storage.am.bloomfilter.impls.BloomCalculations; import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter; import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterFactory; -import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification; import org.apache.hyracks.storage.am.btree.impls.BTree; -import org.apache.hyracks.storage.am.btree.impls.BTree.BTreeBulkLoader; import org.apache.hyracks.storage.am.common.api.IIndexOperationContext; import org.apache.hyracks.storage.am.common.api.IMetadataPageManager; import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor; @@ -451,13 +448,10 @@ // modifications public class LSMTwoPCBTreeBulkLoader implements IIndexBulkLoader, ITwoPCIndexBulkLoader { private final ILSMDiskComponent component; - private final BTreeBulkLoader bulkLoader; - private final IIndexBulkLoader builder; - private boolean cleanedUpArtifacts = false; - private boolean isEmptyComponent = true; - private boolean endedBloomFilterLoad = false; - private final boolean isTransaction; + private final IIndexBulkLoader componentBulkLoader; private final ITreeIndexTupleWriterFactory frameTupleWriterFactory; + + private final boolean isTransaction; public LSMTwoPCBTreeBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint, boolean isTransaction) throws HyracksDataException { @@ -471,68 +465,23 @@ frameTupleWriterFactory = ((LSMBTreeDiskComponent) component).getBTree().getLeafFrameFactory().getTupleWriterFactory(); - bulkLoader = (BTreeBulkLoader) ((LSMBTreeDiskComponent) component).getBTree().createBulkLoader(fillFactor, - verifyInput, numElementsHint, false); - int 
maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint); - BloomFilterSpecification bloomFilterSpec = - BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate); - builder = ((LSMBTreeDiskComponent) component).getBloomFilter().createBuilder(numElementsHint, - bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements()); + componentBulkLoader = + createComponentBulkLoader(component, fillFactor, verifyInput, numElementsHint, false, true); } // It is expected that the mode was set to insert operation before // calling add @Override public void add(ITupleReference tuple) throws HyracksDataException { - try { - bulkLoader.add(tuple); - builder.add(tuple); - } catch (Exception e) { - cleanupArtifacts(); - throw e; - } - if (isEmptyComponent) { - isEmptyComponent = false; - } - } - - // This is made public in case of a failure, it is better to delete all - // created artifacts. - public void cleanupArtifacts() throws HyracksDataException { - if (!cleanedUpArtifacts) { - cleanedUpArtifacts = true; - // We make sure to end the bloom filter load to release latches. - if (!endedBloomFilterLoad) { - builder.end(); - endedBloomFilterLoad = true; - } - try { - ((LSMBTreeDiskComponent) component).getBTree().deactivate(); - } catch (HyracksDataException e) { - // Do nothing.. this could've bee - } - ((LSMBTreeDiskComponent) component).getBTree().destroy(); - try { - ((LSMBTreeDiskComponent) component).getBloomFilter().deactivate(); - } catch (HyracksDataException e) { - // Do nothing.. 
this could've bee - } - ((LSMBTreeDiskComponent) component).getBloomFilter().destroy(); - } + componentBulkLoader.add(tuple); } @Override public void end() throws HyracksDataException { - if (!cleanedUpArtifacts) { - if (!endedBloomFilterLoad) { - builder.end(); - endedBloomFilterLoad = true; - } - bulkLoader.end(); - if (isEmptyComponent) { - cleanupArtifacts(); - } else if (isTransaction) { + componentBulkLoader.end(); + if (component.getComponentSize() > 0) { + if (isTransaction) { // Since this is a transaction component, validate and // deactivate. it could later be added or deleted markAsValid(component); @@ -551,23 +500,14 @@ @Override public void delete(ITupleReference tuple) throws HyracksDataException { ((LSMBTreeRefrencingTupleWriterFactory) frameTupleWriterFactory).setMode(IndexOperation.DELETE); - try { - bulkLoader.add(tuple); - builder.add(tuple); - } catch (Exception e) { - cleanupArtifacts(); - throw e; - } - if (isEmptyComponent) { - isEmptyComponent = false; - } + componentBulkLoader.add(tuple); ((LSMBTreeRefrencingTupleWriterFactory) frameTupleWriterFactory).setMode(IndexOperation.INSERT); } @Override public void abort() { try { - cleanupArtifacts(); + componentBulkLoader.abort(); } catch (Exception e) { // Do nothing } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java index fed4588..dfa08d6 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java @@ -35,7 +35,6 @@ import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterFactory; import 
org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification; import org.apache.hyracks.storage.am.btree.impls.BTree; -import org.apache.hyracks.storage.am.btree.impls.BTree.BTreeBulkLoader; import org.apache.hyracks.storage.am.btree.impls.RangePredicate; import org.apache.hyracks.storage.am.common.api.IIndexOperationContext; import org.apache.hyracks.storage.am.common.api.IMetadataPageManager; @@ -70,6 +69,7 @@ import org.apache.hyracks.storage.common.IModificationOperationCallback; import org.apache.hyracks.storage.common.ISearchOperationCallback; import org.apache.hyracks.storage.common.ISearchPredicate; +import org.apache.hyracks.storage.common.MultiComparator; import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.file.IFileMapProvider; @@ -274,6 +274,26 @@ newerList.add(swapIndex, newComponent); } + @Override + public IIndexBulkLoader createComponentBulkLoader(ILSMDiskComponent component, float fillFactor, + boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter) + throws HyracksDataException { + BloomFilterSpecification bloomFilterSpec = null; + if (numElementsHint > 0) { + int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint); + bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate); + } + if (withFilter && filterFields != null) { + return new LSMBTreeWithBuddyDiskComponentBulkLoader((LSMBTreeWithBuddyDiskComponent) component, + bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager, + treeFields, filterFields, + MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories())); + } else { + return new LSMBTreeWithBuddyDiskComponentBulkLoader((LSMBTreeWithBuddyDiskComponent) component, + bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex); + } + } + // For initial load @Override public IIndexBulkLoader 
createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint) @@ -372,6 +392,8 @@ LSMBTreeWithBuddyDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getTarget(), mergeOp.getBuddyBTreeTarget(), mergeOp.getBloomFilterTarget(), true); + IIndexBulkLoader componentBulkLoader; + // In case we must keep the deleted-keys BuddyBTrees, then they must be // merged *before* merging the b-trees so that // lsmHarness.endSearch() is called once when the b-trees have been @@ -383,46 +405,37 @@ LSMBuddyBTreeMergeCursor buddyBtreeCursor = new LSMBuddyBTreeMergeCursor(opCtx); search(opCtx, buddyBtreeCursor, btreeSearchPred); - BTree buddyBtree = mergedComponent.getBuddyBTree(); - IIndexBulkLoader buddyBtreeBulkLoader = buddyBtree.createBulkLoader(1.0f, true, 0L, false); - long numElements = 0L; for (int i = 0; i < mergeOp.getMergingComponents().size(); ++i) { numElements += ((LSMBTreeWithBuddyDiskComponent) mergeOp.getMergingComponents().get(i)).getBloomFilter() .getNumElements(); } - int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements); - BloomFilterSpecification bloomFilterSpec = - BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate); - IIndexBulkLoader builder = mergedComponent.getBloomFilter().createBuilder(numElements, - bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements()); + componentBulkLoader = createComponentBulkLoader(mergedComponent, 1.0f, false, numElements, false, false); try { while (buddyBtreeCursor.hasNext()) { buddyBtreeCursor.next(); ITupleReference tuple = buddyBtreeCursor.getTuple(); - buddyBtreeBulkLoader.add(tuple); - builder.add(tuple); + ((LSMBTreeWithBuddyDiskComponentBulkLoader) componentBulkLoader).delete(tuple); } } finally { buddyBtreeCursor.close(); - builder.end(); } - buddyBtreeBulkLoader.end(); + } else { + componentBulkLoader = createComponentBulkLoader(mergedComponent, 1.0f, false, 0L, false, false); } - IIndexBulkLoader 
bulkLoader = mergedComponent.getBTree().createBulkLoader(1.0f, false, 0L, false); try { while (cursor.hasNext()) { cursor.next(); ITupleReference frameTuple = cursor.getTuple(); - bulkLoader.add(frameTuple); + componentBulkLoader.add(frameTuple); } } finally { cursor.close(); } - bulkLoader.end(); + componentBulkLoader.end(); return mergedComponent; } @@ -589,12 +602,7 @@ // modifications public class LSMTwoPCBTreeWithBuddyBulkLoader implements IIndexBulkLoader, ITwoPCIndexBulkLoader { private final ILSMDiskComponent component; - private final BTreeBulkLoader btreeBulkLoader; - private final BTreeBulkLoader buddyBtreeBulkLoader; - private final IIndexBulkLoader builder; - private boolean cleanedUpArtifacts = false; - private boolean isEmptyComponent = true; - private boolean endedBloomFilterLoad = false; + private final IIndexBulkLoader componentBulkLoader; private final boolean isTransaction; public LSMTwoPCBTreeWithBuddyBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint, @@ -607,69 +615,20 @@ component = createBulkLoadTarget(); } - // Create the three loaders - btreeBulkLoader = (BTreeBulkLoader) ((LSMBTreeWithBuddyDiskComponent) component).getBTree() - .createBulkLoader(fillFactor, verifyInput, numElementsHint, false); - buddyBtreeBulkLoader = (BTreeBulkLoader) ((LSMBTreeWithBuddyDiskComponent) component).getBuddyBTree() - .createBulkLoader(fillFactor, verifyInput, numElementsHint, false); - int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint); - BloomFilterSpecification bloomFilterSpec = - BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate); - builder = ((LSMBTreeWithBuddyDiskComponent) component).getBloomFilter().createBuilder(numElementsHint, - bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements()); + componentBulkLoader = + createComponentBulkLoader(component, fillFactor, verifyInput, numElementsHint, false, true); } @Override public void 
add(ITupleReference tuple) throws HyracksDataException { - try { - btreeBulkLoader.add(tuple); - } catch (Exception e) { - cleanupArtifacts(); - throw e; - } - if (isEmptyComponent) { - isEmptyComponent = false; - } - } - - // This is made public in case of a failure, it is better to delete all - // created artifacts. - public void cleanupArtifacts() throws HyracksDataException { - if (!cleanedUpArtifacts) { - cleanedUpArtifacts = true; - try { - ((LSMBTreeWithBuddyDiskComponent) component).getBTree().deactivate(); - } catch (Exception e) { - - } - ((LSMBTreeWithBuddyDiskComponent) component).getBTree().destroy(); - try { - ((LSMBTreeWithBuddyDiskComponent) component).getBuddyBTree().deactivate(); - } catch (Exception e) { - - } - ((LSMBTreeWithBuddyDiskComponent) component).getBuddyBTree().destroy(); - try { - ((LSMBTreeWithBuddyDiskComponent) component).getBloomFilter().deactivate(); - } catch (Exception e) { - - } - ((LSMBTreeWithBuddyDiskComponent) component).getBloomFilter().destroy(); - } + componentBulkLoader.add(tuple); } @Override public void end() throws HyracksDataException { - if (!cleanedUpArtifacts) { - if (!endedBloomFilterLoad) { - builder.end(); - endedBloomFilterLoad = true; - } - btreeBulkLoader.end(); - buddyBtreeBulkLoader.end(); - if (isEmptyComponent) { - cleanupArtifacts(); - } else if (isTransaction) { + componentBulkLoader.end(); + if (component.getComponentSize() > 0) { + if (isTransaction) { // Since this is a transaction component, validate and // deactivate. 
it could later be added or deleted markAsValid(component); @@ -687,22 +646,13 @@ @Override public void delete(ITupleReference tuple) throws HyracksDataException { - try { - buddyBtreeBulkLoader.add(tuple); - builder.add(tuple); - } catch (Exception e) { - cleanupArtifacts(); - throw e; - } - if (isEmptyComponent) { - isEmptyComponent = false; - } + ((LSMBTreeWithBuddyDiskComponentBulkLoader) componentBulkLoader).delete(tuple); } @Override public void abort() { try { - cleanupArtifacts(); + componentBulkLoader.abort(); } catch (Exception e) { } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java index 4cd4dc6..a4c67c2 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java @@ -73,6 +73,7 @@ import org.apache.hyracks.storage.common.IModificationOperationCallback; import org.apache.hyracks.storage.common.ISearchOperationCallback; import org.apache.hyracks.storage.common.ISearchPredicate; +import org.apache.hyracks.storage.common.MultiComparator; import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.file.IFileMapProvider; @@ -292,12 +293,10 @@ RangePredicate nullPred = new RangePredicate(null, null, true, true, null, null); long numElements = 0L; - BloomFilterSpecification bloomFilterSpec = null; if (hasBloomFilter) { //count elements in btree for creating Bloomfilter IIndexCursor countingCursor = ((BTreeAccessor) accessor).createCountingSearchCursor(); accessor.search(countingCursor, nullPred); - try { while (countingCursor.hasNext()) { countingCursor.next(); @@ -307,35 +306,23 
@@ } finally { countingCursor.close(); } - - int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements); - bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate); } LSMBTreeDiskComponent component = createDiskComponent(componentFactory, flushOp.getTarget(), flushOp.getBloomFilterTarget(), true); - IIndexBulkLoader bulkLoader = component.getBTree().createBulkLoader(1.0f, false, numElements, false); - IIndexBulkLoader builder = null; - if (hasBloomFilter) { - builder = component.getBloomFilter().createBuilder(numElements, bloomFilterSpec.getNumHashes(), - bloomFilterSpec.getNumBucketsPerElements()); - } + + IIndexBulkLoader componentBulkLoader = + createComponentBulkLoader(component, 1.0f, false, numElements, false, false); IIndexCursor scanCursor = accessor.createSearchCursor(false); accessor.search(scanCursor, nullPred); try { while (scanCursor.hasNext()) { scanCursor.next(); - if (hasBloomFilter) { - builder.add(scanCursor.getTuple()); - } - bulkLoader.add(scanCursor.getTuple()); + componentBulkLoader.add(scanCursor.getTuple()); } } finally { scanCursor.close(); - if (hasBloomFilter) { - builder.end(); - } } if (component.getLSMComponentFilter() != null) { @@ -353,7 +340,8 @@ // TODO This code should be in the callback and not in the index flushingComponent.getMetadata().copy(component.getMetadata()); - bulkLoader.end(); + componentBulkLoader.end(); + return component; } @@ -368,38 +356,25 @@ List<ILSMComponent> mergedComponents = mergeOp.getMergingComponents(); long numElements = 0L; - BloomFilterSpecification bloomFilterSpec = null; if (hasBloomFilter) { //count elements in btree for creating Bloomfilter for (int i = 0; i < mergedComponents.size(); ++i) { numElements += ((LSMBTreeDiskComponent) mergedComponents.get(i)).getBloomFilter().getNumElements(); } - int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements); - bloomFilterSpec = 
BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate); } LSMBTreeDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getTarget(), mergeOp.getBloomFilterTarget(), true); - IIndexBulkLoader bulkLoader = mergedComponent.getBTree().createBulkLoader(1.0f, false, numElements, false); - IIndexBulkLoader builder = null; - if (hasBloomFilter) { - builder = mergedComponent.getBloomFilter().createBuilder(numElements, bloomFilterSpec.getNumHashes(), - bloomFilterSpec.getNumBucketsPerElements()); - } + IIndexBulkLoader componentBulkLoader = + createComponentBulkLoader(mergedComponent, 1.0f, false, numElements, false, false); try { while (cursor.hasNext()) { cursor.next(); ITupleReference frameTuple = cursor.getTuple(); - if (hasBloomFilter) { - builder.add(frameTuple); - } - bulkLoader.add(frameTuple); + componentBulkLoader.add(frameTuple); } } finally { cursor.close(); - if (hasBloomFilter) { - builder.end(); - } } if (mergedComponent.getLSMComponentFilter() != null) { List<ITupleReference> filterTuples = new ArrayList<>(); @@ -410,7 +385,9 @@ getFilterManager().updateFilter(mergedComponent.getLSMComponentFilter(), filterTuples); getFilterManager().writeFilter(mergedComponent.getLSMComponentFilter(), mergedComponent.getBTree()); } - bulkLoader.end(); + + componentBulkLoader.end(); + return mergedComponent; } @@ -438,6 +415,27 @@ } @Override + public IIndexBulkLoader createComponentBulkLoader(ILSMDiskComponent component, float fillFactor, + boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter) + throws HyracksDataException { + BloomFilterSpecification bloomFilterSpec = null; + if (hasBloomFilter) { + int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint); + bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate); + } + + if (withFilter && filterFields != null) { + return new 
LSMBTreeDiskComponentBulkLoader((LSMBTreeDiskComponent) component, bloomFilterSpec, fillFactor, + verifyInput, numElementsHint, checkIfEmptyIndex, filterManager, treeFields, filterFields, + MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories())); + } else { + return new LSMBTreeDiskComponentBulkLoader((LSMBTreeDiskComponent) component, bloomFilterSpec, fillFactor, + verifyInput, numElementsHint, checkIfEmptyIndex); + } + + } + + @Override public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint) throws HyracksDataException { return new LSMBTreeBulkLoader(this, fillLevel, verifyInput, numElementsHint); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeBulkLoader.java index 247b3de..239eba2 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeBulkLoader.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeBulkLoader.java @@ -20,135 +20,43 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; -import org.apache.hyracks.storage.am.bloomfilter.impls.BloomCalculations; -import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification; -import org.apache.hyracks.storage.am.btree.impls.BTree.BTreeBulkLoader; -import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import org.apache.hyracks.storage.common.IIndexBulkLoader; -import 
org.apache.hyracks.storage.common.MultiComparator; public class LSMBTreeBulkLoader implements IIndexBulkLoader { private final LSMBTree lsmIndex; private final ILSMDiskComponent component; - private final BTreeBulkLoader bulkLoader; - private final IIndexBulkLoader builder; - private boolean cleanedUpArtifacts = false; - private boolean isEmptyComponent = true; - private boolean endedBloomFilterLoad = false; - public final PermutingTupleReference indexTuple; - public final PermutingTupleReference filterTuple; - public final MultiComparator filterCmp; + private final IIndexBulkLoader componentBulkLoader; public LSMBTreeBulkLoader(LSMBTree lsmIndex, float fillFactor, boolean verifyInput, long numElementsHint) throws HyracksDataException { this.lsmIndex = lsmIndex; - component = lsmIndex.createBulkLoadTarget(); - bulkLoader = (BTreeBulkLoader) ((LSMBTreeDiskComponent) component).getBTree().createBulkLoader(fillFactor, - verifyInput, numElementsHint, false); - - if (lsmIndex.hasBloomFilter()) { - int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint); - BloomFilterSpecification bloomFilterSpec = - BloomCalculations.computeBloomSpec(maxBucketsPerElement, lsmIndex.bloomFilterFalsePositiveRate()); - builder = ((LSMBTreeDiskComponent) component).getBloomFilter().createBuilder(numElementsHint, - bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements()); - } else { - builder = null; - } - - if (lsmIndex.getFilterFields() != null) { - indexTuple = new PermutingTupleReference(lsmIndex.getTreeFields()); - filterCmp = MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories()); - filterTuple = new PermutingTupleReference(lsmIndex.getFilterFields()); - } else { - indexTuple = null; - filterCmp = null; - filterTuple = null; - } + this.component = lsmIndex.createBulkLoadTarget(); + this.componentBulkLoader = + lsmIndex.createComponentBulkLoader(component, fillFactor, verifyInput, numElementsHint, false, true); } 
@Override public void add(ITupleReference tuple) throws HyracksDataException { - try { - ITupleReference t; - if (indexTuple != null) { - indexTuple.reset(tuple); - t = indexTuple; - } else { - t = tuple; - } - - bulkLoader.add(t); - if (lsmIndex.hasBloomFilter()) { - builder.add(t); - } - - if (filterTuple != null) { - filterTuple.reset(tuple); - component.getLSMComponentFilter().update(filterTuple, filterCmp); - } - } catch (Exception e) { - cleanupArtifacts(); - throw e; - } - if (isEmptyComponent) { - isEmptyComponent = false; - } - } - - private void cleanupArtifacts() throws HyracksDataException { - if (!cleanedUpArtifacts) { - cleanedUpArtifacts = true; - if (lsmIndex.hasBloomFilter() && !endedBloomFilterLoad) { - builder.abort(); - endedBloomFilterLoad = true; - } - ((LSMBTreeDiskComponent) component).getBTree().deactivate(); - ((LSMBTreeDiskComponent) component).getBTree().destroy(); - if (lsmIndex.hasBloomFilter()) { - ((LSMBTreeDiskComponent) component).getBloomFilter().deactivate(); - ((LSMBTreeDiskComponent) component).getBloomFilter().destroy(); - } - } + componentBulkLoader.add(tuple); } @Override public void end() throws HyracksDataException { - if (!cleanedUpArtifacts) { - if (lsmIndex.hasBloomFilter() && !endedBloomFilterLoad) { - builder.end(); - endedBloomFilterLoad = true; - } - - if (component.getLSMComponentFilter() != null) { - lsmIndex.getFilterManager().writeFilter(component.getLSMComponentFilter(), - ((LSMBTreeDiskComponent) component).getBTree()); - } - bulkLoader.end(); - - if (isEmptyComponent) { - cleanupArtifacts(); - } else { - //TODO(amoudi): Ensure Bulk load follow the same lifecycle Other Operations (Flush, Merge, etc). 
- //then after operation should be called from harness as well - //https://issues.apache.org/jira/browse/ASTERIXDB-1764 - lsmIndex.getIOOperationCallback().afterOperation(LSMOperationType.FLUSH, null, component); - lsmIndex.getLsmHarness().addBulkLoadedComponent(component); - } + componentBulkLoader.end(); + if (component.getComponentSize() > 0) { + //TODO(amoudi): Ensure Bulk load follow the same lifecycle Other Operations (Flush, Merge, etc). + //then after operation should be called from harness as well + //https://issues.apache.org/jira/browse/ASTERIXDB-1764 + lsmIndex.getIOOperationCallback().afterOperation(LSMOperationType.FLUSH, null, component); + lsmIndex.getLsmHarness().addBulkLoadedComponent(component); } } @Override public void abort() throws HyracksDataException { - if (bulkLoader != null) { - bulkLoader.abort(); - } - - if (builder != null) { - builder.abort(); - } - + // Abort (do not end) the component bulk loader, matching the removed code that aborted the underlying loaders. + componentBulkLoader.abort(); } + } \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentBulkLoader.java new file mode 100644 index 0000000..a5720de --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentBulkLoader.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.lsm.btree.impls; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter; +import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponentBulkLoader; +import org.apache.hyracks.storage.common.IIndex; +import org.apache.hyracks.storage.common.MultiComparator; + +public class LSMBTreeDiskComponentBulkLoader extends AbstractLSMDiskComponentBulkLoader { + + //with filter + public LSMBTreeDiskComponentBulkLoader(LSMBTreeDiskComponent component, BloomFilterSpecification bloomFilterSpec, + float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, + ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields, MultiComparator filterCmp) + throws HyracksDataException { + super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager, + indexFields, filterFields, filterCmp); + } + + //without filter + public LSMBTreeDiskComponentBulkLoader(LSMBTreeDiskComponent component, BloomFilterSpecification bloomFilterSpec, + float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex) + throws HyracksDataException { + super(component, bloomFilterSpec, fillFactor, verifyInput, 
numElementsHint, checkIfEmptyIndex, null, null, null, + null); + } + + @Override + protected BloomFilter getBloomFilter(ILSMDiskComponent component) { + return ((LSMBTreeDiskComponent) component).getBloomFilter(); + } + + @Override + protected IIndex getIndex(ILSMDiskComponent component) { + return ((LSMBTreeDiskComponent) component).getBTree(); + } + +} diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponentBulkLoader.java new file mode 100644 index 0000000..e4ab6d4 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponentBulkLoader.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.storage.am.lsm.btree.impls; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter; +import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification; +import org.apache.hyracks.storage.am.common.api.ITreeIndex; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponentWithBuddyBulkLoader; +import org.apache.hyracks.storage.common.IIndex; +import org.apache.hyracks.storage.common.MultiComparator; + +public class LSMBTreeWithBuddyDiskComponentBulkLoader extends AbstractLSMDiskComponentWithBuddyBulkLoader { + + //with filter + public LSMBTreeWithBuddyDiskComponentBulkLoader(LSMBTreeWithBuddyDiskComponent component, + BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint, + boolean checkIfEmptyIndex, ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields, + MultiComparator filterCmp) throws HyracksDataException { + super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager, + indexFields, filterFields, filterCmp); + } + + //without filter + public LSMBTreeWithBuddyDiskComponentBulkLoader(LSMBTreeWithBuddyDiskComponent component, + BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint, + boolean checkIfEmptyIndex) throws HyracksDataException { + super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, null, null, null, + null); + } + + @Override + protected BloomFilter getBloomFilter(ILSMDiskComponent component) { + return ((LSMBTreeWithBuddyDiskComponent) component).getBloomFilter(); + } + + @Override + protected IIndex getIndex(ILSMDiskComponent component) { + return 
((LSMBTreeWithBuddyDiskComponent) component).getBTree(); + } + + @Override + protected ITreeIndex getBuddyBTree(ILSMDiskComponent component) { + return ((LSMBTreeWithBuddyDiskComponent) component).getBuddyBTree(); + } + +} diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java index 2a9186b..5a6f391 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java @@ -27,6 +27,7 @@ import org.apache.hyracks.storage.am.common.api.IIndexOperationContext; import org.apache.hyracks.storage.am.lsm.common.impls.LSMHarness; import org.apache.hyracks.storage.common.IIndex; +import org.apache.hyracks.storage.common.IIndexBulkLoader; import org.apache.hyracks.storage.common.IIndexCursor; import org.apache.hyracks.storage.common.IModificationOperationCallback; import org.apache.hyracks.storage.common.ISearchOperationCallback; @@ -139,4 +140,20 @@ * @throws HyracksDataException */ void updateFilter(ILSMIndexOperationContext ictx, ITupleReference tuple) throws HyracksDataException; + + /** + * Create a component bulk loader for the given component + * + * @param component + * @param fillFactor + * @param verifyInput + * @param numElementsHint + * @param checkIfEmptyIndex + * @param withFilter + * @return + * @throws HyracksDataException + */ + IIndexBulkLoader createComponentBulkLoader(ILSMDiskComponent component, float fillFactor, boolean verifyInput, + long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter) throws HyracksDataException; + } diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentBulkLoader.java new file mode 100644 index 0000000..964893a --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentBulkLoader.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.storage.am.lsm.common.impls; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter; +import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification; +import org.apache.hyracks.storage.am.common.api.ITreeIndex; +import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.common.IIndex; +import org.apache.hyracks.storage.common.IIndexBulkLoader; +import org.apache.hyracks.storage.common.MultiComparator; + +public abstract class AbstractLSMDiskComponentBulkLoader implements IIndexBulkLoader { + protected final ILSMDiskComponent component; + + protected final IIndexBulkLoader indexBulkLoader; + protected final IIndexBulkLoader bloomFilterBuilder; + + protected final ILSMComponentFilterManager filterManager; + protected final PermutingTupleReference indexTuple; + protected final PermutingTupleReference filterTuple; + protected final MultiComparator filterCmp; + + protected boolean cleanedUpArtifacts = false; + protected boolean isEmptyComponent = true; + protected boolean endedBloomFilterLoad = false; + + //with filter + public AbstractLSMDiskComponentBulkLoader(ILSMDiskComponent component, BloomFilterSpecification bloomFilterSpec, + float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, + ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields, MultiComparator filterCmp) + throws HyracksDataException { + this.component = component; + this.indexBulkLoader = + getIndex(component).createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex); + if (bloomFilterSpec != null) { + 
this.bloomFilterBuilder = getBloomFilter(component).createBuilder(numElementsHint, + bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements()); + } else { + this.bloomFilterBuilder = null; + } + if (filterManager != null) { + this.filterManager = filterManager; + this.indexTuple = new PermutingTupleReference(indexFields); + this.filterTuple = new PermutingTupleReference(filterFields); + this.filterCmp = filterCmp; + } else { + this.filterManager = null; + this.indexTuple = null; + this.filterTuple = null; + this.filterCmp = null; + } + } + + @Override + public void add(ITupleReference tuple) throws HyracksDataException { + try { + ITupleReference t; + if (indexTuple != null) { + indexTuple.reset(tuple); + t = indexTuple; + } else { + t = tuple; + } + + indexBulkLoader.add(t); + if (bloomFilterBuilder != null) { + bloomFilterBuilder.add(t); + } + + if (filterTuple != null) { + filterTuple.reset(tuple); + component.getLSMComponentFilter().update(filterTuple, filterCmp); + } + } catch (Exception e) { + cleanupArtifacts(); + throw e; + } + if (isEmptyComponent) { + isEmptyComponent = false; + } + } + + @Override + public void abort() throws HyracksDataException { + if (indexBulkLoader != null) { + indexBulkLoader.abort(); + } + if (bloomFilterBuilder != null) { + bloomFilterBuilder.abort(); + } + + } + + @Override + public void end() throws HyracksDataException { + if (!cleanedUpArtifacts) { + if (bloomFilterBuilder != null && !endedBloomFilterLoad) { + bloomFilterBuilder.end(); + endedBloomFilterLoad = true; + } + + //use filter + if (filterManager != null && component.getLSMComponentFilter() != null) { + filterManager.writeFilter(component.getLSMComponentFilter(), getTreeIndex(component)); + } + indexBulkLoader.end(); + + if (isEmptyComponent) { + cleanupArtifacts(); + } + } + } + + protected void cleanupArtifacts() throws HyracksDataException { + if (!cleanedUpArtifacts) { + cleanedUpArtifacts = true; + if (bloomFilterBuilder != null && 
!endedBloomFilterLoad) { + bloomFilterBuilder.abort(); + endedBloomFilterLoad = true; + } + getIndex(component).deactivate(); + getIndex(component).destroy(); + if (bloomFilterBuilder != null) { + getBloomFilter(component).deactivate(); + getBloomFilter(component).destroy(); + } + } + } + + /** + * TreeIndex is used to hold the filter tuple values + * + * @param component + * @return + */ + protected ITreeIndex getTreeIndex(ILSMDiskComponent component) { + return (ITreeIndex) getIndex(component); + } + + protected abstract IIndex getIndex(ILSMDiskComponent component); + + protected abstract BloomFilter getBloomFilter(ILSMDiskComponent component); + +} diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentWithBuddyBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentWithBuddyBulkLoader.java new file mode 100644 index 0000000..453d6cf --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentWithBuddyBulkLoader.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.lsm.common.impls; + +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification; +import org.apache.hyracks.storage.am.common.api.ITreeIndex; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.common.IIndexBulkLoader; +import org.apache.hyracks.storage.common.MultiComparator; + +public abstract class AbstractLSMDiskComponentWithBuddyBulkLoader extends AbstractLSMDiskComponentBulkLoader { + + protected final IIndexBulkLoader buddyBTreeBulkLoader; + + //with filter + public AbstractLSMDiskComponentWithBuddyBulkLoader(ILSMDiskComponent component, + BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint, + boolean checkIfEmptyIndex, ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields, + MultiComparator filterCmp) throws HyracksDataException { + super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager, + indexFields, filterFields, filterCmp); + + // BuddyBTree must be created even if it could be empty, + // since without it the component is not considered as valid. 
+ buddyBTreeBulkLoader = + getBuddyBTree(component).createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex); + } + + @Override + public void add(ITupleReference tuple) throws HyracksDataException { + try { + ITupleReference t; + if (indexTuple != null) { + indexTuple.reset(tuple); + t = indexTuple; + } else { + t = tuple; + } + + indexBulkLoader.add(t); + + if (filterTuple != null) { + filterTuple.reset(tuple); + component.getLSMComponentFilter().update(filterTuple, filterCmp); + } + } catch (Exception e) { + cleanupArtifacts(); + throw e; + } + if (isEmptyComponent) { + isEmptyComponent = false; + } + } + + public void delete(ITupleReference tuple) throws HyracksDataException { + try { + buddyBTreeBulkLoader.add(tuple); + if (bloomFilterBuilder != null) { + bloomFilterBuilder.add(tuple); + } + } catch (HyracksDataException e) { + //deleting a key multiple times is OK + if (e.getErrorCode() != ErrorCode.DUPLICATE_KEY) { + cleanupArtifacts(); + throw e; + } + } catch (Exception e) { + cleanupArtifacts(); + throw e; + } + if (isEmptyComponent) { + isEmptyComponent = false; + } + } + + @Override + public void abort() throws HyracksDataException { + super.abort(); + if (buddyBTreeBulkLoader != null) { + buddyBTreeBulkLoader.abort(); + } + } + + @Override + public void end() throws HyracksDataException { + if (!cleanedUpArtifacts) { + if (bloomFilterBuilder != null && !endedBloomFilterLoad) { + bloomFilterBuilder.end(); + endedBloomFilterLoad = true; + } + + //use filter + if (filterManager != null && component.getLSMComponentFilter() != null) { + filterManager.writeFilter(component.getLSMComponentFilter(), getTreeIndex(component)); + } + indexBulkLoader.end(); + buddyBTreeBulkLoader.end(); + + if (isEmptyComponent) { + cleanupArtifacts(); + } + } + } + + @Override + protected void cleanupArtifacts() throws HyracksDataException { + if (!cleanedUpArtifacts) { + cleanedUpArtifacts = true; + if (bloomFilterBuilder != null && !endedBloomFilterLoad) { + 
bloomFilterBuilder.abort(); + endedBloomFilterLoad = true; + } + getIndex(component).deactivate(); + getIndex(component).destroy(); + + getBuddyBTree(component).deactivate(); + getBuddyBTree(component).destroy(); + + if (bloomFilterBuilder != null) { + getBloomFilter(component).deactivate(); + getBloomFilter(component).destroy(); + } + } + } + + protected abstract ITreeIndex getBuddyBTree(ILSMDiskComponent component); + +} diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java index 2363c43..f827b21 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java @@ -339,39 +339,14 @@ // Create an inverted index instance to be bulk loaded. LSMInvertedIndexDiskComponent component = createDiskInvIndexComponent(componentFactory, flushOp.getTarget(), flushOp.getDeletedKeysBTreeTarget(), flushOp.getBloomFilterTarget(), true); - IInvertedIndex diskInvertedIndex = component.getInvIndex(); // Create a scan cursor on the BTree underlying the in-memory inverted index. 
LSMInvertedIndexMemoryComponent flushingComponent = (LSMInvertedIndexMemoryComponent) flushOp.getFlushingComponent(); - InMemoryInvertedIndexAccessor memInvIndexAccessor = (InMemoryInvertedIndexAccessor) flushingComponent - .getInvIndex().createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - BTreeAccessor memBTreeAccessor = memInvIndexAccessor.getBTreeAccessor(); + RangePredicate nullPred = new RangePredicate(null, null, true, true, null, null); - IIndexCursor scanCursor = memBTreeAccessor.createSearchCursor(false); - memBTreeAccessor.search(scanCursor, nullPred); - // Bulk load the disk inverted index from the in-memory inverted index. - IIndexBulkLoader invIndexBulkLoader = diskInvertedIndex.createBulkLoader(1.0f, false, 0L, false); - try { - while (scanCursor.hasNext()) { - scanCursor.next(); - invIndexBulkLoader.add(scanCursor.getTuple()); - } - } finally { - scanCursor.close(); - } - if (component.getLSMComponentFilter() != null) { - List<ITupleReference> filterTuples = new ArrayList<>(); - filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple()); - filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple()); - getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples); - getFilterManager().writeFilter(component.getLSMComponentFilter(), - ((OnDiskInvertedIndex) component.getInvIndex()).getBTree()); - } - flushingComponent.getMetadata().copy(component.getMetadata()); - invIndexBulkLoader.end(); - + // Search the deleted keys BTree to calculate the number of elements for BloomFilter IIndexAccessor deletedKeysBTreeAccessor = flushingComponent.getDeletedKeysBTree() .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); IIndexCursor btreeCountingCursor = ((BTreeAccessor) deletedKeysBTreeAccessor).createCountingSearchCursor(); @@ -387,33 +362,50 @@ btreeCountingCursor.close(); } - int maxBucketsPerElement = 
BloomCalculations.maxBucketsPerElement(numBTreeTuples); - BloomFilterSpecification bloomFilterSpec = - BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate); - - // Create an BTree instance for the deleted keys. - BTree diskDeletedKeysBTree = component.getDeletedKeysBTree(); + IIndexBulkLoader componentBulkLoader = + createComponentBulkLoader(component, 1.0f, false, numBTreeTuples, false, false); // Create a scan cursor on the deleted keys BTree underlying the in-memory inverted index. IIndexCursor deletedKeysScanCursor = deletedKeysBTreeAccessor.createSearchCursor(false); deletedKeysBTreeAccessor.search(deletedKeysScanCursor, nullPred); - // Bulk load the deleted-keys BTree. - IIndexBulkLoader deletedKeysBTreeBulkLoader = diskDeletedKeysBTree.createBulkLoader(1.0f, false, 0L, false); - IIndexBulkLoader builder = component.getBloomFilter().createBuilder(numBTreeTuples, - bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements()); - try { while (deletedKeysScanCursor.hasNext()) { deletedKeysScanCursor.next(); - deletedKeysBTreeBulkLoader.add(deletedKeysScanCursor.getTuple()); - builder.add(deletedKeysScanCursor.getTuple()); + ((LSMInvertedIndexDiskComponentBulkLoader) componentBulkLoader) + .delete(deletedKeysScanCursor.getTuple()); } } finally { deletedKeysScanCursor.close(); - builder.end(); } - deletedKeysBTreeBulkLoader.end(); + + // Scan the in-memory inverted index + InMemoryInvertedIndexAccessor memInvIndexAccessor = (InMemoryInvertedIndexAccessor) flushingComponent + .getInvIndex().createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + BTreeAccessor memBTreeAccessor = memInvIndexAccessor.getBTreeAccessor(); + IIndexCursor scanCursor = memBTreeAccessor.createSearchCursor(false); + memBTreeAccessor.search(scanCursor, nullPred); + + // Bulk load the disk inverted index from the in-memory inverted index. 
+ try { + while (scanCursor.hasNext()) { + scanCursor.next(); + componentBulkLoader.add(scanCursor.getTuple()); + } + } finally { + scanCursor.close(); + } + if (component.getLSMComponentFilter() != null) { + List<ITupleReference> filterTuples = new ArrayList<>(); + filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple()); + filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple()); + filterManager.updateFilter(component.getLSMComponentFilter(), filterTuples); + filterManager.writeFilter(component.getLSMComponentFilter(), + ((OnDiskInvertedIndex) component.getInvIndex()).getBTree()); + } + flushingComponent.getMetadata().copy(component.getMetadata()); + + componentBulkLoader.end(); return component; } @@ -433,7 +425,7 @@ LSMInvertedIndexDiskComponent component = createDiskInvIndexComponent(componentFactory, mergeOp.getTarget(), mergeOp.getDeletedKeysBTreeTarget(), mergeOp.getBloomFilterTarget(), true); - IInvertedIndex mergedDiskInvertedIndex = component.getInvIndex(); + IIndexBulkLoader componentBulkLoader; // In case we must keep the deleted-keys BTrees, then they must be merged *before* merging the inverted indexes so that // lsmHarness.endSearch() is called once when the inverted indexes have been merged. 
@@ -445,44 +437,31 @@ new LSMInvertedIndexDeletedKeysBTreeMergeCursor(opCtx); search(opCtx, btreeCursor, mergePred); - BTree btree = component.getDeletedKeysBTree(); - IIndexBulkLoader btreeBulkLoader = btree.createBulkLoader(1.0f, true, 0L, false); - long numElements = 0L; for (int i = 0; i < mergeOp.getMergingComponents().size(); ++i) { numElements += ((LSMInvertedIndexDiskComponent) mergeOp.getMergingComponents().get(i)).getBloomFilter() .getNumElements(); } - int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements); - BloomFilterSpecification bloomFilterSpec = - BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate); - IIndexBulkLoader builder = component.getBloomFilter().createBuilder(numElements, - bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements()); + componentBulkLoader = createComponentBulkLoader(component, 1.0f, false, numElements, false, false); try { while (btreeCursor.hasNext()) { btreeCursor.next(); ITupleReference tuple = btreeCursor.getTuple(); - btreeBulkLoader.add(tuple); - builder.add(tuple); + ((LSMInvertedIndexDiskComponentBulkLoader) componentBulkLoader).delete(tuple); } } finally { btreeCursor.close(); - builder.end(); } - btreeBulkLoader.end(); } else { - BTree btree = component.getDeletedKeysBTree(); - IIndexBulkLoader btreeBulkLoader = btree.createBulkLoader(1.0f, true, 0L, false); - btreeBulkLoader.end(); + componentBulkLoader = createComponentBulkLoader(component, 1.0f, false, 0L, false, false); } - IIndexBulkLoader invIndexBulkLoader = mergedDiskInvertedIndex.createBulkLoader(1.0f, true, 0L, false); try { while (cursor.hasNext()) { cursor.next(); ITupleReference tuple = cursor.getTuple(); - invIndexBulkLoader.add(tuple); + componentBulkLoader.add(tuple); } } finally { cursor.close(); @@ -503,9 +482,30 @@ getFilterManager().writeFilter(component.getLSMComponentFilter(), ((OnDiskInvertedIndex) component.getInvIndex()).getBTree()); } - invIndexBulkLoader.end(); + 
+ componentBulkLoader.end(); return component; + } + + @Override + public IIndexBulkLoader createComponentBulkLoader(ILSMDiskComponent component, float fillFactor, + boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter) + throws HyracksDataException { + BloomFilterSpecification bloomFilterSpec = null; + if (numElementsHint > 0) { + int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint); + bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate); + } + if (withFilter && filterFields != null) { + return new LSMInvertedIndexDiskComponentBulkLoader((LSMInvertedIndexDiskComponent) component, + bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager, + treeFields, filterFields, + MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories())); + } else { + return new LSMInvertedIndexDiskComponentBulkLoader((LSMInvertedIndexDiskComponent) component, + bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex); + } } @Override @@ -516,105 +516,35 @@ public class LSMInvertedIndexBulkLoader implements IIndexBulkLoader { private final ILSMDiskComponent component; - private final IIndexBulkLoader invIndexBulkLoader; - private final IIndexBulkLoader deletedKeysBTreeBulkLoader; - private boolean cleanedUpArtifacts = false; - private boolean isEmptyComponent = true; - public final PermutingTupleReference indexTuple; - public final PermutingTupleReference filterTuple; - public final MultiComparator filterCmp; + private final IIndexBulkLoader componentBulkLoader; public LSMInvertedIndexBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint) throws HyracksDataException { // Note that by using a flush target file name, we state that the // new bulk loaded tree is "newer" than any other merged tree. 
component = createBulkLoadTarget(); - invIndexBulkLoader = ((LSMInvertedIndexDiskComponent) component).getInvIndex().createBulkLoader(fillFactor, - verifyInput, numElementsHint, false); - //validity of the component depends on the deleted keys file being there even if it's empty. - deletedKeysBTreeBulkLoader = ((LSMInvertedIndexDiskComponent) component).getDeletedKeysBTree() - .createBulkLoader(fillFactor, verifyInput, numElementsHint, false); - - if (getFilterFields() != null) { - indexTuple = new PermutingTupleReference(getTreeFields()); - filterCmp = MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories()); - filterTuple = new PermutingTupleReference(getFilterFields()); - } else { - indexTuple = null; - filterCmp = null; - filterTuple = null; - } + componentBulkLoader = + createComponentBulkLoader(component, fillFactor, verifyInput, numElementsHint, false, true); } @Override public void add(ITupleReference tuple) throws HyracksDataException { - try { - ITupleReference t; - if (indexTuple != null) { - indexTuple.reset(tuple); - t = indexTuple; - } else { - t = tuple; - } - - invIndexBulkLoader.add(t); - - if (filterTuple != null) { - filterTuple.reset(tuple); - component.getLSMComponentFilter().update(filterTuple, filterCmp); - } - - } catch (Exception e) { - cleanupArtifacts(); - throw e; - } - if (isEmptyComponent) { - isEmptyComponent = false; - } - } - - protected void cleanupArtifacts() throws HyracksDataException { - if (!cleanedUpArtifacts) { - cleanedUpArtifacts = true; - ((LSMInvertedIndexDiskComponent) component).getInvIndex().deactivate(); - ((LSMInvertedIndexDiskComponent) component).getInvIndex().destroy(); - ((LSMInvertedIndexDiskComponent) component).getDeletedKeysBTree().deactivate(); - ((LSMInvertedIndexDiskComponent) component).getDeletedKeysBTree().destroy(); - ((LSMInvertedIndexDiskComponent) component).getBloomFilter().deactivate(); - ((LSMInvertedIndexDiskComponent) component).getBloomFilter().destroy(); - } + 
componentBulkLoader.add(tuple); } @Override public void end() throws HyracksDataException { - if (!cleanedUpArtifacts) { - if (component.getLSMComponentFilter() != null) { - getFilterManager().writeFilter(component.getLSMComponentFilter(), - ((OnDiskInvertedIndex) ((LSMInvertedIndexDiskComponent) component).getInvIndex()) - .getBTree()); - } - invIndexBulkLoader.end(); - deletedKeysBTreeBulkLoader.end(); - - if (isEmptyComponent) { - cleanupArtifacts(); - } else { - ioOpCallback.afterOperation(LSMOperationType.FLUSH, null, component); - getLsmHarness().addBulkLoadedComponent(component); - } + componentBulkLoader.end(); + if (component.getComponentSize() > 0) { + ioOpCallback.afterOperation(LSMOperationType.FLUSH, null, component); + lsmHarness.addBulkLoadedComponent(component); } } @Override public void abort() throws HyracksDataException { - if (invIndexBulkLoader != null) { - invIndexBulkLoader.abort(); - } - - if (deletedKeysBTreeBulkLoader != null) { - deletedKeysBTreeBulkLoader.abort(); - } + componentBulkLoader.abort(); } private ILSMDiskComponent createBulkLoadTarget() throws HyracksDataException { diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponentBulkLoader.java new file mode 100644 index 0000000..38e1c42 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponentBulkLoader.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.lsm.invertedindex.impls; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter; +import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification; +import org.apache.hyracks.storage.am.common.api.ITreeIndex; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponentWithBuddyBulkLoader; +import org.apache.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex; +import org.apache.hyracks.storage.common.IIndex; +import org.apache.hyracks.storage.common.MultiComparator; + +public class LSMInvertedIndexDiskComponentBulkLoader extends AbstractLSMDiskComponentWithBuddyBulkLoader { + + //with filter + public LSMInvertedIndexDiskComponentBulkLoader(LSMInvertedIndexDiskComponent component, + BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint, + boolean checkIfEmptyIndex, ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields, + MultiComparator filterCmp) throws HyracksDataException { + super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager, + 
indexFields, filterFields, filterCmp); + } + + //without filter + public LSMInvertedIndexDiskComponentBulkLoader(LSMInvertedIndexDiskComponent component, + BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint, + boolean checkIfEmptyIndex) throws HyracksDataException { + super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, null, null, null, + null); + } + + @Override + protected BloomFilter getBloomFilter(ILSMDiskComponent component) { + return ((LSMInvertedIndexDiskComponent) component).getBloomFilter(); + } + + @Override + protected IIndex getIndex(ILSMDiskComponent component) { + return ((LSMInvertedIndexDiskComponent) component).getInvIndex(); + } + + @Override + protected ITreeIndex getTreeIndex(ILSMDiskComponent component) { + return ((OnDiskInvertedIndex) ((LSMInvertedIndexDiskComponent) component).getInvIndex()).getBTree(); + } + + @Override + protected ITreeIndex getBuddyBTree(ILSMDiskComponent component) { + return ((LSMInvertedIndexDiskComponent) component).getDeletedKeysBTree(); + } + +} diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java index 13ff420..3618737 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java @@ -71,6 +71,7 @@ import org.apache.hyracks.storage.common.IModificationOperationCallback; import org.apache.hyracks.storage.common.ISearchOperationCallback; import org.apache.hyracks.storage.common.ISearchPredicate; +import org.apache.hyracks.storage.common.MultiComparator; import 
org.apache.hyracks.storage.common.file.IFileMapProvider; public class LSMRTree extends AbstractLSMRTree { @@ -173,49 +174,11 @@ RTreeSearchCursor rtreeScanCursor = (RTreeSearchCursor) memRTreeAccessor.createSearchCursor(false); SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null); memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate); + LSMRTreeDiskComponent component = createDiskComponent(componentFactory, flushOp.getTarget(), flushOp.getBTreeTarget(), flushOp.getBloomFilterTarget(), true); - RTree diskRTree = component.getRTree(); - IIndexBulkLoader rTreeBulkloader; - ITreeIndexCursor cursor; - IBinaryComparatorFactory[] linearizerArray = { linearizer }; - - TreeTupleSorter rTreeTupleSorter = new TreeTupleSorter(flushingComponent.getRTree().getFileId(), - linearizerArray, rtreeLeafFrameFactory.createFrame(), rtreeLeafFrameFactory.createFrame(), - flushingComponent.getRTree().getBufferCache(), comparatorFields); - // BulkLoad the tuples from the in-memory tree into the new disk - // RTree. 
- - boolean isEmpty = true; - try { - while (rtreeScanCursor.hasNext()) { - isEmpty = false; - rtreeScanCursor.next(); - rTreeTupleSorter.insertTupleEntry(rtreeScanCursor.getPageId(), rtreeScanCursor.getTupleOffset()); - } - } finally { - rtreeScanCursor.close(); - } - rTreeTupleSorter.sort(); - - rTreeBulkloader = diskRTree.createBulkLoader(1.0f, false, 0L, false); - cursor = rTreeTupleSorter; - - if (!isEmpty) { - try { - while (cursor.hasNext()) { - cursor.next(); - ITupleReference frameTuple = cursor.getTuple(); - rTreeBulkloader.add(frameTuple); - } - } finally { - cursor.close(); - } - } - - rTreeBulkloader.end(); - + //count the number of tuples in the buddy btree ITreeIndexAccessor memBTreeAccessor = flushingComponent.getBTree() .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); RangePredicate btreeNullPredicate = new RangePredicate(null, null, true, true, null, null); @@ -232,29 +195,55 @@ btreeCountingCursor.close(); } - int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numBTreeTuples); - BloomFilterSpecification bloomFilterSpec = - BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate); + IIndexBulkLoader componentBulkLoader = + createComponentBulkLoader(component, 1.0f, false, numBTreeTuples, false, false); + ITreeIndexCursor cursor; + IBinaryComparatorFactory[] linearizerArray = { linearizer }; + + TreeTupleSorter rTreeTupleSorter = new TreeTupleSorter(flushingComponent.getRTree().getFileId(), + linearizerArray, rtreeLeafFrameFactory.createFrame(), rtreeLeafFrameFactory.createFrame(), + flushingComponent.getRTree().getBufferCache(), comparatorFields); + + // BulkLoad the tuples from the in-memory tree into the new disk + // RTree. 
+ boolean isEmpty = true; + try { + while (rtreeScanCursor.hasNext()) { + isEmpty = false; + rtreeScanCursor.next(); + rTreeTupleSorter.insertTupleEntry(rtreeScanCursor.getPageId(), rtreeScanCursor.getTupleOffset()); + } + } finally { + rtreeScanCursor.close(); + } + rTreeTupleSorter.sort(); + + cursor = rTreeTupleSorter; + + if (!isEmpty) { + try { + while (cursor.hasNext()) { + cursor.next(); + ITupleReference frameTuple = cursor.getTuple(); + componentBulkLoader.add(frameTuple); + } + } finally { + cursor.close(); + } + } + + // scan the memory BTree IIndexCursor btreeScanCursor = memBTreeAccessor.createSearchCursor(false); memBTreeAccessor.search(btreeScanCursor, btreeNullPredicate); - BTree diskBTree = component.getBTree(); - - // BulkLoad the tuples from the in-memory tree into the new disk BTree. - IIndexBulkLoader bTreeBulkloader = diskBTree.createBulkLoader(1.0f, false, numBTreeTuples, false); - IIndexBulkLoader builder = component.getBloomFilter().createBuilder(numBTreeTuples, - bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements()); - // scan the memory BTree try { while (btreeScanCursor.hasNext()) { btreeScanCursor.next(); ITupleReference frameTuple = btreeScanCursor.getTuple(); - bTreeBulkloader.add(frameTuple); - builder.add(frameTuple); + ((LSMRTreeDiskComponentBulkLoader) componentBulkLoader).delete(frameTuple); } } finally { btreeScanCursor.close(); - builder.end(); } if (component.getLSMComponentFilter() != null) { @@ -266,7 +255,8 @@ } // Note. 
If we change the filter to write to metadata object, we don't need the if block above flushingComponent.getMetadata().copy(component.getMetadata()); - bTreeBulkloader.end(); + + componentBulkLoader.end(); return component; } @@ -282,40 +272,46 @@ LSMRTreeDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getTarget(), mergeOp.getBTreeTarget(), mergeOp.getBloomFilterTarget(), true); + IIndexBulkLoader componentBulkLoader; + // In case we must keep the deleted-keys BTrees, then they must be merged *before* merging the r-trees so that // lsmHarness.endSearch() is called once when the r-trees have been merged. - BTree btree = mergedComponent.getBTree(); - IIndexBulkLoader btreeBulkLoader = btree.createBulkLoader(1.0f, true, 0L, false); if (mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1) != diskComponents .get(diskComponents.size() - 1)) { // Keep the deleted tuples since the oldest disk component is not included in the merge operation - - LSMRTreeDeletedKeysBTreeMergeCursor btreeCursor = new LSMRTreeDeletedKeysBTreeMergeCursor(opCtx); - search(opCtx, btreeCursor, rtreeSearchPred); long numElements = 0L; for (int i = 0; i < mergeOp.getMergingComponents().size(); ++i) { numElements += ((LSMRTreeDiskComponent) mergeOp.getMergingComponents().get(i)).getBloomFilter() .getNumElements(); } + componentBulkLoader = createComponentBulkLoader(mergedComponent, 1.0f, false, numElements, false, false); - int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements); - BloomFilterSpecification bloomFilterSpec = - BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate); - IIndexBulkLoader builder = mergedComponent.getBloomFilter().createBuilder(numElements, - bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements()); - + LSMRTreeDeletedKeysBTreeMergeCursor btreeCursor = new LSMRTreeDeletedKeysBTreeMergeCursor(opCtx); + search(opCtx, btreeCursor, rtreeSearchPred); 
try { while (btreeCursor.hasNext()) { btreeCursor.next(); ITupleReference tuple = btreeCursor.getTuple(); - btreeBulkLoader.add(tuple); - builder.add(tuple); + ((LSMRTreeDiskComponentBulkLoader) componentBulkLoader).delete(tuple); } } finally { btreeCursor.close(); - builder.end(); } + } else { + //no buddy-btree needed + componentBulkLoader = createComponentBulkLoader(mergedComponent, 1.0f, false, 0L, false, false); + } + + //search old rtree components + try { + while (cursor.hasNext()) { + cursor.next(); + ITupleReference frameTuple = cursor.getTuple(); + componentBulkLoader.add(frameTuple); + } + } finally { + cursor.close(); } if (mergedComponent.getLSMComponentFilter() != null) { @@ -327,19 +323,8 @@ getFilterManager().updateFilter(mergedComponent.getLSMComponentFilter(), filterTuples); getFilterManager().writeFilter(mergedComponent.getLSMComponentFilter(), mergedComponent.getRTree()); } - btreeBulkLoader.end(); - IIndexBulkLoader bulkLoader = mergedComponent.getRTree().createBulkLoader(1.0f, false, 0L, false); - try { - while (cursor.hasNext()) { - cursor.next(); - ITupleReference frameTuple = cursor.getTuple(); - bulkLoader.add(frameTuple); - } - } finally { - cursor.close(); - } - bulkLoader.end(); + componentBulkLoader.end(); return mergedComponent; } @@ -358,6 +343,25 @@ } @Override + public IIndexBulkLoader createComponentBulkLoader(ILSMDiskComponent component, float fillFactor, + boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter) + throws HyracksDataException { + BloomFilterSpecification bloomFilterSpec = null; + if (numElementsHint > 0) { + int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint); + bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate); + } + if (withFilter && filterFields != null) { + return new LSMRTreeDiskComponentBulkLoader((LSMRTreeDiskComponent) component, bloomFilterSpec, fillFactor, + verifyInput, 
numElementsHint, checkIfEmptyIndex, filterManager, treeFields, filterFields, + MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories())); + } else { + return new LSMRTreeDiskComponentBulkLoader((LSMRTreeDiskComponent) component, bloomFilterSpec, fillFactor, + verifyInput, numElementsHint, checkIfEmptyIndex); + } + } + + @Override public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint) throws HyracksDataException { return new LSMRTreeBulkLoader(this, fillLevel, verifyInput, numElementsHint); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeBulkLoader.java index edc3e7d..fbdd37a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeBulkLoader.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeBulkLoader.java @@ -20,110 +20,41 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; -import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import org.apache.hyracks.storage.common.IIndexBulkLoader; -import org.apache.hyracks.storage.common.MultiComparator; public class LSMRTreeBulkLoader implements IIndexBulkLoader { private final ILSMDiskComponent component; - private final IIndexBulkLoader bulkLoader; - private final IIndexBulkLoader buddyBTreeBulkloader; - private boolean cleanedUpArtifacts = false; - private boolean isEmptyComponent = true; - public final 
PermutingTupleReference indexTuple; - public final PermutingTupleReference filterTuple; - public final MultiComparator filterCmp; private final LSMRTree lsmIndex; + private final IIndexBulkLoader componentBulkLoader; public LSMRTreeBulkLoader(LSMRTree lsmIndex, float fillFactor, boolean verifyInput, long numElementsHint) throws HyracksDataException { this.lsmIndex = lsmIndex; // Note that by using a flush target file name, we state that the // new bulk loaded tree is "newer" than any other merged tree. - component = lsmIndex.createBulkLoadTarget(); - bulkLoader = ((LSMRTreeDiskComponent) component).getRTree().createBulkLoader(fillFactor, verifyInput, - numElementsHint, false); - buddyBTreeBulkloader = ((LSMRTreeDiskComponent) component).getBTree().createBulkLoader(fillFactor, verifyInput, - numElementsHint, false); - if (lsmIndex.getFilterFields() != null) { - indexTuple = new PermutingTupleReference(lsmIndex.getTreeFields()); - filterCmp = MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories()); - filterTuple = new PermutingTupleReference(lsmIndex.getFilterFields()); - } else { - indexTuple = null; - filterCmp = null; - filterTuple = null; - } + this.component = lsmIndex.createBulkLoadTarget(); + this.componentBulkLoader = + lsmIndex.createComponentBulkLoader(component, fillFactor, verifyInput, numElementsHint, false, true); } @Override public void add(ITupleReference tuple) throws HyracksDataException { - try { - ITupleReference t; - if (indexTuple != null) { - indexTuple.reset(tuple); - t = indexTuple; - } else { - t = tuple; - } - - bulkLoader.add(t); - - if (filterTuple != null) { - filterTuple.reset(tuple); - component.getLSMComponentFilter().update(filterTuple, filterCmp); - } - } catch (Exception e) { - cleanupArtifacts(); - throw e; - } - if (isEmptyComponent) { - isEmptyComponent = false; - } + componentBulkLoader.add(tuple); } @Override public void end() throws HyracksDataException { - if (!cleanedUpArtifacts) { - - if 
(component.getLSMComponentFilter() != null) { - lsmIndex.getFilterManager().writeFilter(component.getLSMComponentFilter(), - ((LSMRTreeDiskComponent) component).getRTree()); - } - - bulkLoader.end(); - buddyBTreeBulkloader.end(); - - if (isEmptyComponent) { - cleanupArtifacts(); - } else { - lsmIndex.getIOOperationCallback().afterOperation(LSMOperationType.FLUSH, null, component); - lsmIndex.getLsmHarness().addBulkLoadedComponent(component); - } + componentBulkLoader.end(); + if (component.getComponentSize() > 0) { + lsmIndex.getIOOperationCallback().afterOperation(LSMOperationType.FLUSH, null, component); + lsmIndex.getLsmHarness().addBulkLoadedComponent(component); } } @Override public void abort() throws HyracksDataException { - if (bulkLoader != null) { - bulkLoader.abort(); - } - if (buddyBTreeBulkloader != null) { - buddyBTreeBulkloader.abort(); - } - } - - protected void cleanupArtifacts() throws HyracksDataException { - if (!cleanedUpArtifacts) { - cleanedUpArtifacts = true; - ((LSMRTreeDiskComponent) component).getRTree().deactivate(); - ((LSMRTreeDiskComponent) component).getRTree().destroy(); - ((LSMRTreeDiskComponent) component).getBTree().deactivate(); - ((LSMRTreeDiskComponent) component).getBTree().destroy(); - ((LSMRTreeDiskComponent) component).getBloomFilter().deactivate(); - ((LSMRTreeDiskComponent) component).getBloomFilter().destroy(); - } + componentBulkLoader.abort(); } } \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponentBulkLoader.java new file mode 100644 index 0000000..ff0a299 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponentBulkLoader.java @@ -0,0 +1,65 @@ 
+/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.lsm.rtree.impls; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter; +import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification; +import org.apache.hyracks.storage.am.common.api.ITreeIndex; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponentWithBuddyBulkLoader; +import org.apache.hyracks.storage.common.IIndex; +import org.apache.hyracks.storage.common.MultiComparator; + +public class LSMRTreeDiskComponentBulkLoader extends AbstractLSMDiskComponentWithBuddyBulkLoader { + + //with filter + public LSMRTreeDiskComponentBulkLoader(LSMRTreeDiskComponent component, BloomFilterSpecification bloomFilterSpec, + float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, + ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields, MultiComparator filterCmp) + throws HyracksDataException { + 
super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager, + indexFields, filterFields, filterCmp); + } + + //without filter + public LSMRTreeDiskComponentBulkLoader(LSMRTreeDiskComponent component, BloomFilterSpecification bloomFilterSpec, + float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex) + throws HyracksDataException { + super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, null, null, null, + null); + } + + @Override + protected BloomFilter getBloomFilter(ILSMDiskComponent component) { + return ((LSMRTreeDiskComponent) component).getBloomFilter(); + } + + @Override + protected IIndex getIndex(ILSMDiskComponent component) { + return ((LSMRTreeDiskComponent) component).getRTree(); + } + + @Override + protected ITreeIndex getBuddyBTree(ILSMDiskComponent component) { + return ((LSMRTreeDiskComponent) component).getBTree(); + } + +} diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java index a20e6f2..24c46d7 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java @@ -35,7 +35,6 @@ import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor; import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; -import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; import 
org.apache.hyracks.storage.am.lsm.common.api.IComponentFilterHelper; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory; @@ -136,26 +135,13 @@ SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null); memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate); LSMRTreeDiskComponent component = createDiskComponent(componentFactory, flushOp.getTarget(), null, null, true); - RTree diskRTree = component.getRTree(); - - // scan the memory BTree - ITreeIndexAccessor memBTreeAccessor = flushingComponent.getBTree() - .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - BTreeRangeSearchCursor btreeScanCursor = (BTreeRangeSearchCursor) memBTreeAccessor.createSearchCursor(false); - RangePredicate btreeNullPredicate = new RangePredicate(null, null, true, true, null, null); - memBTreeAccessor.search(btreeScanCursor, btreeNullPredicate); + IIndexBulkLoader componentBulkLoader = createComponentBulkLoader(component, 1.0f, false, 0L, false, false); // Since the LSM-RTree is used as a secondary assumption, the // primary key will be the last comparator in the BTree comparators TreeTupleSorter rTreeTupleSorter = new TreeTupleSorter(flushingComponent.getRTree().getFileId(), linearizerArray, rtreeLeafFrameFactory.createFrame(), rtreeLeafFrameFactory.createFrame(), flushingComponent.getRTree().getBufferCache(), comparatorFields); - - TreeTupleSorter bTreeTupleSorter = new TreeTupleSorter(flushingComponent.getBTree().getFileId(), - linearizerArray, btreeLeafFrameFactory.createFrame(), btreeLeafFrameFactory.createFrame(), - flushingComponent.getBTree().getBufferCache(), comparatorFields); - // BulkLoad the tuples from the in-memory tree into the new disk - // RTree. 
boolean isEmpty = true; try { @@ -171,6 +157,16 @@ rTreeTupleSorter.sort(); } + // scan the memory BTree + ITreeIndexAccessor memBTreeAccessor = flushingComponent.getBTree() + .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + BTreeRangeSearchCursor btreeScanCursor = (BTreeRangeSearchCursor) memBTreeAccessor.createSearchCursor(false); + RangePredicate btreeNullPredicate = new RangePredicate(null, null, true, true, null, null); + memBTreeAccessor.search(btreeScanCursor, btreeNullPredicate); + TreeTupleSorter bTreeTupleSorter = new TreeTupleSorter(flushingComponent.getBTree().getFileId(), + linearizerArray, btreeLeafFrameFactory.createFrame(), btreeLeafFrameFactory.createFrame(), + flushingComponent.getBTree().getBufferCache(), comparatorFields); + isEmpty = true; try { while (btreeScanCursor.hasNext()) { @@ -185,7 +181,6 @@ bTreeTupleSorter.sort(); } - IIndexBulkLoader rTreeBulkloader = diskRTree.createBulkLoader(1.0f, false, 0L, false); LSMRTreeWithAntiMatterTuplesFlushCursor cursor = new LSMRTreeWithAntiMatterTuplesFlushCursor(rTreeTupleSorter, bTreeTupleSorter, comparatorFields, linearizerArray); cursor.open(null, null); @@ -195,7 +190,7 @@ cursor.next(); ITupleReference frameTuple = cursor.getTuple(); - rTreeBulkloader.add(frameTuple); + componentBulkLoader.add(frameTuple); } } finally { cursor.close(); @@ -209,8 +204,8 @@ getFilterManager().writeFilter(component.getLSMComponentFilter(), component.getRTree()); } flushingComponent.getMetadata().copy(component.getMetadata()); - rTreeBulkloader.end(); + componentBulkLoader.end(); return component; } @@ -225,13 +220,13 @@ // Bulk load the tuples from all on-disk RTrees into the new RTree. 
LSMRTreeDiskComponent component = createDiskComponent(componentFactory, mergeOp.getTarget(), null, null, true); - RTree mergedRTree = component.getRTree(); - IIndexBulkLoader bulkloader = mergedRTree.createBulkLoader(1.0f, false, 0L, false); + + IIndexBulkLoader componentBulkLoader = createComponentBulkLoader(component, 1.0f, false, 0L, false, false); try { while (cursor.hasNext()) { cursor.next(); ITupleReference frameTuple = cursor.getTuple(); - bulkloader.add(frameTuple); + componentBulkLoader.add(frameTuple); } } finally { cursor.close(); @@ -245,7 +240,8 @@ getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples); getFilterManager().writeFilter(component.getLSMComponentFilter(), component.getRTree()); } - bulkloader.end(); + + componentBulkLoader.end(); return component; } @@ -258,6 +254,20 @@ } @Override + public IIndexBulkLoader createComponentBulkLoader(ILSMDiskComponent component, float fillFactor, + boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter) + throws HyracksDataException { + if (withFilter && filterFields != null) { + return new LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader((LSMRTreeDiskComponent) component, null, + fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager, treeFields, + filterFields, MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories())); + } else { + return new LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader((LSMRTreeDiskComponent) component, null, + fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex); + } + } + + @Override public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint) throws HyracksDataException { return new LSMRTreeWithAntiMatterTuplesBulkLoader(fillLevel, verifyInput, numElementsHint); @@ -265,91 +275,35 @@ public class LSMRTreeWithAntiMatterTuplesBulkLoader implements IIndexBulkLoader { private final ILSMDiskComponent component; - private final 
IIndexBulkLoader bulkLoader; - private boolean cleanedUpArtifacts = false; - private boolean isEmptyComponent = true; - public final PermutingTupleReference indexTuple; - public final PermutingTupleReference filterTuple; - public final MultiComparator filterCmp; + private final IIndexBulkLoader componentBulkLoader; public LSMRTreeWithAntiMatterTuplesBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint) throws HyracksDataException { - // Note that by using a flush target file name, we state that the - // new bulk loaded tree is "newer" than any other merged tree. - component = createBulkLoadTarget(); - bulkLoader = ((LSMRTreeDiskComponent) component).getRTree().createBulkLoader(fillFactor, verifyInput, - numElementsHint, false); - if (getFilterFields() != null) { - indexTuple = new PermutingTupleReference(getTreeFields()); - filterCmp = MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories()); - filterTuple = new PermutingTupleReference(getFilterFields()); - } else { - indexTuple = null; - filterCmp = null; - filterTuple = null; - } + componentBulkLoader = + createComponentBulkLoader(component, fillFactor, verifyInput, numElementsHint, false, true); } @Override public void add(ITupleReference tuple) throws HyracksDataException { - try { - ITupleReference t; - if (indexTuple != null) { - indexTuple.reset(tuple); - t = indexTuple; - } else { - t = tuple; - } - - bulkLoader.add(t); - - if (filterTuple != null) { - filterTuple.reset(tuple); - component.getLSMComponentFilter().update(filterTuple, filterCmp); - } - - } catch (Exception e) { - cleanupArtifacts(); - throw e; - } - if (isEmptyComponent) { - isEmptyComponent = false; - } + componentBulkLoader.add(tuple); } @Override public void end() throws HyracksDataException { - if (!cleanedUpArtifacts) { - if (component.getLSMComponentFilter() != null) { - getFilterManager().writeFilter(component.getLSMComponentFilter(), - ((LSMRTreeDiskComponent) component).getRTree()); - } - 
bulkLoader.end(); - - if (isEmptyComponent) { - cleanupArtifacts(); - } else { - ioOpCallback.afterOperation(LSMOperationType.FLUSH, null, component); - getLsmHarness().addBulkLoadedComponent(component); - } + componentBulkLoader.end(); + if (component.getComponentSize() > 0) { + ioOpCallback.afterOperation(LSMOperationType.FLUSH, null, component); + lsmHarness.addBulkLoadedComponent(component); } } @Override public void abort() throws HyracksDataException { - if (bulkLoader != null) { - bulkLoader.abort(); - } - } - - protected void cleanupArtifacts() throws HyracksDataException { - if (!cleanedUpArtifacts) { - cleanedUpArtifacts = true; - ((LSMRTreeDiskComponent) component).getRTree().deactivate(); - ((LSMRTreeDiskComponent) component).getRTree().destroy(); + if (componentBulkLoader != null) { + componentBulkLoader.abort(); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader.java new file mode 100644 index 0000000..88a2e1e --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.lsm.rtree.impls; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter; +import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification; +import org.apache.hyracks.storage.am.common.api.ITreeIndex; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponentBulkLoader; +import org.apache.hyracks.storage.common.MultiComparator; + +public class LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader extends AbstractLSMDiskComponentBulkLoader { + + //with filter + public LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader(LSMRTreeDiskComponent component, + BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint, + boolean checkIfEmptyIndex, ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields, + MultiComparator filterCmp) throws HyracksDataException { + super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager, + indexFields, filterFields, filterCmp); + } + + //without filter + public LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader(LSMRTreeDiskComponent component, + BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint, + boolean checkIfEmptyIndex) throws HyracksDataException 
{ + super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, null, null, null, + null); + } + + @Override + protected BloomFilter getBloomFilter(ILSMDiskComponent component) { + return ((LSMRTreeDiskComponent) component).getBloomFilter(); + } + + @Override + protected ITreeIndex getIndex(ILSMDiskComponent component) { + return ((LSMRTreeDiskComponent) component).getRTree(); + } + +} -- To view, visit https://asterix-gerrit.ics.uci.edu/1773 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I772a6d68761fcbb85982a1c9f72f2d186e1d1ffb Gerrit-PatchSet: 13 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Luo Chen <[email protected]> Gerrit-Reviewer: Ian Maxon <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Jianfeng Jia <[email protected]> Gerrit-Reviewer: Luo Chen <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
