Wail Alkowaileet has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2367
Change subject: [WIP] Extract bulk loader out
......................................................................
[WIP] Extract bulk loader out
TODO:
- Clean up InvertedIndex bulkloader
- Extract external indexes bulkloader
Change-Id: I7f42a391a4de4b02acf6a8fdaf2b60818c1da806
---
M
hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
A
hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeBulkLoader.java
A
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IBulkLoadFinalizer.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
A
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndexBulkLoader.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
A
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexBulkLoader.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
A
hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeBulkLoader.java
12 files changed, 915 insertions(+), 690 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/67/2367/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
index 6e2d694..98cb391 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
@@ -41,7 +41,6 @@
import
org.apache.hyracks.storage.am.btree.impls.BTreeOpContext.PageValidationInfo;
import org.apache.hyracks.storage.am.common.api.IBTreeIndexTupleReference;
import org.apache.hyracks.storage.am.common.api.IPageManager;
-import org.apache.hyracks.storage.am.common.api.ISplitKey;
import org.apache.hyracks.storage.am.common.api.ITreeIndexAccessor;
import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
@@ -50,7 +49,6 @@
import org.apache.hyracks.storage.am.common.frames.FrameOpSpaceStatus;
import org.apache.hyracks.storage.am.common.impls.AbstractTreeIndex;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
-import org.apache.hyracks.storage.am.common.impls.NodeFrontier;
import org.apache.hyracks.storage.am.common.impls.TreeIndexDiskOrderScanCursor;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
@@ -61,7 +59,6 @@
import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.MultiComparator;
-import org.apache.hyracks.storage.common.buffercache.BufferCache;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
@@ -1000,204 +997,7 @@
@Override
public IIndexBulkLoader createBulkLoader(float fillFactor, boolean
verifyInput, long numElementsHint,
boolean checkIfEmptyIndex) throws HyracksDataException {
- return new BTreeBulkLoader(fillFactor, verifyInput);
- }
-
- public class BTreeBulkLoader extends
AbstractTreeIndex.AbstractTreeIndexBulkLoader {
- protected final ISplitKey splitKey;
- protected final boolean verifyInput;
-
- public BTreeBulkLoader(float fillFactor, boolean verifyInput) throws
HyracksDataException {
- super(fillFactor);
- this.verifyInput = verifyInput;
- splitKey = new
BTreeSplitKey(leafFrame.getTupleWriter().createTupleReference());
- splitKey.getTuple().setFieldCount(cmp.getKeyFieldCount());
- }
-
- @Override
- public void add(ITupleReference tuple) throws HyracksDataException {
- try {
- int tupleSize =
Math.max(leafFrame.getBytesRequiredToWriteTuple(tuple),
- interiorFrame.getBytesRequiredToWriteTuple(tuple));
-
- NodeFrontier leafFrontier = nodeFrontiers.get(0);
-
- int spaceNeeded = tupleWriter.bytesRequired(tuple) + slotSize;
- int spaceUsed = leafFrame.getBuffer().capacity() -
leafFrame.getTotalFreeSpace();
-
- // try to free space by compression
- if (spaceUsed + spaceNeeded > leafMaxBytes) {
- leafFrame.compress();
- spaceUsed = leafFrame.getBuffer().capacity() -
leafFrame.getTotalFreeSpace();
- }
- //full, allocate new page
- if (spaceUsed + spaceNeeded > leafMaxBytes) {
- if (leafFrame.getTupleCount() == 0) {
- bufferCache.returnPage(leafFrontier.page, false);
- } else {
- leafFrontier.lastTuple.resetByTupleIndex(leafFrame,
leafFrame.getTupleCount() - 1);
- if (verifyInput) {
- verifyInputTuple(tuple, leafFrontier.lastTuple);
- }
- int splitKeySize =
tupleWriter.bytesRequired(leafFrontier.lastTuple, 0, cmp.getKeyFieldCount());
- splitKey.initData(splitKeySize);
- tupleWriter.writeTupleFields(leafFrontier.lastTuple,
0, cmp.getKeyFieldCount(),
- splitKey.getBuffer().array(), 0);
-
splitKey.getTuple().resetByTupleOffset(splitKey.getBuffer().array(), 0);
- splitKey.setLeftPage(leafFrontier.pageId);
-
- propagateBulk(1, pagesToWrite);
-
- leafFrontier.pageId =
freePageManager.takePage(metaFrame);
-
- ((IBTreeLeafFrame)
leafFrame).setNextLeaf(leafFrontier.pageId);
-
- queue.put(leafFrontier.page);
- for (ICachedPage c : pagesToWrite) {
- queue.put(c);
- }
- pagesToWrite.clear();
-
- splitKey.setRightPage(leafFrontier.pageId);
- }
- if (tupleSize > maxTupleSize) {
- final long dpid =
BufferedFileHandle.getDiskPageId(getFileId(), leafFrontier.pageId);
- // calculate required number of pages.
- int headerSize =
Math.max(leafFrame.getPageHeaderSize(), interiorFrame.getPageHeaderSize());
- final int multiplier =
- (int) Math.ceil((double) tupleSize /
(bufferCache.getPageSize() - headerSize));
- if (multiplier > 1) {
- leafFrontier.page =
bufferCache.confiscateLargePage(dpid, multiplier,
- freePageManager.takeBlock(metaFrame,
multiplier - 1));
- } else {
- leafFrontier.page =
bufferCache.confiscatePage(dpid);
- }
- leafFrame.setPage(leafFrontier.page);
- leafFrame.initBuffer((byte) 0);
- ((IBTreeLeafFrame) leafFrame).setLargeFlag(true);
- } else {
- final long dpid =
BufferedFileHandle.getDiskPageId(getFileId(), leafFrontier.pageId);
- leafFrontier.page = bufferCache.confiscatePage(dpid);
- leafFrame.setPage(leafFrontier.page);
- leafFrame.initBuffer((byte) 0);
- }
- } else {
- if (verifyInput && leafFrame.getTupleCount() > 0) {
- leafFrontier.lastTuple.resetByTupleIndex(leafFrame,
leafFrame.getTupleCount() - 1);
- verifyInputTuple(tuple, leafFrontier.lastTuple);
- }
- }
- ((IBTreeLeafFrame) leafFrame).insertSorted(tuple);
- } catch (HyracksDataException | RuntimeException e) {
- handleException();
- throw e;
- }
- }
-
- protected void verifyInputTuple(ITupleReference tuple, ITupleReference
prevTuple) throws HyracksDataException {
- // New tuple should be strictly greater than last tuple.
- int cmpResult = cmp.compare(tuple, prevTuple);
- if (cmpResult < 0) {
- throw
HyracksDataException.create(ErrorCode.UNSORTED_LOAD_INPUT);
- }
- if (cmpResult == 0) {
- throw
HyracksDataException.create(ErrorCode.DUPLICATE_LOAD_INPUT);
- }
- }
-
- protected void propagateBulk(int level, List<ICachedPage>
pagesToWrite) throws HyracksDataException {
- if (splitKey.getBuffer() == null) {
- return;
- }
-
- if (level >= nodeFrontiers.size()) {
- addLevel();
- }
-
- NodeFrontier frontier = nodeFrontiers.get(level);
- interiorFrame.setPage(frontier.page);
-
- ITupleReference tuple = splitKey.getTuple();
- int tupleBytes = tupleWriter.bytesRequired(tuple, 0,
cmp.getKeyFieldCount());
- int spaceNeeded = tupleBytes + slotSize + 4;
- if (tupleBytes >
interiorFrame.getMaxTupleSize(BTree.this.bufferCache.getPageSize())) {
- throw
HyracksDataException.create(ErrorCode.RECORD_IS_TOO_LARGE, tupleBytes,
-
interiorFrame.getMaxTupleSize(BTree.this.bufferCache.getPageSize()));
- }
-
- int spaceUsed = interiorFrame.getBuffer().capacity() -
interiorFrame.getTotalFreeSpace();
- if (spaceUsed + spaceNeeded > interiorMaxBytes) {
-
- ISplitKey copyKey =
splitKey.duplicate(leafFrame.getTupleWriter().createTupleReference());
- tuple = copyKey.getTuple();
-
- frontier.lastTuple.resetByTupleIndex(interiorFrame,
interiorFrame.getTupleCount() - 1);
- int splitKeySize =
tupleWriter.bytesRequired(frontier.lastTuple, 0, cmp.getKeyFieldCount());
- splitKey.initData(splitKeySize);
- tupleWriter.writeTupleFields(frontier.lastTuple, 0,
cmp.getKeyFieldCount(),
- splitKey.getBuffer().array(), 0);
-
splitKey.getTuple().resetByTupleOffset(splitKey.getBuffer().array(), 0);
-
- ((IBTreeInteriorFrame) interiorFrame).deleteGreatest();
- int finalPageId = freePageManager.takePage(metaFrame);
-
frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(),
finalPageId));
- pagesToWrite.add(frontier.page);
- splitKey.setLeftPage(finalPageId);
-
- propagateBulk(level + 1, pagesToWrite);
- frontier.page =
bufferCache.confiscatePage(BufferCache.INVALID_DPID);
- interiorFrame.setPage(frontier.page);
- interiorFrame.initBuffer((byte) level);
- }
- ((IBTreeInteriorFrame) interiorFrame).insertSorted(tuple);
- }
-
- private void persistFrontiers(int level, int rightPage) throws
HyracksDataException {
- if (level >= nodeFrontiers.size()) {
- rootPage = nodeFrontiers.get(level - 1).pageId;
- releasedLatches = true;
- return;
- }
- if (level < 1) {
- ICachedPage lastLeaf = nodeFrontiers.get(level).page;
- int lastLeafPage = nodeFrontiers.get(level).pageId;
-
lastLeaf.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(),
nodeFrontiers.get(level).pageId));
- queue.put(lastLeaf);
- nodeFrontiers.get(level).page = null;
- persistFrontiers(level + 1, lastLeafPage);
- return;
- }
- NodeFrontier frontier = nodeFrontiers.get(level);
- interiorFrame.setPage(frontier.page);
- //just finalize = the layer right above the leaves has correct
righthand pointers already
- if (rightPage < 0) {
- throw new HyracksDataException(
- "Error in index creation. Internal node appears to
have no rightmost guide");
- }
- ((IBTreeInteriorFrame)
interiorFrame).setRightmostChildPageId(rightPage);
- int finalPageId = freePageManager.takePage(metaFrame);
-
frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(),
finalPageId));
- queue.put(frontier.page);
- frontier.pageId = finalPageId;
-
- persistFrontiers(level + 1, finalPageId);
- }
-
- @Override
- public void end() throws HyracksDataException {
- try {
- persistFrontiers(0, -1);
- super.end();
- } catch (HyracksDataException | RuntimeException e) {
- handleException();
- throw e;
- }
- }
-
- @Override
- public void abort() throws HyracksDataException {
- super.handleException();
- }
+ return new BTreeBulkLoader(this, fillFactor, verifyInput,
maxTupleSize, rootPageSetter);
}
@SuppressWarnings("rawtypes")
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeBulkLoader.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeBulkLoader.java
new file mode 100644
index 0000000..660e7d8
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeBulkLoader.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.btree.impls;
+
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.btree.api.IBTreeInteriorFrame;
+import org.apache.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import org.apache.hyracks.storage.am.common.api.IBulkLoadFinalizer;
+import org.apache.hyracks.storage.am.common.api.ISplitKey;
+import org.apache.hyracks.storage.am.common.impls.AbstractTreeIndexBulkLoader;
+import org.apache.hyracks.storage.am.common.impls.NodeFrontier;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+
+public class BTreeBulkLoader extends AbstractTreeIndexBulkLoader {
+ protected final ISplitKey splitKey;
+ protected final boolean verifyInput;
+
+ public BTreeBulkLoader(BTree index, float fillFactor, boolean verifyInput,
int maxTupleSize,
+ IBulkLoadFinalizer<Void, Integer> finalizer) throws
HyracksDataException {
+ super(index, fillFactor, maxTupleSize, finalizer);
+ this.verifyInput = verifyInput;
+ splitKey = new
BTreeSplitKey(leafFrame.getTupleWriter().createTupleReference());
+ splitKey.getTuple().setFieldCount(cmp.getKeyFieldCount());
+ }
+
+ @Override
+ public void add(ITupleReference tuple) throws HyracksDataException {
+ try {
+ int tupleSize =
Math.max(leafFrame.getBytesRequiredToWriteTuple(tuple),
+ interiorFrame.getBytesRequiredToWriteTuple(tuple));
+
+ NodeFrontier leafFrontier = nodeFrontiers.get(0);
+
+ int spaceNeeded = tupleWriter.bytesRequired(tuple) + slotSize;
+ int spaceUsed = getFrameUsedSpace(leafFrame);
+
+ // try to free space by compression
+ if (spaceUsed + spaceNeeded > leafMaxBytes) {
+ leafFrame.compress();
+ spaceUsed = getFrameUsedSpace(leafFrame);
+ }
+ //full, allocate new page
+ if (spaceUsed + spaceNeeded > leafMaxBytes) {
+ if (leafFrame.getTupleCount() == 0) {
+ bufferCache.returnPage(leafFrontier.page, false);
+ } else {
+ leafFrontier.lastTuple.resetByTupleIndex(leafFrame,
leafFrame.getTupleCount() - 1);
+ if (verifyInput) {
+ verifyInputTuple(tuple, leafFrontier.lastTuple);
+ }
+ int splitKeySize =
tupleWriter.bytesRequired(leafFrontier.lastTuple, 0, cmp.getKeyFieldCount());
+ splitKey.initData(splitKeySize);
+ tupleWriter.writeTupleFields(leafFrontier.lastTuple, 0,
cmp.getKeyFieldCount(),
+ splitKey.getBuffer().array(), 0);
+
splitKey.getTuple().resetByTupleOffset(splitKey.getBuffer().array(), 0);
+ splitKey.setLeftPage(leafFrontier.pageId);
+
+ propagateBulk(1, pagesToWrite);
+
+ leafFrontier.pageId = freePageManager.takePage(metaFrame);
+
+ ((IBTreeLeafFrame)
leafFrame).setNextLeaf(leafFrontier.pageId);
+
+ queue.put(leafFrontier.page);
+ for (ICachedPage c : pagesToWrite) {
+ queue.put(c);
+ }
+ pagesToWrite.clear();
+
+ splitKey.setRightPage(leafFrontier.pageId);
+ }
+ if (tupleSize > maxTupleSize) {
+ final long dpid = BufferedFileHandle.getDiskPageId(fileId,
leafFrontier.pageId);
+ // calculate required number of pages.
+ int headerSize = Math.max(leafFrame.getPageHeaderSize(),
interiorFrame.getPageHeaderSize());
+ final int multiplier =
+ (int) Math.ceil((double) tupleSize /
(bufferCache.getPageSize() - headerSize));
+ if (multiplier > 1) {
+ leafFrontier.page =
bufferCache.confiscateLargePage(dpid, multiplier,
+ freePageManager.takeBlock(metaFrame,
multiplier - 1));
+ } else {
+ leafFrontier.page = bufferCache.confiscatePage(dpid);
+ }
+ leafFrame.setPage(leafFrontier.page);
+ leafFrame.initBuffer((byte) 0);
+ ((IBTreeLeafFrame) leafFrame).setLargeFlag(true);
+ } else {
+ final long dpid = BufferedFileHandle.getDiskPageId(fileId,
leafFrontier.pageId);
+ leafFrontier.page = bufferCache.confiscatePage(dpid);
+ leafFrame.setPage(leafFrontier.page);
+ leafFrame.initBuffer((byte) 0);
+ }
+ } else {
+ if (verifyInput && leafFrame.getTupleCount() > 0) {
+ leafFrontier.lastTuple.resetByTupleIndex(leafFrame,
leafFrame.getTupleCount() - 1);
+ verifyInputTuple(tuple, leafFrontier.lastTuple);
+ }
+ }
+ ((IBTreeLeafFrame) leafFrame).insertSorted(tuple);
+ } catch (HyracksDataException | RuntimeException e) {
+ cleanUp();
+ throw e;
+ }
+ }
+
+ protected void verifyInputTuple(ITupleReference tuple, ITupleReference
prevTuple) throws HyracksDataException {
+ // New tuple should be strictly greater than last tuple.
+ int cmpResult = cmp.compare(tuple, prevTuple);
+ if (cmpResult < 0) {
+ throw HyracksDataException.create(ErrorCode.UNSORTED_LOAD_INPUT);
+ }
+ if (cmpResult == 0) {
+ throw HyracksDataException.create(ErrorCode.DUPLICATE_LOAD_INPUT);
+ }
+ }
+
+ protected void propagateBulk(int level, List<ICachedPage> pagesToWrite)
throws HyracksDataException {
+ if (splitKey.getBuffer() == null) {
+ return;
+ }
+
+ if (level >= nodeFrontiers.size()) {
+ addLevel();
+ }
+
+ NodeFrontier frontier = nodeFrontiers.get(level);
+ interiorFrame.setPage(frontier.page);
+
+ ITupleReference tuple = splitKey.getTuple();
+ int tupleBytes = tupleWriter.bytesRequired(tuple, 0,
cmp.getKeyFieldCount());
+ int spaceNeeded = tupleBytes + slotSize + 4;
+ if (tupleBytes >
interiorFrame.getMaxTupleSize(bufferCache.getPageSize())) {
+ throw HyracksDataException.create(ErrorCode.RECORD_IS_TOO_LARGE,
tupleBytes,
+ interiorFrame.getMaxTupleSize(bufferCache.getPageSize()));
+ }
+
+ int spaceUsed = interiorFrame.getBuffer().capacity() -
interiorFrame.getTotalFreeSpace();
+ if (spaceUsed + spaceNeeded > interiorMaxBytes) {
+
+ ISplitKey copyKey =
splitKey.duplicate(leafFrame.getTupleWriter().createTupleReference());
+ tuple = copyKey.getTuple();
+
+ frontier.lastTuple.resetByTupleIndex(interiorFrame,
interiorFrame.getTupleCount() - 1);
+ int splitKeySize = tupleWriter.bytesRequired(frontier.lastTuple,
0, cmp.getKeyFieldCount());
+ splitKey.initData(splitKeySize);
+ tupleWriter.writeTupleFields(frontier.lastTuple, 0,
cmp.getKeyFieldCount(), splitKey.getBuffer().array(),
+ 0);
+
splitKey.getTuple().resetByTupleOffset(splitKey.getBuffer().array(), 0);
+
+ ((IBTreeInteriorFrame) interiorFrame).deleteGreatest();
+ int finalPageId = freePageManager.takePage(metaFrame);
+
frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(fileId,
finalPageId));
+ pagesToWrite.add(frontier.page);
+ splitKey.setLeftPage(finalPageId);
+
+ propagateBulk(level + 1, pagesToWrite);
+ frontier.page =
bufferCache.confiscatePage(BufferCache.INVALID_DPID);
+ interiorFrame.setPage(frontier.page);
+ interiorFrame.initBuffer((byte) level);
+ }
+ ((IBTreeInteriorFrame) interiorFrame).insertSorted(tuple);
+ }
+
+ private void persistFrontiers(int level, int rightPage) throws
HyracksDataException {
+ if (level >= nodeFrontiers.size()) {
+ rootPage = nodeFrontiers.get(level - 1).pageId;
+ releasedLatches = true;
+ return;
+ }
+ if (level < 1) {
+ ICachedPage lastLeaf = nodeFrontiers.get(level).page;
+ int lastLeafPage = nodeFrontiers.get(level).pageId;
+ lastLeaf.setDiskPageId(BufferedFileHandle.getDiskPageId(fileId,
nodeFrontiers.get(level).pageId));
+ queue.put(lastLeaf);
+ nodeFrontiers.get(level).page = null;
+ persistFrontiers(level + 1, lastLeafPage);
+ return;
+ }
+ NodeFrontier frontier = nodeFrontiers.get(level);
+ interiorFrame.setPage(frontier.page);
+ //just finalize = the layer right above the leaves has correct
righthand pointers already
+ if (rightPage < 0) {
+ throw new HyracksDataException("Error in index creation. Internal
node appears to have no rightmost guide");
+ }
+ ((IBTreeInteriorFrame)
interiorFrame).setRightmostChildPageId(rightPage);
+ int finalPageId = freePageManager.takePage(metaFrame);
+ frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(fileId,
finalPageId));
+ queue.put(frontier.page);
+ frontier.pageId = finalPageId;
+
+ persistFrontiers(level + 1, finalPageId);
+ }
+
+ @Override
+ public void end() throws HyracksDataException {
+ try {
+ persistFrontiers(0, -1);
+ super.end();
+ } catch (HyracksDataException | RuntimeException e) {
+ cleanUp();
+ throw e;
+ }
+ }
+}
\ No newline at end of file
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IBulkLoadFinalizer.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IBulkLoadFinalizer.java
new file mode 100644
index 0000000..9419706
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IBulkLoadFinalizer.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.common.api;
+
+/**
+ * A generic API for a bulk loader to call back into the index after finishing
the bulk-load operation.
+ */
+@FunctionalInterface
+public interface IBulkLoadFinalizer<R, T> {
+ public R finalizeBulkLoad(T arg);
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
index 905c99d..82037de 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
@@ -19,26 +19,20 @@
package org.apache.hyracks.storage.am.common.impls;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IBulkLoadFinalizer;
import org.apache.hyracks.storage.am.common.api.IPageManager;
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
import org.apache.hyracks.storage.am.common.api.ITreeIndexAccessor;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
-import org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrame;
-import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
-import org.apache.hyracks.storage.common.MultiComparator;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
-import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
public abstract class AbstractTreeIndex implements ITreeIndex {
@@ -55,6 +49,7 @@
protected final IBinaryComparatorFactory[] cmpFactories;
protected final int fieldCount;
+ protected final IBulkLoadFinalizer<Void, Integer> rootPageSetter;
protected FileReference file;
private int fileId = -1;
@@ -73,6 +68,10 @@
this.cmpFactories = cmpFactories;
this.fieldCount = fieldCount;
this.file = file;
+ rootPageSetter = (Integer newRootPage) -> {
+ rootPage = newRootPage;
+ return null;
+ };
}
@Override
@@ -227,99 +226,10 @@
return fieldCount;
}
- public abstract class AbstractTreeIndexBulkLoader implements
IIndexBulkLoader {
- protected final MultiComparator cmp;
- protected final int slotSize;
- protected final int leafMaxBytes;
- protected final int interiorMaxBytes;
- protected final ArrayList<NodeFrontier> nodeFrontiers = new
ArrayList<>();
- protected final ITreeIndexMetadataFrame metaFrame;
- protected final ITreeIndexTupleWriter tupleWriter;
- protected ITreeIndexFrame leafFrame;
- protected ITreeIndexFrame interiorFrame;
- // Immutable bulk loaders write their root page at page -2, as needed
e.g. by append-only file systems such as
- // HDFS. Since loading this tree relies on the root page actually
being at that point, no further inserts into
- // that tree are allowed. Currently, this is not enforced.
- protected boolean releasedLatches;
- protected final IFIFOPageQueue queue;
- protected List<ICachedPage> pagesToWrite;
-
- public AbstractTreeIndexBulkLoader(float fillFactor) throws
HyracksDataException {
- leafFrame = leafFrameFactory.createFrame();
- interiorFrame = interiorFrameFactory.createFrame();
- metaFrame = freePageManager.createMetadataFrame();
-
- queue = bufferCache.createFIFOQueue();
-
- if (!isEmptyTree(leafFrame)) {
- throw
HyracksDataException.create(ErrorCode.CANNOT_BULK_LOAD_NON_EMPTY_TREE);
- }
-
- this.cmp = MultiComparator.create(cmpFactories);
-
- leafFrame.setMultiComparator(cmp);
- interiorFrame.setMultiComparator(cmp);
-
- tupleWriter = leafFrame.getTupleWriter();
-
- NodeFrontier leafFrontier = new
NodeFrontier(leafFrame.createTupleReference());
- leafFrontier.pageId = freePageManager.takePage(metaFrame);
- leafFrontier.page =
-
bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId,
leafFrontier.pageId));
-
- interiorFrame.setPage(leafFrontier.page);
- interiorFrame.initBuffer((byte) 0);
- interiorMaxBytes = (int) (interiorFrame.getBuffer().capacity() *
fillFactor);
-
- leafFrame.setPage(leafFrontier.page);
- leafFrame.initBuffer((byte) 0);
- leafMaxBytes = (int) (leafFrame.getBuffer().capacity() *
fillFactor);
- slotSize = leafFrame.getSlotSize();
-
- nodeFrontiers.add(leafFrontier);
- pagesToWrite = new ArrayList<>();
- }
-
- protected void handleException() throws HyracksDataException {
- // Unlatch and unpin pages that weren't in the queue to avoid
leaking memory.
- for (NodeFrontier nodeFrontier : nodeFrontiers) {
- ICachedPage frontierPage = nodeFrontier.page;
- if (frontierPage.confiscated()) {
- bufferCache.returnPage(frontierPage, false);
- }
- }
- for (ICachedPage pageToDiscard : pagesToWrite) {
- bufferCache.returnPage(pageToDiscard, false);
- }
- releasedLatches = true;
- }
-
- @Override
- public void end() throws HyracksDataException {
- bufferCache.finishQueue();
- freePageManager.setRootPageId(rootPage);
- }
-
- protected void addLevel() throws HyracksDataException {
- NodeFrontier frontier = new
NodeFrontier(tupleWriter.createTupleReference());
- frontier.page =
bufferCache.confiscatePage(IBufferCache.INVALID_DPID);
- frontier.pageId = -1;
- frontier.lastTuple.setFieldCount(cmp.getKeyFieldCount());
- interiorFrame.setPage(frontier.page);
- interiorFrame.initBuffer((byte) nodeFrontiers.size());
- nodeFrontiers.add(frontier);
- }
-
- public ITreeIndexFrame getLeafFrame() {
- return leafFrame;
- }
-
- public void setLeafFrame(ITreeIndexFrame leafFrame) {
- this.leafFrame = leafFrame;
- }
-
- }
-
+ /**
+ * TODO: Should this be deleted? It has never been used.
+ *
+ */
public class TreeIndexInsertBulkLoader implements IIndexBulkLoader {
ITreeIndexAccessor accessor;
@@ -347,10 +257,6 @@
@Override
public long getMemoryAllocationSize() {
return 0;
- }
-
- public IBinaryComparatorFactory[] getCmpFactories() {
- return cmpFactories;
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndexBulkLoader.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndexBulkLoader.java
new file mode 100644
index 0000000..8376150
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndexBulkLoader.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.common.impls;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.IBulkLoadFinalizer;
+import org.apache.hyracks.storage.am.common.api.IPageManager;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrame;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
+import org.apache.hyracks.storage.common.IIndexBulkLoader;
+import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+
/**
 * Base class for tree-index (BTree/RTree) bulk loaders that were previously nested inside
 * {@code AbstractTreeIndex}. It owns the shared bulk-load state: the per-level
 * {@link NodeFrontier} stack, the confiscated-page FIFO write queue, frame fill limits,
 * and the root-page bookkeeping that is finalized in {@link #end()}.
 *
 * NOTE(review): not thread-safe; a loader instance is intended for single-threaded use
 * between construction and end()/abort() — TODO confirm against callers.
 */
public abstract class AbstractTreeIndexBulkLoader implements IIndexBulkLoader {

    // Key comparator shared by leaf and interior frames.
    protected final MultiComparator cmp;
    protected final int slotSize;
    // Fill limits derived from fillFactor in the constructor; a frontier page is flushed
    // once its used space would exceed the corresponding limit.
    protected final int leafMaxBytes;
    protected final int interiorMaxBytes;
    // nodeFrontiers.get(0) is the leaf level; higher indices are interior levels added by addLevel().
    protected final ArrayList<NodeFrontier> nodeFrontiers;
    protected final ITreeIndexMetadataFrame metaFrame;
    protected final ITreeIndexTupleWriter tupleWriter;
    protected ITreeIndexFrame leafFrame;
    protected ITreeIndexFrame interiorFrame;
    // Immutable bulk loaders write their root page at page -2, as needed e.g. by append-only file systems such as
    // HDFS. Since loading this tree relies on the root page actually being at that point, no further inserts into
    // that tree are allowed. Currently, this is not enforced.
    // NOTE(review): the comment above was carried over from AbstractTreeIndex but sits next to
    // releasedLatches, which is unrelated — consider relocating it. releasedLatches is set by
    // cleanUp() but never read in this class; presumably subclasses consult it — TODO confirm.
    protected boolean releasedLatches;
    // FIFO queue of finished (confiscated) pages awaiting write-out; drained by finishQueue() in end().
    protected final IFIFOPageQueue queue;
    // Pages produced while propagating splits that have not yet been queued; discarded by cleanUp().
    protected final List<ICachedPage> pagesToWrite;

    protected final IBufferCache bufferCache;
    protected final IPageManager freePageManager;
    protected final int maxTupleSize;
    protected final int fileId;
    // Callback used by the owning index to record the final root page id (extracted so the
    // loader no longer needs to be an inner class of the index). The Void/Integer typing
    // mirrors IBulkLoadFinalizer<R, T>: takes an Integer, returns nothing useful.
    protected final IBulkLoadFinalizer<Void, Integer> rootPageSetter;
    protected int rootPage;

    /**
     * Prepares the loader: verifies the tree is empty, confiscates the first leaf page,
     * and initializes leaf/interior frames and fill limits.
     *
     * @param index          the (empty) tree index being loaded
     * @param fillFactor     fraction of a page to fill before moving to the next page
     * @param maxTupleSize   largest tuple this tree accepts; enforced by subclasses in add()
     * @param rootPageSetter invoked from end() with the final root page id
     * @throws HyracksDataException if the tree is not empty or a page cannot be obtained
     */
    public AbstractTreeIndexBulkLoader(AbstractTreeIndex index, float fillFactor, int maxTupleSize,
            IBulkLoadFinalizer<Void, Integer> rootPageSetter) throws HyracksDataException {
        this.leafFrame = index.getLeafFrameFactory().createFrame();

        // Bulk load is only legal on an empty tree.
        if (!index.isEmptyTree(leafFrame)) {
            throw HyracksDataException.create(ErrorCode.CANNOT_BULK_LOAD_NON_EMPTY_TREE);
        }
        this.bufferCache = index.getBufferCache();
        this.freePageManager = index.getPageManager();
        this.interiorFrame = index.getInteriorFrameFactory().createFrame();
        metaFrame = freePageManager.createMetadataFrame();

        queue = bufferCache.createFIFOQueue();

        this.cmp = MultiComparator.create(index.getComparatorFactories());

        leafFrame.setMultiComparator(cmp);
        interiorFrame.setMultiComparator(cmp);

        tupleWriter = leafFrame.getTupleWriter();

        // First leaf frontier: take a page id from the page manager and confiscate the page.
        // NOTE(review): if anything below confiscatePage() throws, this page is leaked because
        // nodeFrontiers is not populated yet and cleanUp() cannot find it — TODO confirm/handle.
        NodeFrontier leafFrontier = new NodeFrontier(leafFrame.createTupleReference());
        leafFrontier.pageId = freePageManager.takePage(metaFrame);
        leafFrontier.page =
                bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(index.getFileId(), leafFrontier.pageId));

        // The interior frame is pointed at the leaf page only to size interiorMaxBytes from
        // the page capacity; the leaf frame re-initializes the same buffer right after.
        interiorFrame.setPage(leafFrontier.page);
        interiorFrame.initBuffer((byte) 0);
        interiorMaxBytes = (int) (interiorFrame.getBuffer().capacity() * fillFactor);

        leafFrame.setPage(leafFrontier.page);
        leafFrame.initBuffer((byte) 0);
        leafMaxBytes = (int) (leafFrame.getBuffer().capacity() * fillFactor);
        slotSize = leafFrame.getSlotSize();

        nodeFrontiers = new ArrayList<>();
        pagesToWrite = new ArrayList<>();
        nodeFrontiers.add(leafFrontier);
        this.maxTupleSize = maxTupleSize;
        fileId = index.getFileId();
        this.rootPageSetter = rootPageSetter;

    }

    /* ********************************
     * IIndexBulkLoader methods
     * ********************************
     */

    /**
     * Finishes the load: reports the root page to the owning index, flushes all queued
     * pages, and records the root page id in the page manager.
     *
     * NOTE(review): the root is handed to rootPageSetter BEFORE finishQueue() persists the
     * queued pages; the pre-refactor code only set the root after the queue was flushed —
     * verify this ordering is intentional.
     */
    @Override
    public void end() throws HyracksDataException {
        rootPageSetter.finalizeBulkLoad(rootPage);
        bufferCache.finishQueue();
        freePageManager.setRootPageId(rootPage);
    }

    /**
     * Calling abort will release latches and unpin all pages
     */
    @Override
    public void abort() throws HyracksDataException {
        cleanUp();
    }

    /* ********************************
     * TreeIndex additional methods
     * ********************************
     */
    /** @return the current leaf frame used by this loader. */
    public ITreeIndexFrame getLeafFrame() {
        return leafFrame;
    }

    /** @return bytes currently occupied in {@code frame} (capacity minus free space). */
    protected int getFrameUsedSpace(ITreeIndexFrame frame) {
        return frame.getBuffer().capacity() - frame.getTotalFreeSpace();
    }

    /**
     * Pushes a new interior level onto the frontier stack. The page id is left as -1
     * (and the page confiscated with INVALID_DPID) because the real id is assigned when
     * the level is flushed.
     */
    protected void addLevel() throws HyracksDataException {
        NodeFrontier frontier = new NodeFrontier(tupleWriter.createTupleReference());
        frontier.page = bufferCache.confiscatePage(IBufferCache.INVALID_DPID);
        frontier.pageId = -1;
        frontier.lastTuple.setFieldCount(cmp.getKeyFieldCount());
        interiorFrame.setPage(frontier.page);
        // Level byte encodes the frame's height in the tree (leaf = 0).
        interiorFrame.initBuffer((byte) nodeFrontiers.size());
        nodeFrontiers.add(frontier);
    }

    /**
     * Returns every confiscated page that was NOT yet handed to the write queue back to the
     * buffer cache (pages already in the queue are owned by it). Sets releasedLatches so
     * subclasses can detect that cleanup ran.
     */
    protected void cleanUp() throws HyracksDataException {
        // Unlatch and unpin pages that weren't in the queue to avoid leaking memory.
        for (NodeFrontier nodeFrontier : nodeFrontiers) {
            ICachedPage frontierPage = nodeFrontier.page;
            if (frontierPage.confiscated()) {
                bufferCache.returnPage(frontierPage, false);
            }
        }
        for (ICachedPage pageToDiscard : pagesToWrite) {
            bufferCache.returnPage(pageToDiscard, false);
        }
        releasedLatches = true;
    }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java
index 84857f4..168fcfd 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java
@@ -20,7 +20,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import
org.apache.hyracks.storage.am.common.impls.AbstractTreeIndex.AbstractTreeIndexBulkLoader;
+import org.apache.hyracks.storage.am.common.impls.AbstractTreeIndexBulkLoader;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMTreeTupleWriter;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index 4942eda..e39c14e 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -19,8 +19,6 @@
package org.apache.hyracks.storage.am.lsm.invertedindex.ondisk;
-import java.io.DataOutput;
-import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hyracks.api.context.IHyracksCommonContext;
@@ -40,6 +38,7 @@
import org.apache.hyracks.storage.am.btree.impls.DiskBTree;
import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
+import org.apache.hyracks.storage.am.common.api.IBulkLoadFinalizer;
import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
import org.apache.hyracks.storage.am.common.api.IPageManagerFactory;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
@@ -58,9 +57,6 @@
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.MultiComparator;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
-import org.apache.hyracks.storage.common.buffercache.ICachedPage;
-import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue;
-import org.apache.hyracks.storage.common.file.BufferedFileHandle;
/**
* An inverted index consists of two files: 1. a file storing (paginated)
@@ -103,12 +99,13 @@
protected final int numTokenFields;
protected final int numInvListKeys;
protected final FileReference invListsFile;
+ protected final IBulkLoadFinalizer<Void, Integer> maxPageIdSetter;
// Last page id of inverted-lists file (inclusive). Set during bulk load.
protected int invListsMaxPageId = -1;
protected boolean isOpen = false;
protected boolean wasOpen = false;
- public OnDiskInvertedIndex(IBufferCache bufferCache, IInvertedListBuilder
invListBuilder,
+ public OnDiskInvertedIndex( /*NOSONAR*/ IBufferCache bufferCache,
IInvertedListBuilder invListBuilder,
ITypeTraits[] invListTypeTraits, IBinaryComparatorFactory[]
invListCmpFactories,
ITypeTraits[] tokenTypeTraits, IBinaryComparatorFactory[]
tokenCmpFactories, FileReference btreeFile,
FileReference invListsFile, IPageManagerFactory
pageManagerFactory) throws HyracksDataException {
@@ -127,6 +124,11 @@
this.invListEndPageIdField = numTokenFields + 1;
this.invListStartOffField = numTokenFields + 2;
this.invListNumElementsField = numTokenFields + 3;
+
+ maxPageIdSetter = (Integer newMaxPageId) -> {
+ invListsMaxPageId = newMaxPageId;
+ return null;
+ };
}
@Override
@@ -221,168 +223,6 @@
int numElements =
IntegerPointable.getInteger(btreeTuple.getFieldData(invListNumElementsField),
btreeTuple.getFieldStart(invListNumElementsField));
listCursor.reset(startPageId, endPageId, startOff, numElements);
- }
-
- public final class OnDiskInvertedIndexBulkLoader implements
IIndexBulkLoader {
- private final ArrayTupleBuilder btreeTupleBuilder;
- private final ArrayTupleReference btreeTupleReference;
- private final IIndexBulkLoader btreeBulkloader;
-
- private int currentInvListStartPageId;
- private int currentInvListStartOffset;
- private final ArrayTupleBuilder lastTupleBuilder;
- private final ArrayTupleReference lastTuple;
-
- private int currentPageId;
- private ICachedPage currentPage;
- private final MultiComparator tokenCmp;
- private final MultiComparator invListCmp;
-
- private final boolean verifyInput;
- private final MultiComparator allCmp;
-
- private final IFIFOPageQueue queue;
-
- public OnDiskInvertedIndexBulkLoader(float btreeFillFactor, boolean
verifyInput, long numElementsHint,
- boolean checkIfEmptyIndex, int startPageId) throws
HyracksDataException {
- this.verifyInput = verifyInput;
- this.tokenCmp =
MultiComparator.create(btree.getComparatorFactories());
- this.invListCmp = MultiComparator.create(invListCmpFactories);
- if (verifyInput) {
- allCmp =
MultiComparator.create(btree.getComparatorFactories(), invListCmpFactories);
- } else {
- allCmp = null;
- }
- this.btreeTupleBuilder = new
ArrayTupleBuilder(btree.getFieldCount());
- this.btreeTupleReference = new ArrayTupleReference();
- this.lastTupleBuilder = new ArrayTupleBuilder(numTokenFields +
numInvListKeys);
- this.lastTuple = new ArrayTupleReference();
- this.btreeBulkloader =
- btree.createBulkLoader(btreeFillFactor, verifyInput,
numElementsHint, checkIfEmptyIndex);
- currentPageId = startPageId;
- currentPage =
bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId,
currentPageId));
- invListBuilder.setTargetBuffer(currentPage.getBuffer().array(), 0);
- queue = bufferCache.createFIFOQueue();
- }
-
- public void pinNextPage() throws HyracksDataException {
- queue.put(currentPage);
- currentPageId++;
- currentPage =
bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId,
currentPageId));
- }
-
- private void createAndInsertBTreeTuple() throws HyracksDataException {
- // Build tuple.
- btreeTupleBuilder.reset();
- DataOutput output = btreeTupleBuilder.getDataOutput();
- // Add key fields.
- lastTuple.reset(lastTupleBuilder.getFieldEndOffsets(),
lastTupleBuilder.getByteArray());
- for (int i = 0; i < numTokenFields; i++) {
- btreeTupleBuilder.addField(lastTuple.getFieldData(i),
lastTuple.getFieldStart(i),
- lastTuple.getFieldLength(i));
- }
- // Add inverted-list 'pointer' value fields.
- try {
- output.writeInt(currentInvListStartPageId);
- btreeTupleBuilder.addFieldEndOffset();
- output.writeInt(currentPageId);
- btreeTupleBuilder.addFieldEndOffset();
- output.writeInt(currentInvListStartOffset);
- btreeTupleBuilder.addFieldEndOffset();
- output.writeInt(invListBuilder.getListSize());
- btreeTupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw HyracksDataException.create(e);
- }
- // Reset tuple reference and add it into the BTree load.
- btreeTupleReference.reset(btreeTupleBuilder.getFieldEndOffsets(),
btreeTupleBuilder.getByteArray());
- btreeBulkloader.add(btreeTupleReference);
- }
-
- /**
- * Assumptions:
- * The first btree.getMultiComparator().getKeyFieldCount() fields in
tuple
- * are btree keys (e.g., a string token).
- * The next invListCmp.getKeyFieldCount() fields in tuple are keys of
the
- * inverted list (e.g., primary key).
- * Key fields of inverted list are fixed size.
- */
- @Override
- public void add(ITupleReference tuple) throws HyracksDataException {
- boolean firstElement = lastTupleBuilder.getSize() == 0;
- boolean startNewList = firstElement;
- if (!firstElement) {
- // If the current and the last token don't match, we start a
new list.
- lastTuple.reset(lastTupleBuilder.getFieldEndOffsets(),
lastTupleBuilder.getByteArray());
- startNewList = tokenCmp.compare(tuple, lastTuple) != 0;
- }
- if (startNewList) {
- if (!firstElement) {
- // Create entry in btree for last inverted list.
- createAndInsertBTreeTuple();
- }
- if (!invListBuilder.startNewList(tuple, numTokenFields)) {
- pinNextPage();
-
invListBuilder.setTargetBuffer(currentPage.getBuffer().array(), 0);
- if (!invListBuilder.startNewList(tuple, numTokenFields)) {
- throw new IllegalStateException("Failed to create
first inverted list.");
- }
- }
- currentInvListStartPageId = currentPageId;
- currentInvListStartOffset = invListBuilder.getPos();
- } else {
- if (invListCmp.compare(tuple, lastTuple, numTokenFields) == 0)
{
- // Duplicate inverted-list element.
- return;
- }
- }
-
- // Append to current inverted list.
- if (!invListBuilder.appendElement(tuple, numTokenFields,
numInvListKeys)) {
- pinNextPage();
-
invListBuilder.setTargetBuffer(currentPage.getBuffer().array(), 0);
- if (!invListBuilder.appendElement(tuple, numTokenFields,
numInvListKeys)) {
- throw new IllegalStateException(
- "Failed to append element to inverted list after
switching to a new page.");
- }
- }
-
- if (verifyInput && lastTupleBuilder.getSize() != 0) {
- if (allCmp.compare(tuple, lastTuple) <= 0) {
- throw new HyracksDataException(
- "Input stream given to OnDiskInvertedIndex bulk
load is not sorted.");
- }
- }
-
- // Remember last tuple by creating a copy.
- // TODO: This portion can be optimized by only copying the token
when it changes, and using the last appended inverted-list element as a
reference.
- lastTupleBuilder.reset();
- for (int i = 0; i < tuple.getFieldCount(); i++) {
- lastTupleBuilder.addField(tuple.getFieldData(i),
tuple.getFieldStart(i), tuple.getFieldLength(i));
- }
- }
-
- @Override
- public void end() throws HyracksDataException {
- // The last tuple builder is empty if add() was never called.
- if (lastTupleBuilder.getSize() != 0) {
- createAndInsertBTreeTuple();
- }
- btreeBulkloader.end();
-
- if (currentPage != null) {
- queue.put(currentPage);
- }
- invListsMaxPageId = currentPageId;
- bufferCache.finishQueue();
- }
-
- @Override
- public void abort() throws HyracksDataException {
- if (btreeBulkloader != null) {
- btreeBulkloader.abort();
- }
- }
}
@Override
@@ -533,8 +373,8 @@
@Override
public IIndexBulkLoader createBulkLoader(float fillFactor, boolean
verifyInput, long numElementsHint,
boolean checkIfEmptyIndex) throws HyracksDataException {
- return new OnDiskInvertedIndexBulkLoader(fillFactor, verifyInput,
numElementsHint, checkIfEmptyIndex,
- rootPageId);
+ return new OnDiskInvertedIndexBulkLoader(this, fillFactor,
verifyInput, numElementsHint, checkIfEmptyIndex,
+ rootPageId, maxPageIdSetter);
}
@Override
@@ -636,4 +476,15 @@
bufferCache.purgeHandle(fileId);
fileId = -1;
}
+
    /**
     * @return The file id of this index.
     */
    public int getFileId() {
        return fileId;
    }
+
    /**
     * @return the inverted-list builder owned by this index; exposed so the extracted
     *         {@code OnDiskInvertedIndexBulkLoader} can append list elements.
     *         NOTE(review): the builder is shared mutable state — only one bulk load
     *         should use it at a time; TODO confirm this is enforced by callers.
     */
    public IInvertedListBuilder getInvertedListBuilder() {
        return invListBuilder;
    }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexBulkLoader.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexBulkLoader.java
new file mode 100644
index 0000000..0469149
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexBulkLoader.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.invertedindex.ondisk;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.btree.impls.BTree;
+import org.apache.hyracks.storage.am.common.api.IBulkLoadFinalizer;
+import
org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedListBuilder;
+import org.apache.hyracks.storage.common.IIndexBulkLoader;
+import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+
/**
 * Bulk loader for {@link OnDiskInvertedIndex}, extracted from the former inner class.
 * It streams sorted (token..., invListKey...) tuples, packing inverted-list elements into
 * confiscated buffer-cache pages while inserting one BTree "pointer" tuple per token that
 * records [startPageId, endPageId, startOffset, numElements] for that token's list.
 *
 * TODO Clean:
 * - Remove ArrayTupleBuilder
 * - Use IFrameFieldAppender --> FrameFixedFieldAppender
 * - Pass a frame (Frame size assumptions ?)
 *
 */
public final class OnDiskInvertedIndexBulkLoader implements IIndexBulkLoader {
    // Scratch builder/reference for the per-token BTree pointer tuple.
    private final ArrayTupleBuilder btreeTupleBuilder;
    private final ArrayTupleReference btreeTupleReference;
    private final IIndexBulkLoader btreeBulkloader;

    // Location where the inverted list of the current token started.
    private int currentInvListStartPageId;
    private int currentInvListStartOffset;
    // Copy of the previously added tuple; empty (getSize() == 0) until the first add().
    private final ArrayTupleBuilder lastTupleBuilder;
    private final ArrayTupleReference lastTuple;

    // Page currently being filled with inverted-list elements.
    private int currentPageId;
    private ICachedPage currentPage;
    // Compares only the token fields / only the inverted-list key fields, respectively.
    private final MultiComparator tokenCmp;
    private final MultiComparator invListCmp;

    private final boolean verifyInput;
    // Full-tuple comparator, used only when verifyInput is true to check sortedness; null otherwise.
    private final MultiComparator allCmp;

    // FIFO queue of finished list pages; drained by finishQueue() in end().
    private final IFIFOPageQueue queue;
    private final IBufferCache bufferCache;

    private final int fileId;
    private final int numTokenFields;
    private final int numInvListKeys;
    private final IInvertedListBuilder invListBuilder;
    // Callback that tells the owning index the max page id of the inverted-lists file
    // (the index's invListsMaxPageId) when the load ends.
    private final IBulkLoadFinalizer<Void, Integer> finalizer;

    /**
     * @param index             the on-disk inverted index being loaded
     * @param btreeFillFactor   fill factor forwarded to the token BTree bulk loader
     * @param verifyInput       if true, each added tuple must be strictly greater than the last
     * @param numElementsHint   element-count hint forwarded to the BTree bulk loader
     * @param checkIfEmptyIndex forwarded to the BTree bulk loader
     * @param startPageId       first page id of the inverted-lists file to write into
     * @param finalizer         receives the final (inclusive) max page id in end()
     * @throws HyracksDataException on BTree loader creation or page confiscation failure
     */
    public OnDiskInvertedIndexBulkLoader(OnDiskInvertedIndex index, float btreeFillFactor, boolean verifyInput,
            long numElementsHint, boolean checkIfEmptyIndex, int startPageId,
            IBulkLoadFinalizer<Void, Integer> finalizer) throws HyracksDataException {
        final BTree btree = index.getBTree();
        this.fileId = index.getFileId();
        this.bufferCache = index.getBufferCache();
        this.verifyInput = verifyInput;
        this.tokenCmp = MultiComparator.create(btree.getComparatorFactories());
        this.invListCmp = MultiComparator.create(index.getInvListCmpFactories());
        this.numInvListKeys = index.getInvListCmpFactories().length;
        this.numTokenFields = btree.getComparatorFactories().length;
        if (verifyInput) {
            allCmp = MultiComparator.create(btree.getComparatorFactories(), index.getInvListCmpFactories());
        } else {
            allCmp = null;
        }
        this.btreeTupleBuilder = new ArrayTupleBuilder(btree.getFieldCount());
        this.btreeTupleReference = new ArrayTupleReference();
        this.lastTupleBuilder = new ArrayTupleBuilder(numTokenFields + numInvListKeys);
        this.lastTuple = new ArrayTupleReference();
        this.btreeBulkloader = btree.createBulkLoader(btreeFillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
        currentPageId = startPageId;
        currentPage = bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, currentPageId));
        invListBuilder = index.getInvertedListBuilder();
        invListBuilder.setTargetBuffer(currentPage.getBuffer().array(), 0);
        queue = bufferCache.createFIFOQueue();
        this.finalizer = finalizer;
    }

    /**
     * Hands the filled current page to the write queue and confiscates the next page of the
     * inverted-lists file. Callers must re-target invListBuilder at the new page's buffer.
     */
    public void pinNextPage() throws HyracksDataException {
        queue.put(currentPage);
        currentPageId++;
        currentPage = bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, currentPageId));
    }

    /**
     * Builds and inserts the BTree pointer tuple for the inverted list that just ended:
     * the token fields from lastTuple followed by
     * [startPageId, endPageId(=currentPageId), startOffset, numElements].
     */
    private void createAndInsertBTreeTuple() throws HyracksDataException {
        // Build tuple.
        btreeTupleBuilder.reset();
        DataOutput output = btreeTupleBuilder.getDataOutput();
        // Add key fields.
        lastTuple.reset(lastTupleBuilder.getFieldEndOffsets(), lastTupleBuilder.getByteArray());
        for (int i = 0; i < numTokenFields; i++) {
            btreeTupleBuilder.addField(lastTuple.getFieldData(i), lastTuple.getFieldStart(i),
                    lastTuple.getFieldLength(i));
        }
        // Add inverted-list 'pointer' value fields.
        try {
            output.writeInt(currentInvListStartPageId);
            btreeTupleBuilder.addFieldEndOffset();
            output.writeInt(currentPageId);
            btreeTupleBuilder.addFieldEndOffset();
            output.writeInt(currentInvListStartOffset);
            btreeTupleBuilder.addFieldEndOffset();
            output.writeInt(invListBuilder.getListSize());
            btreeTupleBuilder.addFieldEndOffset();
        } catch (IOException e) {
            throw HyracksDataException.create(e);
        }
        // Reset tuple reference and add it into the BTree load.
        btreeTupleReference.reset(btreeTupleBuilder.getFieldEndOffsets(), btreeTupleBuilder.getByteArray());
        btreeBulkloader.add(btreeTupleReference);
    }

    /**
     * Assumptions:
     * The first btree.getMultiComparator().getKeyFieldCount() fields in tuple
     * are btree keys (e.g., a string token).
     * The next invListCmp.getKeyFieldCount() fields in tuple are keys of the
     * inverted list (e.g., primary key).
     * Key fields of inverted list are fixed size.
     */
    @Override
    public void add(ITupleReference tuple) throws HyracksDataException {
        boolean firstElement = lastTupleBuilder.getSize() == 0;
        boolean startNewList = firstElement;
        if (!firstElement) {
            // If the current and the last token don't match, we start a new list.
            lastTuple.reset(lastTupleBuilder.getFieldEndOffsets(), lastTupleBuilder.getByteArray());
            startNewList = tokenCmp.compare(tuple, lastTuple) != 0;
        }
        if (startNewList) {
            if (!firstElement) {
                // Create entry in btree for last inverted list.
                createAndInsertBTreeTuple();
            }
            // startNewList may fail when the current page lacks room; retry once on a fresh page.
            if (!invListBuilder.startNewList(tuple, numTokenFields)) {
                pinNextPage();
                invListBuilder.setTargetBuffer(currentPage.getBuffer().array(), 0);
                if (!invListBuilder.startNewList(tuple, numTokenFields)) {
                    throw new IllegalStateException("Failed to create first inverted list.");
                }
            }
            currentInvListStartPageId = currentPageId;
            currentInvListStartOffset = invListBuilder.getPos();
        } else {
            if (invListCmp.compare(tuple, lastTuple, numTokenFields) == 0) {
                // Duplicate inverted-list element.
                return;
            }
        }

        // Append to current inverted list.
        if (!invListBuilder.appendElement(tuple, numTokenFields, numInvListKeys)) {
            pinNextPage();
            invListBuilder.setTargetBuffer(currentPage.getBuffer().array(), 0);
            if (!invListBuilder.appendElement(tuple, numTokenFields, numInvListKeys)) {
                throw new IllegalStateException(
                        "Failed to append element to inverted list after switching to a new page.");
            }
        }

        if (verifyInput && lastTupleBuilder.getSize() != 0) {
            if (allCmp.compare(tuple, lastTuple) <= 0) {
                throw new HyracksDataException("Input stream given to OnDiskInvertedIndex bulk load is not sorted.");
            }
        }

        // Remember last tuple by creating a copy.
        // TODO: This portion can be optimized by only copying the token when it changes, and using the last appended inverted-list element as a reference.
        lastTupleBuilder.reset();
        for (int i = 0; i < tuple.getFieldCount(); i++) {
            lastTupleBuilder.addField(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i));
        }
    }

    /**
     * Flushes the final inverted list's BTree entry, ends the BTree load, queues the last
     * list page, reports the max page id through the finalizer, and drains the write queue.
     */
    @Override
    public void end() throws HyracksDataException {
        // The last tuple builder is empty if add() was never called.
        if (lastTupleBuilder.getSize() != 0) {
            createAndInsertBTreeTuple();
        }
        btreeBulkloader.end();

        if (currentPage != null) {
            queue.put(currentPage);
        }
        finalizer.finalizeBulkLoad(currentPageId);
        bufferCache.finishQueue();
    }

    /**
     * Aborts the underlying BTree load and returns the in-flight list page to the buffer
     * cache (an improvement over the pre-refactor abort(), which leaked that page).
     * NOTE(review): pages already handed to {@code queue} are not reclaimed here, and this
     * uses the single-argument returnPage while the tree loader uses returnPage(page, false)
     * — verify both are intended.
     */
    @Override
    public void abort() throws HyracksDataException {
        if (btreeBulkloader != null) {
            btreeBulkloader.abort();
        }

        if (currentPage != null && currentPage.confiscated()) {
            bufferCache.returnPage(currentPage);
        }
    }
}
\ No newline at end of file
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
index 01e0684..7d6a2d9 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
@@ -40,7 +40,7 @@
public class PartitionedOnDiskInvertedIndex extends OnDiskInvertedIndex
implements IPartitionedInvertedIndex {
- protected final int PARTITIONING_NUM_TOKENS_FIELD = 1;
+ protected static final int PARTITIONING_NUM_TOKENS_FIELD = 1;
public PartitionedOnDiskInvertedIndex(IBufferCache bufferCache,
IInvertedListBuilder invListBuilder,
ITypeTraits[] invListTypeTraits, IBinaryComparatorFactory[]
invListCmpFactories,
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
index f29bffc..1bb7562 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
@@ -30,7 +30,7 @@
import org.apache.hyracks.storage.am.bloomfilter.impls.BloomCalculations;
import
org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
import org.apache.hyracks.storage.am.btree.impls.BTree;
-import org.apache.hyracks.storage.am.btree.impls.BTree.BTreeBulkLoader;
+import org.apache.hyracks.storage.am.btree.impls.BTreeBulkLoader;
import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import org.apache.hyracks.storage.am.common.api.ITwoPCIndexBulkLoader;
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
index 1e71b7f..2238c06 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
@@ -19,9 +19,7 @@
package org.apache.hyracks.storage.am.rtree.impls;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -36,20 +34,15 @@
import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
-import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleReference;
-import org.apache.hyracks.storage.am.common.frames.AbstractSlotManager;
import org.apache.hyracks.storage.am.common.frames.FrameOpSpaceStatus;
import org.apache.hyracks.storage.am.common.impls.AbstractTreeIndex;
-import org.apache.hyracks.storage.am.common.impls.NodeFrontier;
import org.apache.hyracks.storage.am.common.impls.TreeIndexDiskOrderScanCursor;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.common.util.TreeIndexUtils;
import org.apache.hyracks.storage.am.rtree.api.IRTreeFrame;
import org.apache.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
import org.apache.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
-import org.apache.hyracks.storage.am.rtree.frames.RTreeNSMFrame;
import org.apache.hyracks.storage.am.rtree.frames.RTreeNSMInteriorFrame;
-import org.apache.hyracks.storage.am.rtree.tuples.RTreeTypeAwareTupleWriter;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
import org.apache.hyracks.storage.common.IIndexCursor;
@@ -57,7 +50,6 @@
import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.MultiComparator;
-import org.apache.hyracks.storage.common.buffercache.BufferCache;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
@@ -901,210 +893,7 @@
public IIndexBulkLoader createBulkLoader(float fillFactor, boolean
verifyInput, long numElementsHint,
boolean checkIfEmptyIndex) throws HyracksDataException {
// TODO: verifyInput currently does nothing.
- return new RTreeBulkLoader(fillFactor);
- }
-
- public class RTreeBulkLoader extends
AbstractTreeIndex.AbstractTreeIndexBulkLoader {
- ITreeIndexFrame lowerFrame, prevInteriorFrame;
- RTreeTypeAwareTupleWriter interiorFrameTupleWriter =
- ((RTreeTypeAwareTupleWriter) interiorFrame.getTupleWriter());
- ITreeIndexTupleReference mbrTuple =
interiorFrame.createTupleReference();
- ByteBuffer mbr;
- List<Integer> prevNodeFrontierPages = new ArrayList<>();
-
- public RTreeBulkLoader(float fillFactor) throws HyracksDataException {
- super(fillFactor);
- prevInteriorFrame = interiorFrameFactory.createFrame();
- }
-
- @Override
- public void add(ITupleReference tuple) throws HyracksDataException {
- try {
- int leafFrameTupleSize =
leafFrame.getBytesRequiredToWriteTuple(tuple);
- int interiorFrameTupleSize =
interiorFrame.getBytesRequiredToWriteTuple(tuple);
- int tupleSize = Math.max(leafFrameTupleSize,
interiorFrameTupleSize);
- if (tupleSize > maxTupleSize) {
- throw
HyracksDataException.create(ErrorCode.RECORD_IS_TOO_LARGE, tupleSize,
maxTupleSize);
- }
-
- NodeFrontier leafFrontier = nodeFrontiers.get(0);
-
- int spaceNeeded = leafFrameTupleSize;
- int spaceUsed = leafFrame.getBuffer().capacity() -
leafFrame.getTotalFreeSpace();
-
- // try to free space by compression
- if (spaceUsed + spaceNeeded > leafMaxBytes) {
- leafFrame.compress();
- spaceUsed = leafFrame.getBuffer().capacity() -
leafFrame.getTotalFreeSpace();
- }
-
- if (spaceUsed + spaceNeeded > leafMaxBytes) {
-
- if (prevNodeFrontierPages.size() == 0) {
- prevNodeFrontierPages.add(leafFrontier.pageId);
- } else {
- prevNodeFrontierPages.set(0, leafFrontier.pageId);
- }
- propagateBulk(1, false, pagesToWrite);
-
- leafFrontier.pageId = freePageManager.takePage(metaFrame);
- queue.put(leafFrontier.page);
- for (ICachedPage c : pagesToWrite) {
- queue.put(c);
- }
-
- pagesToWrite.clear();
- leafFrontier.page = bufferCache
-
.confiscatePage(BufferedFileHandle.getDiskPageId(getFileId(),
leafFrontier.pageId));
- leafFrame.setPage(leafFrontier.page);
- leafFrame.initBuffer((byte) 0);
-
- }
-
- leafFrame.setPage(leafFrontier.page);
- leafFrame.insert(tuple,
AbstractSlotManager.GREATEST_KEY_INDICATOR);
- } catch (HyracksDataException e) {
- handleException();
- throw e;
- } catch (RuntimeException e) {
- handleException();
- throw e;
- }
-
- }
-
- @Override
- public void end() throws HyracksDataException {
- pagesToWrite.clear();
- //if writing a trivial 1-page tree, don't try and propagate up
- if (nodeFrontiers.size() > 1) {
- propagateBulk(1, true, pagesToWrite);
- }
-
- for (ICachedPage c : pagesToWrite) {
- queue.put(c);
- }
- finish();
- super.end();
- }
-
- @Override
- public void abort() throws HyracksDataException {
- super.handleException();
- }
-
- protected void finish() throws HyracksDataException {
- int prevPageId = -1;
- //here we assign physical identifiers to everything we can
- for (NodeFrontier n : nodeFrontiers) {
- //not a leaf
- if (nodeFrontiers.indexOf(n) != 0) {
- interiorFrame.setPage(n.page);
- mbrTuple.resetByTupleOffset(mbr.array(), 0);
- interiorFrame.insert(mbrTuple, -1);
- interiorFrame.getBuffer().putInt(
-
interiorFrame.getTupleOffset(interiorFrame.getTupleCount() - 1) +
mbrTuple.getTupleSize(),
- prevPageId);
-
- int finalPageId = freePageManager.takePage(metaFrame);
- n.pageId = finalPageId;
-
n.page.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(),
finalPageId));
- //else we are looking at a leaf
- }
- //set next guide MBR
- //if propagateBulk didnt have to do anything this may be
un-necessary
- if (nodeFrontiers.size() > 1 && nodeFrontiers.indexOf(n) <
nodeFrontiers.size() - 1) {
- lowerFrame.setPage(n.page);
- ((RTreeNSMFrame) lowerFrame).adjustMBR();
- interiorFrameTupleWriter.writeTupleFields(((RTreeNSMFrame)
lowerFrame).getMBRTuples(), 0, mbr, 0);
- }
- queue.put(n.page);
- n.page = null;
- prevPageId = n.pageId;
- }
- rootPage = nodeFrontiers.get(nodeFrontiers.size() - 1).pageId;
- releasedLatches = true;
- }
-
- protected void propagateBulk(int level, boolean toRoot,
List<ICachedPage> pagesToWrite)
- throws HyracksDataException {
- boolean propagated = false;
-
- if (level == 1) {
- lowerFrame = leafFrame;
- }
-
- if (lowerFrame.getTupleCount() == 0) {
- return;
- }
-
- if (level >= nodeFrontiers.size()) {
- addLevel();
- }
-
- //adjust the tuple pointers of the lower frame to allow us to
calculate our MBR
- //if this is a leaf, then there is only one tuple, so this is
trivial
- ((RTreeNSMFrame) lowerFrame).adjustMBR();
-
- if (mbr == null) {
- int bytesRequired =
-
interiorFrameTupleWriter.bytesRequired(((RTreeNSMFrame)
lowerFrame).getMBRTuples()[0], 0,
- cmp.getKeyFieldCount()) +
((RTreeNSMInteriorFrame) interiorFrame).getChildPointerSize();
- mbr = ByteBuffer.allocate(bytesRequired);
- }
- interiorFrameTupleWriter.writeTupleFields(((RTreeNSMFrame)
lowerFrame).getMBRTuples(), 0, mbr, 0);
- mbrTuple.resetByTupleOffset(mbr.array(), 0);
-
- NodeFrontier frontier = nodeFrontiers.get(level);
- interiorFrame.setPage(frontier.page);
- //see if we have space for two tuples. this works around a tricky
boundary condition with sequential bulk
- // load where finalization can possibly lead to a split
- //TODO: accomplish this without wasting 1 tuple
- int sizeOfTwoTuples = 2 * (mbrTuple.getTupleSize() +
RTreeNSMInteriorFrame.childPtrSize);
- FrameOpSpaceStatus spaceForTwoTuples =
- (((RTreeNSMInteriorFrame)
interiorFrame).hasSpaceInsert(sizeOfTwoTuples));
- if (spaceForTwoTuples !=
FrameOpSpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE && !toRoot) {
-
- int finalPageId = freePageManager.takePage(metaFrame);
- if (prevNodeFrontierPages.size() <= level) {
- prevNodeFrontierPages.add(finalPageId);
- } else {
- prevNodeFrontierPages.set(level, finalPageId);
- }
-
frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(),
finalPageId));
- pagesToWrite.add(frontier.page);
- lowerFrame = prevInteriorFrame;
- lowerFrame.setPage(frontier.page);
-
- frontier.page =
bufferCache.confiscatePage(BufferCache.INVALID_DPID);
- interiorFrame.setPage(frontier.page);
- interiorFrame.initBuffer((byte) level);
-
- interiorFrame.insert(mbrTuple,
AbstractSlotManager.GREATEST_KEY_INDICATOR);
-
- interiorFrame.getBuffer().putInt(
-
interiorFrame.getTupleOffset(interiorFrame.getTupleCount() - 1) +
mbrTuple.getTupleSize(),
- prevNodeFrontierPages.get(level - 1));
-
- propagateBulk(level + 1, toRoot, pagesToWrite);
- } else if (interiorFrame.hasSpaceInsert(mbrTuple) ==
FrameOpSpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE
- && !toRoot) {
-
- interiorFrame.insert(mbrTuple, -1);
-
- interiorFrame.getBuffer().putInt(
-
interiorFrame.getTupleOffset(interiorFrame.getTupleCount() - 1) +
mbrTuple.getTupleSize(),
- prevNodeFrontierPages.get(level - 1));
- }
-
- if (toRoot && level < nodeFrontiers.size() - 1) {
- lowerFrame = prevInteriorFrame;
- lowerFrame.setPage(frontier.page);
- propagateBulk(level + 1, true, pagesToWrite);
- }
-
- leafFrame.setPage(nodeFrontiers.get(0).page);
- }
+ return new RTreeBulkLoader(this, fillFactor, maxTupleSize,
rootPageSetter);
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeBulkLoader.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeBulkLoader.java
new file mode 100644
index 0000000..8fa693c
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeBulkLoader.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.rtree.impls;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IBulkLoadFinalizer;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleReference;
+import org.apache.hyracks.storage.am.common.frames.AbstractSlotManager;
+import org.apache.hyracks.storage.am.common.frames.FrameOpSpaceStatus;
+import org.apache.hyracks.storage.am.common.impls.AbstractTreeIndexBulkLoader;
+import org.apache.hyracks.storage.am.common.impls.NodeFrontier;
+import org.apache.hyracks.storage.am.rtree.frames.RTreeNSMFrame;
+import org.apache.hyracks.storage.am.rtree.frames.RTreeNSMInteriorFrame;
+import org.apache.hyracks.storage.am.rtree.tuples.RTreeTypeAwareTupleWriter;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+
+public class RTreeBulkLoader extends AbstractTreeIndexBulkLoader {
+
+ private final ITreeIndexFrame prevInteriorFrame;
+ private ITreeIndexFrame lowerFrame;
+ private final RTreeTypeAwareTupleWriter interiorFrameTupleWriter;
+ private final ITreeIndexTupleReference mbrTuple;
+ private final List<Integer> prevNodeFrontierPages = new ArrayList<>();
+ private ByteBuffer mbr;
+
+ public RTreeBulkLoader(RTree index, float fillFactor, int maxTupleSize,
IBulkLoadFinalizer<Void, Integer> finalizer)
+ throws HyracksDataException {
+ super(index, fillFactor, maxTupleSize, finalizer);
+ prevInteriorFrame = index.getInteriorFrameFactory().createFrame();
+ interiorFrameTupleWriter = (RTreeTypeAwareTupleWriter)
interiorFrame.getTupleWriter();
+ mbrTuple = interiorFrame.createTupleReference();
+ }
+
+ @Override
+ public void add(ITupleReference tuple) throws HyracksDataException {
+ try {
+ int leafFrameTupleSize =
leafFrame.getBytesRequiredToWriteTuple(tuple);
+ int interiorFrameTupleSize =
interiorFrame.getBytesRequiredToWriteTuple(tuple);
+ int tupleSize = Math.max(leafFrameTupleSize,
interiorFrameTupleSize);
+ if (tupleSize > maxTupleSize) {
+ throw
HyracksDataException.create(ErrorCode.RECORD_IS_TOO_LARGE, tupleSize,
maxTupleSize);
+ }
+
+ NodeFrontier leafFrontier = nodeFrontiers.get(0);
+
+ int spaceNeeded = leafFrameTupleSize;
+ int spaceUsed = leafFrame.getBuffer().capacity() -
leafFrame.getTotalFreeSpace();
+
+ // try to free space by compression
+ if (spaceUsed + spaceNeeded > leafMaxBytes) {
+ leafFrame.compress();
+ spaceUsed = leafFrame.getBuffer().capacity() -
leafFrame.getTotalFreeSpace();
+ }
+
+ if (spaceUsed + spaceNeeded > leafMaxBytes) {
+
+ if (prevNodeFrontierPages.isEmpty()) {
+ prevNodeFrontierPages.add(leafFrontier.pageId);
+ } else {
+ prevNodeFrontierPages.set(0, leafFrontier.pageId);
+ }
+ propagateBulk(1, false, pagesToWrite);
+
+ leafFrontier.pageId = freePageManager.takePage(metaFrame);
+ queue.put(leafFrontier.page);
+ for (ICachedPage c : pagesToWrite) {
+ queue.put(c);
+ }
+
+ pagesToWrite.clear();
+ leafFrontier.page =
+
bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId,
leafFrontier.pageId));
+ leafFrame.setPage(leafFrontier.page);
+ leafFrame.initBuffer((byte) 0);
+
+ }
+
+ leafFrame.setPage(leafFrontier.page);
+ leafFrame.insert(tuple,
AbstractSlotManager.GREATEST_KEY_INDICATOR);
+ } catch (HyracksDataException e) {
+ cleanUp();
+ throw e;
+ } catch (RuntimeException e) {
+            /**
+             * TODO: determine which callee can actually throw RuntimeException here,
+             * and narrow this catch accordingly.
+             */
+ cleanUp();
+ throw e;
+ }
+
+ }
+
+ @Override
+ public void end() throws HyracksDataException {
+ pagesToWrite.clear();
+        //if writing a trivial 1-page tree, don't try to propagate up
+ if (nodeFrontiers.size() > 1) {
+ propagateBulk(1, true, pagesToWrite);
+ }
+
+ for (ICachedPage c : pagesToWrite) {
+ queue.put(c);
+ }
+ finish();
+ super.end();
+ }
+
+ protected void finish() throws HyracksDataException {
+ int prevPageId = -1;
+ //here we assign physical identifiers to everything we can
+ for (NodeFrontier n : nodeFrontiers) {
+ //not a leaf
+ if (nodeFrontiers.indexOf(n) != 0) {
+ interiorFrame.setPage(n.page);
+ mbrTuple.resetByTupleOffset(mbr.array(), 0);
+ interiorFrame.insert(mbrTuple, -1);
+ interiorFrame.getBuffer().putInt(
+
interiorFrame.getTupleOffset(interiorFrame.getTupleCount() - 1) +
mbrTuple.getTupleSize(),
+ prevPageId);
+
+ int finalPageId = freePageManager.takePage(metaFrame);
+ n.pageId = finalPageId;
+ n.page.setDiskPageId(BufferedFileHandle.getDiskPageId(fileId,
finalPageId));
+ //else we are looking at a leaf
+ }
+ //set next guide MBR
+            //if propagateBulk didn't have to do anything, this may be
unnecessary
+ if (nodeFrontiers.size() > 1 && nodeFrontiers.indexOf(n) <
nodeFrontiers.size() - 1) {
+ lowerFrame.setPage(n.page);
+ ((RTreeNSMFrame) lowerFrame).adjustMBR();
+ interiorFrameTupleWriter.writeTupleFields(((RTreeNSMFrame)
lowerFrame).getMBRTuples(), 0, mbr, 0);
+ }
+ queue.put(n.page);
+ n.page = null;
+ prevPageId = n.pageId;
+ }
+ rootPage = nodeFrontiers.get(nodeFrontiers.size() - 1).pageId;
+ releasedLatches = true;
+ }
+
+ protected void propagateBulk(int level, boolean toRoot, List<ICachedPage>
pagesToWrite)
+ throws HyracksDataException {
+
+ if (level == 1) {
+ lowerFrame = leafFrame;
+ }
+
+ if (lowerFrame.getTupleCount() == 0) {
+ return;
+ }
+
+ if (level >= nodeFrontiers.size()) {
+ addLevel();
+ }
+
+ //adjust the tuple pointers of the lower frame to allow us to
calculate our MBR
+ //if this is a leaf, then there is only one tuple, so this is trivial
+ ((RTreeNSMFrame) lowerFrame).adjustMBR();
+
+ if (mbr == null) {
+ int bytesRequired =
interiorFrameTupleWriter.bytesRequired(((RTreeNSMFrame)
lowerFrame).getMBRTuples()[0],
+ 0, cmp.getKeyFieldCount()) + ((RTreeNSMInteriorFrame)
interiorFrame).getChildPointerSize();
+ mbr = ByteBuffer.allocate(bytesRequired);
+ }
+ interiorFrameTupleWriter.writeTupleFields(((RTreeNSMFrame)
lowerFrame).getMBRTuples(), 0, mbr, 0);
+ mbrTuple.resetByTupleOffset(mbr.array(), 0);
+
+ NodeFrontier frontier = nodeFrontiers.get(level);
+ interiorFrame.setPage(frontier.page);
+ //see if we have space for two tuples. this works around a tricky
boundary condition with sequential bulk
+ // load where finalization can possibly lead to a split
+ //TODO: accomplish this without wasting 1 tuple
+ int sizeOfTwoTuples = 2 * (mbrTuple.getTupleSize() +
RTreeNSMInteriorFrame.childPtrSize);
+ FrameOpSpaceStatus spaceForTwoTuples = ((RTreeNSMInteriorFrame)
interiorFrame).hasSpaceInsert(sizeOfTwoTuples);
+ if (spaceForTwoTuples !=
FrameOpSpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE && !toRoot) {
+
+ int finalPageId = freePageManager.takePage(metaFrame);
+ if (prevNodeFrontierPages.size() <= level) {
+ prevNodeFrontierPages.add(finalPageId);
+ } else {
+ prevNodeFrontierPages.set(level, finalPageId);
+ }
+
frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(fileId,
finalPageId));
+ pagesToWrite.add(frontier.page);
+ lowerFrame = prevInteriorFrame;
+ lowerFrame.setPage(frontier.page);
+
+ frontier.page =
bufferCache.confiscatePage(BufferCache.INVALID_DPID);
+ interiorFrame.setPage(frontier.page);
+ interiorFrame.initBuffer((byte) level);
+
+ interiorFrame.insert(mbrTuple,
AbstractSlotManager.GREATEST_KEY_INDICATOR);
+
+ interiorFrame.getBuffer().putInt(
+ interiorFrame.getTupleOffset(interiorFrame.getTupleCount()
- 1) + mbrTuple.getTupleSize(),
+ prevNodeFrontierPages.get(level - 1));
+
+ propagateBulk(level + 1, toRoot, pagesToWrite);
+ } else if (interiorFrame.hasSpaceInsert(mbrTuple) ==
FrameOpSpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE
+ && !toRoot) {
+
+ interiorFrame.insert(mbrTuple, -1);
+
+ interiorFrame.getBuffer().putInt(
+ interiorFrame.getTupleOffset(interiorFrame.getTupleCount()
- 1) + mbrTuple.getTupleSize(),
+ prevNodeFrontierPages.get(level - 1));
+ }
+
+ if (toRoot && level < nodeFrontiers.size() - 1) {
+ lowerFrame = prevInteriorFrame;
+ lowerFrame.setPage(frontier.page);
+ propagateBulk(level + 1, true, pagesToWrite);
+ }
+
+ leafFrame.setPage(nodeFrontiers.get(0).page);
+ }
+}
--
To view, visit https://asterix-gerrit.ics.uci.edu/2367
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I7f42a391a4de4b02acf6a8fdaf2b60818c1da806
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Wail Alkowaileet <[email protected]>