Luo Chen has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1773
Change subject: Add LSM Disk Component Bulk Loader
......................................................................
Add LSM Disk Component Bulk Loader
- Added the ILSMDiskComponentBulkLoader interface, which is used to bulk
  load an LSM disk component whose deletes are stored as anti-matter tuples
- Added the ILSMDiskComponentWithBuddyBTreeBulkLoader interface, which is
  used to bulk load an LSM disk component whose deletes are stored in a
  buddy (deleted-keys) BTree
- Refactored the LSM flush, merge, and index bulk load operations to use
  the new disk component bulk loaders
Change-Id: I772a6d68761fcbb85982a1c9f72f2d186e1d1ffb
---
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
A
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentBulkLoader.java
A
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentBulkLoader.java
A
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentWithBuddyBTreeBulkLoader.java
A
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentBulkLoader.java
A
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentWithBuddyBTreeBulkLoader.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
A
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponentBulkLoader.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
A
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponentBulkLoader.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
A
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader.java
12 files changed, 873 insertions(+), 505 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/73/1773/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 1c99d5a..d9e4da3 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -36,7 +36,6 @@
import
org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
import org.apache.hyracks.storage.am.btree.impls.BTree;
import org.apache.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
-import org.apache.hyracks.storage.am.btree.impls.BTree.BTreeBulkLoader;
import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
import org.apache.hyracks.storage.am.common.api.IPageManager;
@@ -46,12 +45,12 @@
import org.apache.hyracks.storage.am.common.impls.AbstractSearchPredicate;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
-import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFactory;
import
org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import
org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
@@ -426,12 +425,10 @@
RangePredicate nullPred = new RangePredicate(null, null, true, true,
null, null);
long numElements = 0L;
- BloomFilterSpecification bloomFilterSpec = null;
if (hasBloomFilter) {
//count elements in btree for creating Bloomfilter
IIndexCursor countingCursor = ((BTreeAccessor)
accessor).createCountingSearchCursor();
accessor.search(countingCursor, nullPred);
-
try {
while (countingCursor.hasNext()) {
countingCursor.next();
@@ -441,35 +438,22 @@
} finally {
countingCursor.close();
}
-
- int maxBucketsPerElement =
BloomCalculations.maxBucketsPerElement(numElements);
- bloomFilterSpec =
BloomCalculations.computeBloomSpec(maxBucketsPerElement,
bloomFilterFalsePositiveRate);
}
LSMBTreeDiskComponent component =
createDiskComponent(componentFactory, flushOp.getBTreeFlushTarget(),
flushOp.getBloomFilterFlushTarget(), true);
- IIndexBulkLoader bulkLoader =
component.getBTree().createBulkLoader(1.0f, false, numElements, false);
- IIndexBulkLoader builder = null;
- if (hasBloomFilter) {
- builder = component.getBloomFilter().createBuilder(numElements,
bloomFilterSpec.getNumHashes(),
- bloomFilterSpec.getNumBucketsPerElements());
- }
+ ILSMDiskComponentBulkLoader componentBulkLoader =
+ createComponentBulkLoader(component, 1.0f, false, numElements,
false, false);
IIndexCursor scanCursor = accessor.createSearchCursor(false);
accessor.search(scanCursor, nullPred);
try {
while (scanCursor.hasNext()) {
scanCursor.next();
- if (hasBloomFilter) {
- builder.add(scanCursor.getTuple());
- }
- bulkLoader.add(scanCursor.getTuple());
+ componentBulkLoader.add(scanCursor.getTuple());
}
} finally {
scanCursor.close();
- if (hasBloomFilter) {
- builder.end();
- }
}
if (component.getLSMComponentFilter() != null) {
@@ -487,7 +471,8 @@
// TODO This code should be in the callback and not in the index
flushingComponent.getMetadata().copy(component.getMetadata());
- bulkLoader.end();
+ componentBulkLoader.end();
+
return component;
}
@@ -526,38 +511,25 @@
List<ILSMComponent> mergedComponents = mergeOp.getMergingComponents();
long numElements = 0L;
- BloomFilterSpecification bloomFilterSpec = null;
if (hasBloomFilter) {
//count elements in btree for creating Bloomfilter
for (int i = 0; i < mergedComponents.size(); ++i) {
numElements += ((LSMBTreeDiskComponent)
mergedComponents.get(i)).getBloomFilter().getNumElements();
}
- int maxBucketsPerElement =
BloomCalculations.maxBucketsPerElement(numElements);
- bloomFilterSpec =
BloomCalculations.computeBloomSpec(maxBucketsPerElement,
bloomFilterFalsePositiveRate);
}
LSMBTreeDiskComponent mergedComponent =
createDiskComponent(componentFactory, mergeOp.getBTreeMergeTarget(),
mergeOp.getBloomFilterMergeTarget(), true);
- IIndexBulkLoader bulkLoader =
mergedComponent.getBTree().createBulkLoader(1.0f, false, numElements, false);
- IIndexBulkLoader builder = null;
- if (hasBloomFilter) {
- builder =
mergedComponent.getBloomFilter().createBuilder(numElements,
bloomFilterSpec.getNumHashes(),
- bloomFilterSpec.getNumBucketsPerElements());
- }
+ ILSMDiskComponentBulkLoader componentBulkLoader =
+ createComponentBulkLoader(mergedComponent, 1.0f, false,
numElements, false, false);
try {
while (cursor.hasNext()) {
cursor.next();
ITupleReference frameTuple = cursor.getTuple();
- if (hasBloomFilter) {
- builder.add(frameTuple);
- }
- bulkLoader.add(frameTuple);
+ componentBulkLoader.add(frameTuple);
}
} finally {
cursor.close();
- if (hasBloomFilter) {
- builder.end();
- }
}
if (mergedComponent.getLSMComponentFilter() != null) {
List<ITupleReference> filterTuples = new ArrayList<>();
@@ -568,7 +540,9 @@
filterManager.updateFilter(mergedComponent.getLSMComponentFilter(),
filterTuples);
filterManager.writeFilter(mergedComponent.getLSMComponentFilter(),
mergedComponent.getBTree());
}
- bulkLoader.end();
+
+ componentBulkLoader.end();
+
return mergedComponent;
}
@@ -593,6 +567,26 @@
filterManager.readFilter(component.getLSMComponentFilter(),
component.getBTree());
}
return component;
+ }
+
+    /**
+     * Creates a bulk loader that writes into the given disk component's BTree and, when this
+     * index has a bloom filter, into the component's bloom filter.
+     *
+     * @param component
+     *            the disk component to load
+     * @param fillFactor
+     *            forwarded to the BTree bulk loader
+     * @param verifyInput
+     *            forwarded to the BTree bulk loader
+     * @param numElementsHint
+     *            expected number of tuples; also used to size the bloom filter
+     * @param checkIfEmptyIndex
+     *            forwarded to the component bulk loader
+     * @param withFilter
+     *            whether to maintain the LSM component filter; only honored when this index
+     *            has filter fields
+     */
+    public ILSMDiskComponentBulkLoader createComponentBulkLoader(LSMBTreeDiskComponent component, float fillFactor,
+            boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter)
+            throws HyracksDataException {
+        BloomFilterSpecification spec = hasBloomFilter
+                ? BloomCalculations.computeBloomSpec(BloomCalculations.maxBucketsPerElement(numElementsHint),
+                        bloomFilterFalsePositiveRate)
+                : null;
+        if (!withFilter || filterFields == null) {
+            return new LSMBTreeDiskComponentBulkLoader(component, spec, fillFactor, verifyInput, numElementsHint,
+                    checkIfEmptyIndex);
+        }
+        MultiComparator filterCmp =
+                MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories());
+        return new LSMBTreeDiskComponentBulkLoader(component, spec, fillFactor, verifyInput, numElementsHint,
+                checkIfEmptyIndex, filterManager, btreeFields, filterFields, filterCmp);
}
@Override
@@ -620,14 +614,7 @@
public class LSMBTreeBulkLoader implements IIndexBulkLoader {
private final ILSMDiskComponent component;
- private final BTreeBulkLoader bulkLoader;
- private final IIndexBulkLoader builder;
- private boolean cleanedUpArtifacts = false;
- private boolean isEmptyComponent = true;
- private boolean endedBloomFilterLoad = false;
- public final PermutingTupleReference indexTuple;
- public final PermutingTupleReference filterTuple;
- public final MultiComparator filterCmp;
+ private final ILSMDiskComponentBulkLoader bulkLoader;
public LSMBTreeBulkLoader(float fillFactor, boolean verifyInput, long
numElementsHint,
boolean checkIfEmptyIndex) throws HyracksDataException {
@@ -635,111 +622,30 @@
throw
HyracksDataException.create(ErrorCode.LOAD_NON_EMPTY_INDEX);
}
component = createBulkLoadTarget();
- bulkLoader = (BTreeBulkLoader) ((LSMBTreeDiskComponent)
component).getBTree().createBulkLoader(fillFactor,
- verifyInput, numElementsHint, false);
-
- if (hasBloomFilter) {
- int maxBucketsPerElement =
BloomCalculations.maxBucketsPerElement(numElementsHint);
- BloomFilterSpecification bloomFilterSpec =
-
BloomCalculations.computeBloomSpec(maxBucketsPerElement,
bloomFilterFalsePositiveRate);
- builder = ((LSMBTreeDiskComponent)
component).getBloomFilter().createBuilder(numElementsHint,
- bloomFilterSpec.getNumHashes(),
bloomFilterSpec.getNumBucketsPerElements());
- } else {
- builder = null;
- }
-
- if (filterFields != null) {
- indexTuple = new PermutingTupleReference(btreeFields);
- filterCmp =
MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories());
- filterTuple = new PermutingTupleReference(filterFields);
- } else {
- indexTuple = null;
- filterCmp = null;
- filterTuple = null;
- }
+ bulkLoader = createComponentBulkLoader((LSMBTreeDiskComponent)
component, fillFactor, verifyInput,
+ numElementsHint, checkIfEmptyIndex, true);
}
@Override
public void add(ITupleReference tuple) throws HyracksDataException {
- try {
- ITupleReference t;
- if (indexTuple != null) {
- indexTuple.reset(tuple);
- t = indexTuple;
- } else {
- t = tuple;
- }
-
- bulkLoader.add(t);
- if (hasBloomFilter) {
- builder.add(t);
- }
-
- if (filterTuple != null) {
- filterTuple.reset(tuple);
- component.getLSMComponentFilter().update(filterTuple,
filterCmp);
- }
- } catch (Exception e) {
- cleanupArtifacts();
- throw e;
- }
- if (isEmptyComponent) {
- isEmptyComponent = false;
- }
- }
-
- protected void cleanupArtifacts() throws HyracksDataException {
- if (!cleanedUpArtifacts) {
- cleanedUpArtifacts = true;
- if (hasBloomFilter && !endedBloomFilterLoad) {
- builder.abort();
- endedBloomFilterLoad = true;
- }
- ((LSMBTreeDiskComponent) component).getBTree().deactivate();
- ((LSMBTreeDiskComponent) component).getBTree().destroy();
- if (hasBloomFilter) {
- ((LSMBTreeDiskComponent)
component).getBloomFilter().deactivate();
- ((LSMBTreeDiskComponent)
component).getBloomFilter().destroy();
- }
- }
+ // Delegates to the component bulk loader created in the constructor, which
+ // projects index/filter fields, feeds the bloom filter and destroys the
+ // component's files if an add fails.
+ bulkLoader.add(tuple);
}
@Override
public void end() throws HyracksDataException {
- if (!cleanedUpArtifacts) {
- if (hasBloomFilter && !endedBloomFilterLoad) {
- builder.end();
- endedBloomFilterLoad = true;
- }
-
- if (component.getLSMComponentFilter() != null) {
-
filterManager.writeFilter(component.getLSMComponentFilter(),
- ((LSMBTreeDiskComponent) component).getBTree());
- }
- bulkLoader.end();
-
- if (isEmptyComponent) {
- cleanupArtifacts();
- } else {
- //TODO(amoudi): Ensure Bulk load follow the same lifecycle
Other Operations (Flush, Merge, etc).
- //then after operation should be called from harness as
well
- //https://issues.apache.org/jira/browse/ASTERIXDB-1764
- ioOpCallback.afterOperation(LSMOperationType.FLUSH, null,
component);
- lsmHarness.addBulkLoadedComponent(component);
- }
+ bulkLoader.end();
+ // bulkLoader.end() deactivates and destroys the component's files when no
+ // tuple was added. NOTE(review): the previous code gated registration on
+ // isEmptyComponent; confirm getComponentSize() is 0 for such a destroyed
+ // component, otherwise an empty component would be registered below.
+ if (component.getComponentSize() > 0) {
+ //TODO(amoudi): Ensure Bulk load follow the same lifecycle
Other Operations (Flush, Merge, etc).
+ //then after operation should be called from harness as well
+ //https://issues.apache.org/jira/browse/ASTERIXDB-1764
+ ioOpCallback.afterOperation(LSMOperationType.FLUSH, null,
component);
+ lsmHarness.addBulkLoadedComponent(component);
}
}
@Override
public void abort() throws HyracksDataException {
- if (bulkLoader != null) {
- bulkLoader.abort();
- }
-
- if (builder != null) {
- builder.abort();
- }
-
+        // Abort, not end: end() would finalize the partially loaded component
+        // (write its filter and end the index load) instead of discarding it.
+        bulkLoader.abort();
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentBulkLoader.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentBulkLoader.java
new file mode 100644
index 0000000..a5720de
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentBulkLoader.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import
org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponentBulkLoader;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+/**
+ * Bulk loader for an {@link LSMBTreeDiskComponent}: tuples are written to the component's BTree
+ * and, when a bloom filter specification is supplied, to its bloom filter.
+ */
+public class LSMBTreeDiskComponentBulkLoader extends AbstractLSMDiskComponentBulkLoader {
+
+    /**
+     * Creates a loader that also maintains the component's LSM filter.
+     */
+    public LSMBTreeDiskComponentBulkLoader(LSMBTreeDiskComponent component, BloomFilterSpecification bloomFilterSpec,
+            float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex,
+            ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields, MultiComparator filterCmp)
+            throws HyracksDataException {
+        super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager,
+                indexFields, filterFields, filterCmp);
+    }
+
+    /**
+     * Creates a loader that does not maintain an LSM filter.
+     */
+    public LSMBTreeDiskComponentBulkLoader(LSMBTreeDiskComponent component, BloomFilterSpecification bloomFilterSpec,
+            float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex)
+            throws HyracksDataException {
+        this(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, null, null,
+                null, null);
+    }
+
+    /** The primary index of a BTree component is its BTree. */
+    @Override
+    protected IIndex getIndex(ILSMDiskComponent component) {
+        return ((LSMBTreeDiskComponent) component).getBTree();
+    }
+
+    @Override
+    protected BloomFilter getBloomFilter(ILSMDiskComponent component) {
+        LSMBTreeDiskComponent btreeComponent = (LSMBTreeDiskComponent) component;
+        return btreeComponent.getBloomFilter();
+    }
+
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentBulkLoader.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentBulkLoader.java
new file mode 100644
index 0000000..cdb3e73
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentBulkLoader.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * Bulk loader that writes tuples into an LSM disk component.
+ */
+public interface ILSMDiskComponentBulkLoader {
+    /**
+     * Append a tuple to the disk component in the context of a bulk load.
+     *
+     * @param tuple
+     *            Tuple to be inserted.
+     * @throws HyracksDataException
+     *             If the BufferCache throws while un/pinning or un/latching.
+     */
+    void add(ITupleReference tuple) throws HyracksDataException;
+
+    /**
+     * Finalize the bulk loading operation in the given context.
+     *
+     * @throws HyracksDataException
+     *             If the BufferCache throws while un/pinning or un/latching.
+     */
+    void end() throws HyracksDataException;
+
+    /**
+     * Release all resources held by this bulk loader, with no guarantee of
+     * persisted content.
+     *
+     * @throws HyracksDataException
+     *             If releasing the underlying resources fails.
+     */
+    void abort() throws HyracksDataException;
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentWithBuddyBTreeBulkLoader.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentWithBuddyBTreeBulkLoader.java
new file mode 100644
index 0000000..8ef79ed
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentWithBuddyBTreeBulkLoader.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * Bulk loader for LSM disk components that pair their primary index with a buddy BTree
+ * holding deleted keys.
+ */
+public interface ILSMDiskComponentWithBuddyBTreeBulkLoader extends ILSMDiskComponentBulkLoader {
+    /**
+     * Append a deleted tuple (key) to the component's buddy BTree in the context of a bulk load.
+     *
+     * @param tuple
+     *            Deleted tuple to be recorded.
+     * @throws HyracksDataException
+     *             If the BufferCache throws while un/pinning or un/latching.
+     */
+    void addDeletedTuple(ITupleReference tuple) throws HyracksDataException;
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentBulkLoader.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentBulkLoader.java
new file mode 100644
index 0000000..1893a28
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentBulkLoader.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import
org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import
org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.IIndexBulkLoader;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+/**
+ * Base bulk loader for an LSM disk component. Every added tuple is written to the component's
+ * primary index and, when a bloom filter specification is supplied, to its bloom filter. When a
+ * filter manager is supplied, each tuple is projected onto {@code indexFields} before being
+ * indexed and onto {@code filterFields} to update the component's LSM filter.
+ */
+public abstract class AbstractLSMDiskComponentBulkLoader implements ILSMDiskComponentBulkLoader {
+    protected final ILSMDiskComponent component;
+
+    protected final IIndexBulkLoader indexBulkLoader;
+    // null when no bloom filter specification was supplied
+    protected final IIndexBulkLoader bloomFilterBuilder;
+
+    // filterManager, indexTuple, filterTuple and filterCmp are all null when no filter manager was supplied
+    protected final ILSMComponentFilterManager filterManager;
+    protected final PermutingTupleReference indexTuple;
+    protected final PermutingTupleReference filterTuple;
+    protected final MultiComparator filterCmp;
+
+    // set by cleanupArtifacts() before it deactivates and destroys the component's files
+    protected boolean cleanedUpArtifacts = false;
+    // cleared by the first successful add()
+    protected boolean isEmptyComponent = true;
+    // set once the bloom filter builder has been ended or aborted, so it is never finished twice
+    protected boolean endedBloomFilterLoad = false;
+
+    /**
+     * Creates the underlying index bulk loader and, if {@code bloomFilterSpec} is non-null, the
+     * bloom filter builder.
+     * <p>
+     * NOTE: {@link #getIndex} and {@link #getBloomFilter} are called from this constructor, before
+     * any subclass field is initialized, so implementations must not rely on subclass state.
+     *
+     * @param component
+     *            the disk component to load
+     * @param bloomFilterSpec
+     *            bloom filter parameters, or {@code null} to build no bloom filter
+     * @param fillFactor
+     *            forwarded to the index bulk loader
+     * @param verifyInput
+     *            forwarded to the index bulk loader
+     * @param numElementsHint
+     *            expected number of tuples; forwarded to the index bulk loader and bloom filter builder
+     * @param checkIfEmptyIndex
+     *            forwarded to the index bulk loader
+     * @param filterManager
+     *            filter manager, or {@code null} to maintain no LSM filter
+     * @param indexFields
+     *            fields projected into the index (used only with a filter manager)
+     * @param filterFields
+     *            fields projected into the LSM filter (used only with a filter manager)
+     * @param filterCmp
+     *            comparator for LSM filter updates (used only with a filter manager)
+     */
+    public AbstractLSMDiskComponentBulkLoader(ILSMDiskComponent component, BloomFilterSpecification bloomFilterSpec,
+            float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex,
+            ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields, MultiComparator filterCmp)
+            throws HyracksDataException {
+        this.component = component;
+        this.indexBulkLoader =
+                getIndex(component).createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+        if (bloomFilterSpec != null) {
+            this.bloomFilterBuilder = getBloomFilter(component).createBuilder(numElementsHint,
+                    bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
+        } else {
+            this.bloomFilterBuilder = null;
+        }
+        if (filterManager != null) {
+            this.filterManager = filterManager;
+            this.indexTuple = new PermutingTupleReference(indexFields);
+            this.filterTuple = new PermutingTupleReference(filterFields);
+            this.filterCmp = filterCmp;
+        } else {
+            this.filterManager = null;
+            this.indexTuple = null;
+            this.filterTuple = null;
+            this.filterCmp = null;
+        }
+    }
+
+    /**
+     * Writes the tuple to the index and bloom filter and updates the LSM filter. If any step
+     * fails, the component's files are destroyed before the exception is rethrown.
+     */
+    @Override
+    public void add(ITupleReference tuple) throws HyracksDataException {
+        try {
+            // project onto the index fields when a filter is in use
+            ITupleReference t;
+            if (indexTuple != null) {
+                indexTuple.reset(tuple);
+                t = indexTuple;
+            } else {
+                t = tuple;
+            }
+
+            indexBulkLoader.add(t);
+            if (bloomFilterBuilder != null) {
+                bloomFilterBuilder.add(t);
+            }
+
+            if (filterTuple != null) {
+                filterTuple.reset(tuple);
+                component.getLSMComponentFilter().update(filterTuple, filterCmp);
+            }
+        } catch (Exception e) {
+            cleanupArtifacts();
+            throw e;
+        }
+        isEmptyComponent = false;
+    }
+
+    /**
+     * Aborts the index bulk loader and, unless it was already ended or aborted, the bloom filter
+     * builder. The component's files are not destroyed here.
+     */
+    @Override
+    public void abort() throws HyracksDataException {
+        if (indexBulkLoader != null) {
+            indexBulkLoader.abort();
+        }
+        // cleanupArtifacts() (e.g. after a failed add()) may already have aborted the builder
+        if (bloomFilterBuilder != null && !endedBloomFilterLoad) {
+            bloomFilterBuilder.abort();
+            endedBloomFilterLoad = true;
+        }
+    }
+
+    /**
+     * Ends the bloom filter builder, writes the LSM filter (when a filter manager is in use) and
+     * ends the index bulk loader. A component that received no tuples is then destroyed. Does
+     * nothing once the artifacts have been cleaned up.
+     */
+    @Override
+    public void end() throws HyracksDataException {
+        if (cleanedUpArtifacts) {
+            return;
+        }
+        if (bloomFilterBuilder != null && !endedBloomFilterLoad) {
+            bloomFilterBuilder.end();
+            endedBloomFilterLoad = true;
+        }
+        if (filterManager != null && component.getLSMComponentFilter() != null) {
+            filterManager.writeFilter(component.getLSMComponentFilter(), getTreeIndex(component));
+        }
+        indexBulkLoader.end();
+        if (isEmptyComponent) {
+            // nothing was loaded: discard the component's files
+            cleanupArtifacts();
+        }
+    }
+
+    /**
+     * Aborts the bloom filter builder if still open, then deactivates and destroys the
+     * component's index and, if one is being built, its bloom filter. Runs at most once.
+     */
+    protected void cleanupArtifacts() throws HyracksDataException {
+        if (cleanedUpArtifacts) {
+            return;
+        }
+        cleanedUpArtifacts = true;
+        if (bloomFilterBuilder != null && !endedBloomFilterLoad) {
+            bloomFilterBuilder.abort();
+            endedBloomFilterLoad = true;
+        }
+        getIndex(component).deactivate();
+        getIndex(component).destroy();
+        if (bloomFilterBuilder != null) {
+            getBloomFilter(component).deactivate();
+            getBloomFilter(component).destroy();
+        }
+    }
+
+    /**
+     * Returns the tree index into which the LSM component filter is written; by default the
+     * component's primary index cast to {@link ITreeIndex}.
+     *
+     * @param component
+     *            the disk component being loaded
+     * @return the tree index that holds the filter tuple values
+     */
+    protected ITreeIndex getTreeIndex(ILSMDiskComponent component) {
+        return (ITreeIndex) getIndex(component);
+    }
+
+    /** Returns the component's primary index; called from the constructor. */
+    protected abstract IIndex getIndex(ILSMDiskComponent component);
+
+    /** Returns the component's bloom filter; called from the constructor. */
+    protected abstract BloomFilter getBloomFilter(ILSMDiskComponent component);
+
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentWithBuddyBTreeBulkLoader.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentWithBuddyBTreeBulkLoader.java
new file mode 100644
index 0000000..0cebced
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponentWithBuddyBTreeBulkLoader.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import
org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import
org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentWithBuddyBTreeBulkLoader;
+import org.apache.hyracks.storage.common.IIndexBulkLoader;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+/**
+ * Base bulk loader for LSM disk components that pair a primary index with a buddy BTree of
+ * deleted keys. {@link #add} writes to the primary index (and updates the LSM filter);
+ * {@link #addDeletedTuple} writes to the buddy BTree and, when present, to the bloom filter.
+ */
+public abstract class AbstractLSMDiskComponentWithBuddyBTreeBulkLoader extends AbstractLSMDiskComponentBulkLoader
+        implements ILSMDiskComponentWithBuddyBTreeBulkLoader {
+
+    protected final IIndexBulkLoader buddyBTreeBulkLoader;
+
+    /**
+     * Creates the primary index, bloom filter (if {@code bloomFilterSpec} is non-null) and
+     * buddy BTree bulk loaders. See {@link AbstractLSMDiskComponentBulkLoader} for the
+     * meaning of the shared parameters.
+     */
+    public AbstractLSMDiskComponentWithBuddyBTreeBulkLoader(ILSMDiskComponent component,
+            BloomFilterSpecification bloomFilterSpec, float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex, ILSMComponentFilterManager filterManager, int[] indexFields, int[] filterFields,
+            MultiComparator filterCmp) throws HyracksDataException {
+        super(component, bloomFilterSpec, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, filterManager,
+                indexFields, filterFields, filterCmp);
+
+        // BuddyBTree must be created even if it could be empty,
+        // since without it the component is not considered as valid.
+        buddyBTreeBulkLoader =
+                getBuddyBTree(component).createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+    }
+
+    /**
+     * Writes the tuple to the primary index and updates the LSM filter. The bloom filter is not
+     * fed here; it is built from the deleted keys passed to {@link #addDeletedTuple}.
+     */
+    @Override
+    public void add(ITupleReference tuple) throws HyracksDataException {
+        try {
+            ITupleReference t;
+            if (indexTuple != null) {
+                indexTuple.reset(tuple);
+                t = indexTuple;
+            } else {
+                t = tuple;
+            }
+
+            indexBulkLoader.add(t);
+
+            if (filterTuple != null) {
+                filterTuple.reset(tuple);
+                component.getLSMComponentFilter().update(filterTuple, filterCmp);
+            }
+        } catch (Exception e) {
+            cleanupArtifacts();
+            throw e;
+        }
+        isEmptyComponent = false;
+    }
+
+    /**
+     * Writes a deleted key to the buddy BTree and to the bloom filter, if one is being built.
+     */
+    @Override
+    public void addDeletedTuple(ITupleReference tuple) throws HyracksDataException {
+        try {
+            buddyBTreeBulkLoader.add(tuple);
+            // add() never feeds the bloom filter, so without this it would be built empty
+            if (bloomFilterBuilder != null) {
+                bloomFilterBuilder.add(tuple);
+            }
+        } catch (Exception e) {
+            cleanupArtifacts();
+            throw e;
+        }
+        isEmptyComponent = false;
+    }
+
+    /** Aborts the inherited loaders, then the buddy BTree loader. */
+    @Override
+    public void abort() throws HyracksDataException {
+        super.abort();
+        if (buddyBTreeBulkLoader != null) {
+            buddyBTreeBulkLoader.abort();
+        }
+    }
+
+    /**
+     * Ends the bloom filter builder, writes the LSM filter (when a filter manager is in use) and
+     * ends the primary index and buddy BTree loaders. A component that received no tuples is
+     * then destroyed. Does nothing once the artifacts have been cleaned up.
+     */
+    @Override
+    public void end() throws HyracksDataException {
+        if (cleanedUpArtifacts) {
+            return;
+        }
+        if (bloomFilterBuilder != null && !endedBloomFilterLoad) {
+            bloomFilterBuilder.end();
+            endedBloomFilterLoad = true;
+        }
+        if (filterManager != null && component.getLSMComponentFilter() != null) {
+            filterManager.writeFilter(component.getLSMComponentFilter(), getTreeIndex(component));
+        }
+        indexBulkLoader.end();
+        buddyBTreeBulkLoader.end();
+        if (isEmptyComponent) {
+            // nothing was loaded: discard the component's files
+            cleanupArtifacts();
+        }
+    }
+
+    /**
+     * Aborts the bloom filter builder if still open, then deactivates and destroys the primary
+     * index, the buddy BTree and, if one is being built, the bloom filter. Runs at most once.
+     */
+    @Override
+    protected void cleanupArtifacts() throws HyracksDataException {
+        if (cleanedUpArtifacts) {
+            return;
+        }
+        cleanedUpArtifacts = true;
+        if (bloomFilterBuilder != null && !endedBloomFilterLoad) {
+            bloomFilterBuilder.abort();
+            endedBloomFilterLoad = true;
+        }
+        getIndex(component).deactivate();
+        getIndex(component).destroy();
+
+        getBuddyBTree(component).deactivate();
+        getBuddyBTree(component).destroy();
+
+        if (bloomFilterBuilder != null) {
+            getBloomFilter(component).deactivate();
+            getBloomFilter(component).destroy();
+        }
+    }
+
+    /** Returns the component's buddy (deleted-keys) BTree; called from the constructor. */
+    protected abstract ITreeIndex getBuddyBTree(ILSMDiskComponent component);
+
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index 3d7b716..c6178c7 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -52,6 +52,7 @@
import
org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentFactory;
+import
org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentWithBuddyBTreeBulkLoader;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
@@ -465,39 +466,14 @@
LSMInvertedIndexDiskComponent component =
createDiskInvIndexComponent(componentFactory,
flushOp.getDictBTreeFlushTarget(),
flushOp.getDeletedKeysBTreeFlushTarget(),
flushOp.getBloomFilterFlushTarget(), true);
- IInvertedIndex diskInvertedIndex = component.getInvIndex();
// Create a scan cursor on the BTree underlying the in-memory inverted
index.
LSMInvertedIndexMemoryComponent flushingComponent =
(LSMInvertedIndexMemoryComponent)
flushOp.getFlushingComponent();
- InMemoryInvertedIndexAccessor memInvIndexAccessor =
(InMemoryInvertedIndexAccessor) flushingComponent
- .getInvIndex().createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
- BTreeAccessor memBTreeAccessor =
memInvIndexAccessor.getBTreeAccessor();
+
RangePredicate nullPred = new RangePredicate(null, null, true, true,
null, null);
- IIndexCursor scanCursor = memBTreeAccessor.createSearchCursor(false);
- memBTreeAccessor.search(scanCursor, nullPred);
- // Bulk load the disk inverted index from the in-memory inverted index.
- IIndexBulkLoader invIndexBulkLoader =
diskInvertedIndex.createBulkLoader(1.0f, false, 0L, false);
- try {
- while (scanCursor.hasNext()) {
- scanCursor.next();
- invIndexBulkLoader.add(scanCursor.getTuple());
- }
- } finally {
- scanCursor.close();
- }
- if (component.getLSMComponentFilter() != null) {
- List<ITupleReference> filterTuples = new ArrayList<>();
-
filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple());
-
filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple());
- filterManager.updateFilter(component.getLSMComponentFilter(),
filterTuples);
- filterManager.writeFilter(component.getLSMComponentFilter(),
- ((OnDiskInvertedIndex)
component.getInvIndex()).getBTree());
- }
- flushingComponent.getMetadata().copy(component.getMetadata());
- invIndexBulkLoader.end();
-
+ // Search the deleted keys BTree to calculate the number of elements
for BloomFilter
IIndexAccessor deletedKeysBTreeAccessor =
flushingComponent.getDeletedKeysBTree()
.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
IIndexCursor btreeCountingCursor = ((BTreeAccessor)
deletedKeysBTreeAccessor).createCountingSearchCursor();
@@ -513,33 +489,49 @@
btreeCountingCursor.close();
}
- int maxBucketsPerElement =
BloomCalculations.maxBucketsPerElement(numBTreeTuples);
- BloomFilterSpecification bloomFilterSpec =
- BloomCalculations.computeBloomSpec(maxBucketsPerElement,
bloomFilterFalsePositiveRate);
-
- // Create an BTree instance for the deleted keys.
- BTree diskDeletedKeysBTree = component.getDeletedKeysBTree();
+ ILSMDiskComponentWithBuddyBTreeBulkLoader componentBulkLoader =
+ createComponentBulkLoader(component, true, 1.0f, false,
numBTreeTuples, false, false);
// Create a scan cursor on the deleted keys BTree underlying the
in-memory inverted index.
IIndexCursor deletedKeysScanCursor =
deletedKeysBTreeAccessor.createSearchCursor(false);
deletedKeysBTreeAccessor.search(deletedKeysScanCursor, nullPred);
- // Bulk load the deleted-keys BTree.
- IIndexBulkLoader deletedKeysBTreeBulkLoader =
diskDeletedKeysBTree.createBulkLoader(1.0f, false, 0L, false);
- IIndexBulkLoader builder =
component.getBloomFilter().createBuilder(numBTreeTuples,
- bloomFilterSpec.getNumHashes(),
bloomFilterSpec.getNumBucketsPerElements());
-
try {
while (deletedKeysScanCursor.hasNext()) {
deletedKeysScanCursor.next();
-
deletedKeysBTreeBulkLoader.add(deletedKeysScanCursor.getTuple());
- builder.add(deletedKeysScanCursor.getTuple());
+
componentBulkLoader.addDeletedTuple(deletedKeysScanCursor.getTuple());
}
} finally {
deletedKeysScanCursor.close();
- builder.end();
}
- deletedKeysBTreeBulkLoader.end();
+
+ // Scan the in-memory inverted index
+ InMemoryInvertedIndexAccessor memInvIndexAccessor =
(InMemoryInvertedIndexAccessor) flushingComponent
+ .getInvIndex().createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
+ BTreeAccessor memBTreeAccessor =
memInvIndexAccessor.getBTreeAccessor();
+ IIndexCursor scanCursor = memBTreeAccessor.createSearchCursor(false);
+ memBTreeAccessor.search(scanCursor, nullPred);
+
+ // Bulk load the disk inverted index from the in-memory inverted index.
+ try {
+ while (scanCursor.hasNext()) {
+ scanCursor.next();
+ componentBulkLoader.add(scanCursor.getTuple());
+ }
+ } finally {
+ scanCursor.close();
+ }
+ if (component.getLSMComponentFilter() != null) {
+ List<ITupleReference> filterTuples = new ArrayList<>();
+
filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple());
+
filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple());
+ filterManager.updateFilter(component.getLSMComponentFilter(),
filterTuples);
+ filterManager.writeFilter(component.getLSMComponentFilter(),
+ ((OnDiskInvertedIndex)
component.getInvIndex()).getBTree());
+ }
+ flushingComponent.getMetadata().copy(component.getMetadata());
+
+ componentBulkLoader.end();
return component;
}
@@ -585,7 +577,7 @@
createDiskInvIndexComponent(componentFactory,
mergeOp.getDictBTreeMergeTarget(),
mergeOp.getDeletedKeysBTreeMergeTarget(),
mergeOp.getBloomFilterMergeTarget(), true);
- IInvertedIndex mergedDiskInvertedIndex = component.getInvIndex();
+ ILSMDiskComponentWithBuddyBTreeBulkLoader componentBulkLoader;
// In case we must keep the deleted-keys BTrees, then they must be
merged *before* merging the inverted indexes so that
// lsmHarness.endSearch() is called once when the inverted indexes
have been merged.
@@ -597,44 +589,31 @@
new LSMInvertedIndexDeletedKeysBTreeMergeCursor(opCtx);
search(opCtx, btreeCursor, mergePred);
- BTree btree = component.getDeletedKeysBTree();
- IIndexBulkLoader btreeBulkLoader = btree.createBulkLoader(1.0f,
true, 0L, false);
-
long numElements = 0L;
for (int i = 0; i < mergeOp.getMergingComponents().size(); ++i) {
numElements += ((LSMInvertedIndexDiskComponent)
mergeOp.getMergingComponents().get(i)).getBloomFilter()
.getNumElements();
}
- int maxBucketsPerElement =
BloomCalculations.maxBucketsPerElement(numElements);
- BloomFilterSpecification bloomFilterSpec =
- BloomCalculations.computeBloomSpec(maxBucketsPerElement,
bloomFilterFalsePositiveRate);
- IIndexBulkLoader builder =
component.getBloomFilter().createBuilder(numElements,
- bloomFilterSpec.getNumHashes(),
bloomFilterSpec.getNumBucketsPerElements());
+ componentBulkLoader = createComponentBulkLoader(component, true,
1.0f, false, numElements, false, false);
try {
while (btreeCursor.hasNext()) {
btreeCursor.next();
ITupleReference tuple = btreeCursor.getTuple();
- btreeBulkLoader.add(tuple);
- builder.add(tuple);
+ componentBulkLoader.addDeletedTuple(tuple);
}
} finally {
btreeCursor.close();
- builder.end();
}
- btreeBulkLoader.end();
} else {
- BTree btree = component.getDeletedKeysBTree();
- IIndexBulkLoader btreeBulkLoader = btree.createBulkLoader(1.0f,
true, 0L, false);
- btreeBulkLoader.end();
+ componentBulkLoader = createComponentBulkLoader(component, false,
1.0f, false, 0L, false, false);
}
- IIndexBulkLoader invIndexBulkLoader =
mergedDiskInvertedIndex.createBulkLoader(1.0f, true, 0L, false);
try {
while (cursor.hasNext()) {
cursor.next();
ITupleReference tuple = cursor.getTuple();
- invIndexBulkLoader.add(tuple);
+ componentBulkLoader.add(tuple);
}
} finally {
cursor.close();
@@ -655,9 +634,28 @@
filterManager.writeFilter(component.getLSMComponentFilter(),
((OnDiskInvertedIndex)
component.getInvIndex()).getBTree());
}
- invIndexBulkLoader.end();
+
+ componentBulkLoader.end();
return component;
+ }
+
+ /**
+ * Creates a bulk loader that fills {@code component}'s inverted index and
+ * deleted-keys BTree.
+ *
+ * @param component the disk component to load
+ * @param hasBloomFilter whether to build a bloom filter. Its spec is derived from
+ * {@code numElementsHint} and {@code bloomFilterFalsePositiveRate}; when false,
+ * a {@code null} spec is passed.
+ * @param fillFactor page fill factor for the loaded trees
+ * @param verifyInput whether the underlying loaders verify input order
+ * @param numElementsHint expected number of elements
+ * @param checkIfEmptyIndex forwarded to the loader unchanged
+ * @param withFilter maintain the component filter while loading. This takes effect
+ * only when the index was configured with filter fields.
+ * @return the new component bulk loader
+ */
+ public ILSMDiskComponentWithBuddyBTreeBulkLoader
createComponentBulkLoader(LSMInvertedIndexDiskComponent component,
+ boolean hasBloomFilter, float fillFactor, boolean verifyInput,
long numElementsHint,
+ boolean checkIfEmptyIndex, boolean withFilter) throws
HyracksDataException {
+ BloomFilterSpecification bloomFilterSpec = null;
+ if (hasBloomFilter) {
+ int maxBucketsPerElement =
BloomCalculations.maxBucketsPerElement(numElementsHint);
+ bloomFilterSpec =
BloomCalculations.computeBloomSpec(maxBucketsPerElement,
bloomFilterFalsePositiveRate);
+ }
+ if (withFilter && filterFields != null) {
+ // loader permutes incoming tuples into index/filter fields and updates the filter
+ return new LSMInvertedIndexDiskComponentBulkLoader(component,
bloomFilterSpec, fillFactor, verifyInput,
+ numElementsHint, checkIfEmptyIndex, filterManager,
invertedIndexFields, filterFields,
+
MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories()));
+ } else {
+ return new LSMInvertedIndexDiskComponentBulkLoader(component,
bloomFilterSpec, fillFactor, verifyInput,
+ numElementsHint, checkIfEmptyIndex);
+ }
}
@Override
@@ -668,13 +666,7 @@
public class LSMInvertedIndexBulkLoader implements IIndexBulkLoader {
private final ILSMDiskComponent component;
- private final IIndexBulkLoader invIndexBulkLoader;
- private final IIndexBulkLoader deletedKeysBTreeBulkLoader;
- private boolean cleanedUpArtifacts = false;
- private boolean isEmptyComponent = true;
- public final PermutingTupleReference indexTuple;
- public final PermutingTupleReference filterTuple;
- public final MultiComparator filterCmp;
+ private final ILSMDiskComponentWithBuddyBTreeBulkLoader
componentBulkLoader;
public LSMInvertedIndexBulkLoader(float fillFactor, boolean
verifyInput, long numElementsHint,
boolean checkIfEmptyIndex) throws HyracksDataException {
@@ -684,92 +676,27 @@
// Note that by using a flush target file name, we state that the
// new bulk loaded tree is "newer" than any other merged tree.
component = createBulkLoadTarget();
- invIndexBulkLoader = ((LSMInvertedIndexDiskComponent)
component).getInvIndex().createBulkLoader(fillFactor,
- verifyInput, numElementsHint, false);
-
- //validity of the component depends on the deleted keys file being
there even if it's empty.
- deletedKeysBTreeBulkLoader = ((LSMInvertedIndexDiskComponent)
component).getDeletedKeysBTree()
- .createBulkLoader(fillFactor, verifyInput,
numElementsHint, false);
-
- if (filterFields != null) {
- indexTuple = new PermutingTupleReference(invertedIndexFields);
- filterCmp =
MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories());
- filterTuple = new PermutingTupleReference(filterFields);
- } else {
- indexTuple = null;
- filterCmp = null;
- filterTuple = null;
- }
+ componentBulkLoader =
createComponentBulkLoader((LSMInvertedIndexDiskComponent) component, false,
+ fillFactor, verifyInput, numElementsHint,
checkIfEmptyIndex, true);
}
@Override
public void add(ITupleReference tuple) throws HyracksDataException {
- try {
- ITupleReference t;
- if (indexTuple != null) {
- indexTuple.reset(tuple);
- t = indexTuple;
- } else {
- t = tuple;
- }
-
- invIndexBulkLoader.add(t);
-
- if (filterTuple != null) {
- filterTuple.reset(tuple);
- component.getLSMComponentFilter().update(filterTuple,
filterCmp);
- }
-
- } catch (Exception e) {
- cleanupArtifacts();
- throw e;
- }
- if (isEmptyComponent) {
- isEmptyComponent = false;
- }
- }
-
- protected void cleanupArtifacts() throws HyracksDataException {
- if (!cleanedUpArtifacts) {
- cleanedUpArtifacts = true;
- ((LSMInvertedIndexDiskComponent)
component).getInvIndex().deactivate();
- ((LSMInvertedIndexDiskComponent)
component).getInvIndex().destroy();
- ((LSMInvertedIndexDiskComponent)
component).getDeletedKeysBTree().deactivate();
- ((LSMInvertedIndexDiskComponent)
component).getDeletedKeysBTree().destroy();
- ((LSMInvertedIndexDiskComponent)
component).getBloomFilter().deactivate();
- ((LSMInvertedIndexDiskComponent)
component).getBloomFilter().destroy();
- }
+ // Delegate to the component bulk loader, which was created with withFilter=true,
+ // so field permutation and filter maintenance happen there.
+ componentBulkLoader.add(tuple);
}
@Override
public void end() throws HyracksDataException {
- if (!cleanedUpArtifacts) {
- if (component.getLSMComponentFilter() != null) {
-
filterManager.writeFilter(component.getLSMComponentFilter(),
- (((OnDiskInvertedIndex)
((LSMInvertedIndexDiskComponent) component).getInvIndex())
- .getBTree()));
- }
- invIndexBulkLoader.end();
- deletedKeysBTreeBulkLoader.end();
-
- if (isEmptyComponent) {
- cleanupArtifacts();
- } else {
- ioOpCallback.afterOperation(LSMOperationType.FLUSH, null,
component);
- lsmHarness.addBulkLoadedComponent(component);
- }
+ componentBulkLoader.end();
+ // Only a component that ended up with data is reported as a FLUSH and
+ // registered with the harness; an empty one is not published.
+ if (component.getComponentSize() > 0) {
+ ioOpCallback.afterOperation(LSMOperationType.FLUSH, null,
component);
+ lsmHarness.addBulkLoadedComponent(component);
}
}
@Override
public void abort() throws HyracksDataException {
- if (invIndexBulkLoader != null) {
- invIndexBulkLoader.abort();
- }
-
- if (deletedKeysBTreeBulkLoader != null) {
- deletedKeysBTreeBulkLoader.abort();
- }
+ // the component bulk loader aborts both the inverted index and deleted-keys BTree loads
+ componentBulkLoader.abort();
}
private ILSMDiskComponent createBulkLoadTarget() throws
HyracksDataException {
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponentBulkLoader.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponentBulkLoader.java
new file mode 100644
index 0000000..4322852
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponentBulkLoader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.invertedindex.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import
org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponentWithBuddyBTreeBulkLoader;
+import
org.apache.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+/**
+ * Bulk loader for an LSM inverted-index disk component.
+ *
+ * The primary index is the component's on-disk inverted index, and the buddy
+ * BTree is its deleted-keys BTree. The component filter, when used, is written
+ * into the inverted index's BTree.
+ */
+public class LSMInvertedIndexDiskComponentBulkLoader extends
AbstractLSMDiskComponentWithBuddyBTreeBulkLoader {
+
+ /**
+ * Creates a loader that also maintains the component filter. Incoming tuples are
+ * projected with {@code indexFields} and {@code filterFields}, and the filter is
+ * compared with {@code filterCmp}.
+ */
+ public
LSMInvertedIndexDiskComponentBulkLoader(LSMInvertedIndexDiskComponent component,
+ BloomFilterSpecification bloomFilterSpec, float fillFactor,
boolean verifyInput, long numElementsHint,
+ boolean checkIfEmptyIndex, ILSMComponentFilterManager
filterManager, int[] indexFields, int[] filterFields,
+ MultiComparator filterCmp) throws HyracksDataException {
+ super(component, bloomFilterSpec, fillFactor, verifyInput,
numElementsHint, checkIfEmptyIndex, filterManager,
+ indexFields, filterFields, filterCmp);
+ }
+
+ /**
+ * Creates a loader without component-filter maintenance: the filter manager,
+ * field permutations and comparator are all passed as {@code null}.
+ */
+ public
LSMInvertedIndexDiskComponentBulkLoader(LSMInvertedIndexDiskComponent component,
+ BloomFilterSpecification bloomFilterSpec, float fillFactor,
boolean verifyInput, long numElementsHint,
+ boolean checkIfEmptyIndex) throws HyracksDataException {
+ super(component, bloomFilterSpec, fillFactor, verifyInput,
numElementsHint, checkIfEmptyIndex, null, null, null,
+ null);
+ }
+
+ // bloom filter of the component
+ @Override
+ protected BloomFilter getBloomFilter(ILSMDiskComponent component) {
+ return ((LSMInvertedIndexDiskComponent) component).getBloomFilter();
+ }
+
+ // primary index: the on-disk inverted index
+ @Override
+ protected IIndex getIndex(ILSMDiskComponent component) {
+ return ((LSMInvertedIndexDiskComponent) component).getInvIndex();
+ }
+
+ // tree that receives the component filter: the inverted index's BTree
+ @Override
+ protected ITreeIndex getTreeIndex(ILSMDiskComponent component) {
+ return ((OnDiskInvertedIndex) ((LSMInvertedIndexDiskComponent)
component).getInvIndex()).getBTree();
+ }
+
+ // buddy BTree: the deleted-keys BTree
+ @Override
+ protected ITreeIndex getBuddyBTree(ILSMDiskComponent component) {
+ return ((LSMInvertedIndexDiskComponent)
component).getDeletedKeysBTree();
+ }
+
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 8882639..95db8a3 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -45,11 +45,11 @@
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.common.tuples.DualTupleReference;
-import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFactory;
import
org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import
org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentWithBuddyBTreeBulkLoader;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
@@ -60,7 +60,6 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
-import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import
org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
import
org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFilterManager;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
@@ -228,47 +227,8 @@
memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
LSMRTreeDiskComponent component =
createDiskComponent(componentFactory, flushOp.getRTreeFlushTarget(),
flushOp.getBTreeFlushTarget(),
flushOp.getBloomFilterFlushTarget(), true);
- RTree diskRTree = component.getRTree();
- IIndexBulkLoader rTreeBulkloader;
- ITreeIndexCursor cursor;
- IBinaryComparatorFactory[] linearizerArray = { linearizer };
-
- TreeTupleSorter rTreeTupleSorter = new
TreeTupleSorter(flushingComponent.getRTree().getFileId(),
- linearizerArray, rtreeLeafFrameFactory.createFrame(),
rtreeLeafFrameFactory.createFrame(),
- flushingComponent.getRTree().getBufferCache(),
comparatorFields);
- // BulkLoad the tuples from the in-memory tree into the new disk
- // RTree.
-
- boolean isEmpty = true;
- try {
- while (rtreeScanCursor.hasNext()) {
- isEmpty = false;
- rtreeScanCursor.next();
- rTreeTupleSorter.insertTupleEntry(rtreeScanCursor.getPageId(),
rtreeScanCursor.getTupleOffset());
- }
- } finally {
- rtreeScanCursor.close();
- }
- rTreeTupleSorter.sort();
-
- rTreeBulkloader = diskRTree.createBulkLoader(1.0f, false, 0L, false);
- cursor = rTreeTupleSorter;
-
- if (!isEmpty) {
- try {
- while (cursor.hasNext()) {
- cursor.next();
- ITupleReference frameTuple = cursor.getTuple();
- rTreeBulkloader.add(frameTuple);
- }
- } finally {
- cursor.close();
- }
- }
-
- rTreeBulkloader.end();
-
+ //count the number of tuples in the buddy btree
ITreeIndexAccessor memBTreeAccessor = flushingComponent.getBTree()
.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
RangePredicate btreeNullPredicate = new RangePredicate(null, null,
true, true, null, null);
@@ -285,29 +245,56 @@
btreeCountingCursor.close();
}
- int maxBucketsPerElement =
BloomCalculations.maxBucketsPerElement(numBTreeTuples);
- BloomFilterSpecification bloomFilterSpec =
- BloomCalculations.computeBloomSpec(maxBucketsPerElement,
bloomFilterFalsePositiveRate);
+ ILSMDiskComponentWithBuddyBTreeBulkLoader componentBulkLoader =
+ createComponentBulkLoader(component, true, 1.0f, false,
numBTreeTuples, false, false);
+
+ ITreeIndexCursor cursor;
+ IBinaryComparatorFactory[] linearizerArray = { linearizer };
+
+ TreeTupleSorter rTreeTupleSorter = new
TreeTupleSorter(flushingComponent.getRTree().getFileId(),
+ linearizerArray, rtreeLeafFrameFactory.createFrame(),
rtreeLeafFrameFactory.createFrame(),
+ flushingComponent.getRTree().getBufferCache(),
comparatorFields);
+
+ // BulkLoad the tuples from the in-memory tree into the new disk
+ // RTree.
+ boolean isEmpty = true;
+ try {
+ while (rtreeScanCursor.hasNext()) {
+ isEmpty = false;
+ rtreeScanCursor.next();
+ rTreeTupleSorter.insertTupleEntry(rtreeScanCursor.getPageId(),
rtreeScanCursor.getTupleOffset());
+ }
+ } finally {
+ rtreeScanCursor.close();
+ }
+ rTreeTupleSorter.sort();
+
+ cursor = rTreeTupleSorter;
+
+ if (!isEmpty) {
+ try {
+ while (cursor.hasNext()) {
+ cursor.next();
+ ITupleReference frameTuple = cursor.getTuple();
+ componentBulkLoader.add(frameTuple);
+ }
+ } finally {
+ cursor.close();
+ }
+ }
IIndexCursor btreeScanCursor =
memBTreeAccessor.createSearchCursor(false);
memBTreeAccessor.search(btreeScanCursor, btreeNullPredicate);
- BTree diskBTree = component.getBTree();
- // BulkLoad the tuples from the in-memory tree into the new disk BTree.
- IIndexBulkLoader bTreeBulkloader = diskBTree.createBulkLoader(1.0f,
false, numBTreeTuples, false);
- IIndexBulkLoader builder =
component.getBloomFilter().createBuilder(numBTreeTuples,
- bloomFilterSpec.getNumHashes(),
bloomFilterSpec.getNumBucketsPerElements());
// scan the memory BTree
try {
while (btreeScanCursor.hasNext()) {
btreeScanCursor.next();
ITupleReference frameTuple = btreeScanCursor.getTuple();
- bTreeBulkloader.add(frameTuple);
- builder.add(frameTuple);
+ componentBulkLoader.addDeletedTuple(frameTuple);
}
} finally {
btreeScanCursor.close();
- builder.end();
}
if (component.getLSMComponentFilter() != null) {
@@ -319,7 +306,8 @@
}
// Note. If we change the filter to write to metadata object, we don't
need the if block above
flushingComponent.getMetadata().copy(component.getMetadata());
- bTreeBulkloader.end();
+
+ componentBulkLoader.end();
return component;
}
@@ -350,10 +338,10 @@
LSMRTreeDiskComponent mergedComponent =
createDiskComponent(componentFactory, mergeOp.getRTreeMergeTarget(),
mergeOp.getBTreeMergeTarget(),
mergeOp.getBloomFilterMergeTarget(), true);
+ ILSMDiskComponentWithBuddyBTreeBulkLoader componentBulkLoader;
+
// In case we must keep the deleted-keys BTrees, then they must be
merged *before* merging the r-trees so that
// lsmHarness.endSearch() is called once when the r-trees have been
merged.
- BTree btree = mergedComponent.getBTree();
- IIndexBulkLoader btreeBulkLoader = btree.createBulkLoader(1.0f, true,
0L, false);
if
(mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1)
!= diskComponents
.get(diskComponents.size() - 1)) {
// Keep the deleted tuples since the oldest disk component is not
included in the merge operation
@@ -366,24 +354,31 @@
numElements += ((LSMRTreeDiskComponent)
mergeOp.getMergingComponents().get(i)).getBloomFilter()
.getNumElements();
}
-
- int maxBucketsPerElement =
BloomCalculations.maxBucketsPerElement(numElements);
- BloomFilterSpecification bloomFilterSpec =
- BloomCalculations.computeBloomSpec(maxBucketsPerElement,
bloomFilterFalsePositiveRate);
- IIndexBulkLoader builder =
mergedComponent.getBloomFilter().createBuilder(numElements,
- bloomFilterSpec.getNumHashes(),
bloomFilterSpec.getNumBucketsPerElements());
+ // withFilter=false: the merged filter is computed and written explicitly after the load
+ componentBulkLoader = createComponentBulkLoader(mergedComponent, true, 1.0f,
false, numElements, false, false);
try {
while (btreeCursor.hasNext()) {
btreeCursor.next();
ITupleReference tuple = btreeCursor.getTuple();
- btreeBulkLoader.add(tuple);
- builder.add(tuple);
+ componentBulkLoader.addDeletedTuple(tuple);
}
} finally {
btreeCursor.close();
- builder.end();
}
+ } else {
+ //no buddy-btree, bloom filter, or per-tuple filter maintenance needed
+ componentBulkLoader = createComponentBulkLoader(mergedComponent,
false, 1.0f, false, 0L, false, false);
+ }
+
+ try {
+ while (cursor.hasNext()) {
+ cursor.next();
+ ITupleReference frameTuple = cursor.getTuple();
+ componentBulkLoader.add(frameTuple);
+ }
+ } finally {
+ cursor.close();
}
if (mergedComponent.getLSMComponentFilter() != null) {
@@ -395,19 +390,8 @@
filterManager.updateFilter(mergedComponent.getLSMComponentFilter(),
filterTuples);
filterManager.writeFilter(mergedComponent.getLSMComponentFilter(),
mergedComponent.getRTree());
}
- btreeBulkLoader.end();
- IIndexBulkLoader bulkLoader =
mergedComponent.getRTree().createBulkLoader(1.0f, false, 0L, false);
- try {
- while (cursor.hasNext()) {
- cursor.next();
- ITupleReference frameTuple = cursor.getTuple();
- bulkLoader.add(frameTuple);
- }
- } finally {
- cursor.close();
- }
- bulkLoader.end();
+ componentBulkLoader.end();
return mergedComponent;
}
@@ -464,6 +448,24 @@
componentFileRefs.getDeleteIndexFileReference(),
componentFileRefs.getBloomFilterFileReference(), true);
}
+
+ /**
+ * Creates a bulk loader that fills {@code component}'s R-tree and buddy BTree.
+ *
+ * @param component the disk component to load
+ * @param hasBloomFilter whether to build a bloom filter. Its spec is derived from
+ * {@code numElementsHint} and {@code bloomFilterFalsePositiveRate}; when false,
+ * a {@code null} spec is passed.
+ * @param fillFactor page fill factor for the loaded trees
+ * @param verifyInput whether the underlying loaders verify input order
+ * @param numElementsHint expected number of elements
+ * @param checkIfEmptyIndex forwarded to the loader unchanged
+ * @param withFilter maintain the component filter while loading. This takes effect
+ * only when the index was configured with filter fields.
+ * @return the new component bulk loader
+ */
+ public ILSMDiskComponentWithBuddyBTreeBulkLoader
createComponentBulkLoader(LSMRTreeDiskComponent component,
+ boolean hasBloomFilter, float fillFactor, boolean verifyInput,
long numElementsHint,
+ boolean checkIfEmptyIndex, boolean withFilter) throws
HyracksDataException {
+ BloomFilterSpecification bloomFilterSpec = null;
+ if (hasBloomFilter) {
+ int maxBucketsPerElement =
BloomCalculations.maxBucketsPerElement(numElementsHint);
+ bloomFilterSpec =
BloomCalculations.computeBloomSpec(maxBucketsPerElement,
bloomFilterFalsePositiveRate);
+ }
+ if (withFilter && filterFields != null) {
+ // loader projects incoming tuples onto rtreeFields/filterFields and updates the filter
+ return new LSMRTreeDiskComponentBulkLoader(component,
bloomFilterSpec, fillFactor, verifyInput,
+ numElementsHint, checkIfEmptyIndex, filterManager,
rtreeFields, filterFields,
+
MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories()));
+ } else {
+ return new LSMRTreeDiskComponentBulkLoader(component,
bloomFilterSpec, fillFactor, verifyInput,
+ numElementsHint, checkIfEmptyIndex);
+ }
+ }
+
@Override
public IIndexBulkLoader createBulkLoader(float fillLevel, boolean
verifyInput, long numElementsHint,
boolean checkIfEmptyIndex) throws HyracksDataException {
@@ -512,13 +514,7 @@
public class LSMRTreeBulkLoader implements IIndexBulkLoader {
private final ILSMDiskComponent component;
- private final IIndexBulkLoader bulkLoader;
- private final IIndexBulkLoader buddyBTreeBulkloader;
- private boolean cleanedUpArtifacts = false;
- private boolean isEmptyComponent = true;
- public final PermutingTupleReference indexTuple;
- public final PermutingTupleReference filterTuple;
- public final MultiComparator filterCmp;
+ private final ILSMDiskComponentWithBuddyBTreeBulkLoader
componentBulkLoader;
public LSMRTreeBulkLoader(float fillFactor, boolean verifyInput, long
numElementsHint,
boolean checkIfEmptyIndex) throws HyracksDataException {
@@ -528,88 +524,23 @@
// Note that by using a flush target file name, we state that the
// new bulk loaded tree is "newer" than any other merged tree.
component = createBulkLoadTarget();
- bulkLoader = ((LSMRTreeDiskComponent)
component).getRTree().createBulkLoader(fillFactor, verifyInput,
- numElementsHint, false);
- buddyBTreeBulkloader = ((LSMRTreeDiskComponent)
component).getBTree().createBulkLoader(fillFactor,
- verifyInput, numElementsHint, false);
- if (filterFields != null) {
- indexTuple = new PermutingTupleReference(rtreeFields);
- filterCmp =
MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories());
- filterTuple = new PermutingTupleReference(filterFields);
- } else {
- indexTuple = null;
- filterCmp = null;
- filterTuple = null;
- }
+ componentBulkLoader =
createComponentBulkLoader((LSMRTreeDiskComponent) component, false, fillFactor,
+ verifyInput, numElementsHint, checkIfEmptyIndex, true);
}
@Override
public void add(ITupleReference tuple) throws HyracksDataException {
- try {
- ITupleReference t;
- if (indexTuple != null) {
- indexTuple.reset(tuple);
- t = indexTuple;
- } else {
- t = tuple;
- }
-
- bulkLoader.add(t);
-
- if (filterTuple != null) {
- filterTuple.reset(tuple);
- component.getLSMComponentFilter().update(filterTuple,
filterCmp);
- }
- } catch (Exception e) {
- cleanupArtifacts();
- throw e;
- }
- if (isEmptyComponent) {
- isEmptyComponent = false;
- }
+ // Delegate to the component bulk loader, which was created with withFilter=true,
+ // so field permutation and filter maintenance happen there.
+ componentBulkLoader.add(tuple);
}
@Override
public void end() throws HyracksDataException {
- if (!cleanedUpArtifacts) {
-
- if (component.getLSMComponentFilter() != null) {
-
filterManager.writeFilter(component.getLSMComponentFilter(),
- ((LSMRTreeDiskComponent) component).getRTree());
- }
-
- bulkLoader.end();
- buddyBTreeBulkloader.end();
-
- if (isEmptyComponent) {
- cleanupArtifacts();
- } else {
- ioOpCallback.afterOperation(LSMOperationType.FLUSH, null,
component);
- lsmHarness.addBulkLoadedComponent(component);
- }
- }
+ // Finish the R-tree, buddy BTree and filter writes; empty-component cleanup happens there.
+ componentBulkLoader.end();
+ // Publish the new component (as the removed code did) only when it holds data;
+ // without this, bulk-loaded tuples never become visible through the LSM harness.
+ if (component.getComponentSize() > 0) {
+ ioOpCallback.afterOperation(
+ org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType.FLUSH, null, component);
+ lsmHarness.addBulkLoadedComponent(component);
+ }
}
@Override
public void abort() throws HyracksDataException {
- if (bulkLoader != null) {
- bulkLoader.abort();
- }
- if (buddyBTreeBulkloader != null) {
- buddyBTreeBulkloader.abort();
- }
- }
-
- protected void cleanupArtifacts() throws HyracksDataException {
- if (!cleanedUpArtifacts) {
- cleanedUpArtifacts = true;
- ((LSMRTreeDiskComponent) component).getRTree().deactivate();
- ((LSMRTreeDiskComponent) component).getRTree().destroy();
- ((LSMRTreeDiskComponent) component).getBTree().deactivate();
- ((LSMRTreeDiskComponent) component).getBTree().destroy();
- ((LSMRTreeDiskComponent)
component).getBloomFilter().deactivate();
- ((LSMRTreeDiskComponent) component).getBloomFilter().destroy();
- }
+ // the component bulk loader aborts both the R-tree and buddy BTree loads
+ componentBulkLoader.abort();
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponentBulkLoader.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponentBulkLoader.java
new file mode 100644
index 0000000..34a9be0
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponentBulkLoader.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.rtree.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import
org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponentWithBuddyBTreeBulkLoader;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+public class LSMRTreeDiskComponentBulkLoader extends
AbstractLSMDiskComponentWithBuddyBTreeBulkLoader {
+
+ // Filtered component: filterManager, indexFields, filterFields and filterCmp are passed through to the base loader.
+ public LSMRTreeDiskComponentBulkLoader(LSMRTreeDiskComponent component,
BloomFilterSpecification bloomFilterSpec,
+ float fillFactor, boolean verifyInput, long numElementsHint,
boolean checkIfEmptyIndex,
+ ILSMComponentFilterManager filterManager, int[] indexFields, int[]
filterFields, MultiComparator filterCmp)
+ throws HyracksDataException {
+ super(component, bloomFilterSpec, fillFactor, verifyInput,
numElementsHint, checkIfEmptyIndex, filterManager,
+ indexFields, filterFields, filterCmp);
+ }
+
+ // Unfiltered component: the filter manager, both field arrays and the filter comparator are passed as null.
+ public LSMRTreeDiskComponentBulkLoader(LSMRTreeDiskComponent component,
BloomFilterSpecification bloomFilterSpec,
+ float fillFactor, boolean verifyInput, long numElementsHint,
boolean checkIfEmptyIndex)
+ throws HyracksDataException {
+ super(component, bloomFilterSpec, fillFactor, verifyInput,
numElementsHint, checkIfEmptyIndex, null, null, null,
+ null);
+ }
+
+ @Override
+ protected BloomFilter getBloomFilter(ILSMDiskComponent component) {
+ return ((LSMRTreeDiskComponent) component).getBloomFilter();
+ }
+
+ @Override
+ protected IIndex getIndex(ILSMDiskComponent component) {
+ return ((LSMRTreeDiskComponent) component).getRTree();
+ }
+
+ @Override
+ protected ITreeIndex getBuddyBTree(ILSMDiskComponent component) {
+ return ((LSMRTreeDiskComponent) component).getBTree();
+ }
+
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 018e6d3..ca5b45c 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -37,11 +37,11 @@
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
-import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFactory;
import
org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import
org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
@@ -179,7 +179,6 @@
memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
LSMRTreeDiskComponent component =
createDiskComponent(componentFactory,
flushOp.getRTreeFlushTarget(), null, null, true);
- RTree diskRTree = component.getRTree();
// scan the memory BTree
ITreeIndexAccessor memBTreeAccessor = flushingComponent.getBTree()
@@ -228,7 +227,9 @@
bTreeTupleSorter.sort();
}
- IIndexBulkLoader rTreeBulkloader = diskRTree.createBulkLoader(1.0f,
false, 0L, false);
+ ILSMDiskComponentBulkLoader componentBulkLoader =
+ createComponentBulkLoader(component, 1.0f, false, 0L, false,
false);
+
LSMRTreeWithAntiMatterTuplesFlushCursor cursor = new
LSMRTreeWithAntiMatterTuplesFlushCursor(rTreeTupleSorter,
bTreeTupleSorter, comparatorFields, linearizerArray);
cursor.open(null, null);
@@ -238,7 +239,7 @@
cursor.next();
ITupleReference frameTuple = cursor.getTuple();
- rTreeBulkloader.add(frameTuple);
+ componentBulkLoader.add(frameTuple);
}
} finally {
cursor.close();
@@ -252,8 +253,8 @@
filterManager.writeFilter(component.getLSMComponentFilter(),
component.getRTree());
}
flushingComponent.getMetadata().copy(component.getMetadata());
- rTreeBulkloader.end();
+ componentBulkLoader.end();
return component;
}
@@ -287,13 +288,13 @@
// Bulk load the tuples from all on-disk RTrees into the new RTree.
LSMRTreeDiskComponent component =
createDiskComponent(componentFactory,
mergeOp.getRTreeMergeTarget(), null, null, true);
- RTree mergedRTree = component.getRTree();
- IIndexBulkLoader bulkloader = mergedRTree.createBulkLoader(1.0f,
false, 0L, false);
+ ILSMDiskComponentBulkLoader componentBulkLoader =
+ createComponentBulkLoader(component, 1.0f, false, 0L, false,
false);
try {
while (cursor.hasNext()) {
cursor.next();
ITupleReference frameTuple = cursor.getTuple();
- bulkloader.add(frameTuple);
+ componentBulkLoader.add(frameTuple);
}
} finally {
cursor.close();
@@ -307,7 +308,8 @@
filterManager.updateFilter(component.getLSMComponentFilter(),
filterTuples);
filterManager.writeFilter(component.getLSMComponentFilter(),
component.getRTree());
}
- bulkloader.end();
+
+ componentBulkLoader.end();
return component;
}
@@ -335,6 +337,19 @@
}
}
+ public ILSMDiskComponentBulkLoader
createComponentBulkLoader(LSMRTreeDiskComponent component, float fillFactor,
+ boolean verifyInput, long numElementsHint, boolean
checkIfEmptyIndex, boolean withFilter)
+ throws HyracksDataException { // both branches below pass a null BloomFilterSpecification
+ if (withFilter && filterFields != null) { // filtered loader only if requested AND this index has filter fields
+ return new
LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader(component, null,
fillFactor, verifyInput,
+ numElementsHint, checkIfEmptyIndex, filterManager,
rtreeFields, filterFields,
+
MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories()));
+ } else { // unfiltered loader; flush and merge call this with withFilter == false
+ return new
LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader(component, null,
fillFactor, verifyInput,
+ numElementsHint, checkIfEmptyIndex);
+ }
+ }
+
@Override
public IIndexBulkLoader createBulkLoader(float fillLevel, boolean
verifyInput, long numElementsHint,
boolean checkIfEmptyIndex) throws HyracksDataException {
@@ -343,12 +358,7 @@
public class LSMRTreeWithAntiMatterTuplesBulkLoader implements
IIndexBulkLoader {
private final ILSMDiskComponent component;
- private final IIndexBulkLoader bulkLoader;
- private boolean cleanedUpArtifacts = false;
- private boolean isEmptyComponent = true;
- public final PermutingTupleReference indexTuple;
- public final PermutingTupleReference filterTuple;
- public final MultiComparator filterCmp;
+ private final ILSMDiskComponentBulkLoader bulkLoader;
public LSMRTreeWithAntiMatterTuplesBulkLoader(float fillFactor,
boolean verifyInput, long numElementsHint,
boolean checkIfEmptyIndex) throws HyracksDataException {
@@ -359,63 +369,21 @@
// new bulk loaded tree is "newer" than any other merged tree.
component = createBulkLoadTarget();
- bulkLoader = ((LSMRTreeDiskComponent)
component).getRTree().createBulkLoader(fillFactor, verifyInput,
- numElementsHint, false);
-
- if (filterFields != null) {
- indexTuple = new PermutingTupleReference(rtreeFields);
- filterCmp =
MultiComparator.create(component.getLSMComponentFilter().getFilterCmpFactories());
- filterTuple = new PermutingTupleReference(filterFields);
- } else {
- indexTuple = null;
- filterCmp = null;
- filterTuple = null;
- }
+ bulkLoader = createComponentBulkLoader((LSMRTreeDiskComponent)
component, fillFactor, verifyInput,
+ numElementsHint, checkIfEmptyIndex, true);
}
@Override
public void add(ITupleReference tuple) throws HyracksDataException {
- try {
- ITupleReference t;
- if (indexTuple != null) {
- indexTuple.reset(tuple);
- t = indexTuple;
- } else {
- t = tuple;
- }
-
- bulkLoader.add(t);
-
- if (filterTuple != null) {
- filterTuple.reset(tuple);
- component.getLSMComponentFilter().update(filterTuple,
filterCmp);
- }
-
- } catch (Exception e) {
- cleanupArtifacts();
- throw e;
- }
- if (isEmptyComponent) {
- isEmptyComponent = false;
- }
+ bulkLoader.add(tuple); // NOTE(review): the removed index/filter projection, filter update and cleanup-on-failure are presumably done by the component loader now; confirm
}
@Override
public void end() throws HyracksDataException {
- if (!cleanedUpArtifacts) {
-
- if (component.getLSMComponentFilter() != null) {
-
filterManager.writeFilter(component.getLSMComponentFilter(),
- ((LSMRTreeDiskComponent) component).getRTree());
- }
- bulkLoader.end();
-
- if (isEmptyComponent) {
- cleanupArtifacts();
- } else {
- ioOpCallback.afterOperation(LSMOperationType.FLUSH, null,
component);
- lsmHarness.addBulkLoadedComponent(component);
- }
+ bulkLoader.end(); // builds the component only; registration with the harness happens below
+ if (component.getComponentSize() > 0) { // NOTE(review): size replaces the old isEmptyComponent flag; confirm it is safe to query after end() has discarded an empty component
+ ioOpCallback.afterOperation(LSMOperationType.FLUSH, null,
component);
+ lsmHarness.addBulkLoadedComponent(component);
}
}
@@ -423,14 +391,7 @@
public void abort() throws HyracksDataException {
if (bulkLoader != null) {
bulkLoader.abort();
- }
- }
-
- protected void cleanupArtifacts() throws HyracksDataException {
- if (!cleanedUpArtifacts) {
- cleanedUpArtifacts = true;
- ((LSMRTreeDiskComponent) component).getRTree().deactivate();
- ((LSMRTreeDiskComponent) component).getRTree().destroy();
+ // NOTE(review): the removed cleanupArtifacts() deactivated and destroyed the partially built RTree; confirm bulkLoader.abort() now does that
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader.java
new file mode 100644
index 0000000..88a2e1e
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.rtree.impls;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import
org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponentBulkLoader;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+public class LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader extends
AbstractLSMDiskComponentBulkLoader {
+
+ // Filtered component: filterManager, indexFields, filterFields and filterCmp are passed through to the base loader.
+ public
LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader(LSMRTreeDiskComponent
component,
+ BloomFilterSpecification bloomFilterSpec, float fillFactor,
boolean verifyInput, long numElementsHint,
+ boolean checkIfEmptyIndex, ILSMComponentFilterManager
filterManager, int[] indexFields, int[] filterFields,
+ MultiComparator filterCmp) throws HyracksDataException {
+ super(component, bloomFilterSpec, fillFactor, verifyInput,
numElementsHint, checkIfEmptyIndex, filterManager,
+ indexFields, filterFields, filterCmp);
+ }
+
+ // Unfiltered component: the filter manager, both field arrays and the filter comparator are passed as null.
+ public
LSMRTreeWithAntiMatterTuplesDiskComponentBulkLoader(LSMRTreeDiskComponent
component,
+ BloomFilterSpecification bloomFilterSpec, float fillFactor,
boolean verifyInput, long numElementsHint,
+ boolean checkIfEmptyIndex) throws HyracksDataException {
+ super(component, bloomFilterSpec, fillFactor, verifyInput,
numElementsHint, checkIfEmptyIndex, null, null, null,
+ null);
+ }
+
+ @Override
+ protected BloomFilter getBloomFilter(ILSMDiskComponent component) { // NOTE(review): LSMRTreeWithAntiMatterTuples always passes a null BloomFilterSpecification; confirm the base class then ignores this filter
+ return ((LSMRTreeDiskComponent) component).getBloomFilter();
+ }
+
+ @Override
+ protected ITreeIndex getIndex(ILSMDiskComponent component) {
+ return ((LSMRTreeDiskComponent) component).getRTree();
+ }
+
+}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1773
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I772a6d68761fcbb85982a1c9f72f2d186e1d1ffb
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Luo Chen <[email protected]>