Luo Chen has submitted this change and it was merged. Change subject: [ASTERIXDB-1764][STO] Ensure LOAD follow same lifecycle with merge/flush ......................................................................
[ASTERIXDB-1764][STO] Ensure LOAD follow same lifecycle with merge/flush - user model changes: no - storage format changes: no - interface change: no Details: - Ensure ioOperationCallbacks are properly called for bulk loaded component - Add Load type to LSMIOOperationType to distinguish bulk loaded component from flush component - Change ILSMIOOperationCallback to use LSMIOOperationType instead of LSMOperationType, because this callback only targets LSM IO operations Change-Id: Ib9ecf7292c5dbaf8638d159decc6e6faf79de58b Reviewed-on: https://asterix-gerrit.ics.uci.edu/2131 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Reviewed-by: abdullah alamoudi <[email protected]> Integration-Tests: Jenkins <[email protected]> --- M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java M asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java M asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java M asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java M asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java 23 files changed, 126 insertions(+), 109 deletions(-) Approvals: abdullah alamoudi: Looks good to me, approved Jenkins: Verified; No violations found; ; Verified diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java index 327c66e..142bcc5 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java @@ -20,10 +20,10 @@ import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; -import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import org.apache.hyracks.storage.am.lsm.common.impls.EmptyComponent; public class TestLsmBtreeIoOpCallbackFactory implements ILSMIOOperationCallbackFactory { @@ -95,18 +95,18 @@ } @Override - public void afterFinalize(LSMOperationType opType, ILSMDiskComponent newComponent) { + public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) { super.afterFinalize(opType, newComponent); synchronized (INSTANCE) { if (newComponent != null) { if (newComponent == EmptyComponent.INSTANCE) { - if (opType == LSMOperationType.FLUSH) { + if (opType == LSMIOOperationType.FLUSH) { rollbackFlushes++; } else { rollbackMerges++; } } else { - if (opType == 
LSMOperationType.FLUSH) { + if (opType == LSMIOOperationType.FLUSH) { completedFlushes++; } else { completedMerges++; @@ -119,8 +119,8 @@ } } - private void recordFailure(LSMOperationType opType) { - if (opType == LSMOperationType.FLUSH) { + private void recordFailure(LSMIOOperationType opType) { + if (opType == LSMIOOperationType.FLUSH) { failedFlushes++; } else { failedMerges++; diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java index 5fcbac9..68f42e7 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java @@ -30,10 +30,10 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; -import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata; import org.apache.hyracks.storage.am.lsm.common.impls.LSMDiskComponentId; import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils; @@ -67,8 +67,8 @@ } @Override - public void beforeOperation(LSMOperationType opType) throws HyracksDataException { - if (opType == LSMOperationType.FLUSH) { + public void beforeOperation(LSMIOOperationType opType) throws HyracksDataException { + if (opType == 
LSMIOOperationType.FLUSH) { /* * This method was called on the scheduleFlush operation. * We set the lastLSN to the last LSN for the index (the LSN for the flush log) @@ -87,9 +87,9 @@ } @Override - public void afterFinalize(LSMOperationType opType, ILSMDiskComponent newComponent) { + public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) { // The operation was complete and the next I/O operation for the LSM index didn't start yet - if (opType == LSMOperationType.FLUSH && newComponent != null) { + if (opType == LSMIOOperationType.FLUSH && newComponent != null) { synchronized (this) { flushRequested[readIndex] = false; // if the component which just finished flushing is the component that will be modified next, @@ -183,13 +183,13 @@ } @Override - public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, + public void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents, ILSMDiskComponent newComponent) throws HyracksDataException { //TODO: Copying Filters and all content of the metadata pages for flush operation should be done here if (newComponent != null) { putLSNIntoMetadata(newComponent, oldComponents); putComponentIdIntoMetadata(newComponent, oldComponents); - if (opType == LSMOperationType.MERGE) { + if (opType == LSMIOOperationType.MERGE) { // In case of merge, oldComponents are never null LongPointable markerLsn = LongPointable.FACTORY.createPointable(ComponentUtils.getLong(oldComponents.get(0).getMetadata(), diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java index 8d4cd51..d48227f 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java +++ 
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java @@ -21,8 +21,8 @@ import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; -import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import org.junit.Assert; import org.mockito.Mockito; @@ -38,17 +38,17 @@ //request to flush first component callback.updateLastLSN(1); - callback.beforeOperation(LSMOperationType.FLUSH); + callback.beforeOperation(LSMIOOperationType.FLUSH); //request to flush second component callback.updateLastLSN(2); - callback.beforeOperation(LSMOperationType.FLUSH); + callback.beforeOperation(LSMIOOperationType.FLUSH); Assert.assertEquals(1, callback.getComponentLSN(null)); - callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); + callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); Assert.assertEquals(2, callback.getComponentLSN(null)); - callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); + callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); } catch (Exception e) { Assert.fail(); } @@ -62,11 +62,11 @@ //request to flush first component callback.updateLastLSN(1); - callback.beforeOperation(LSMOperationType.FLUSH); + callback.beforeOperation(LSMIOOperationType.FLUSH); //request to flush second component callback.updateLastLSN(2); - callback.beforeOperation(LSMOperationType.FLUSH); + callback.beforeOperation(LSMIOOperationType.FLUSH); //request to flush first component again //this call should fail @@ -75,10 +75,10 @@ //the scheduleFlush request would fail this time Assert.assertEquals(1, callback.getComponentLSN(null)); - 
callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); + callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); Assert.assertEquals(2, callback.getComponentLSN(null)); - callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); + callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); } catch (Exception e) { Assert.fail(); } diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java index ec1aba2..94ef0a3 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java @@ -21,8 +21,8 @@ import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; -import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import org.junit.Assert; import org.mockito.Mockito; @@ -38,17 +38,17 @@ //request to flush first component callback.updateLastLSN(1); - callback.beforeOperation(LSMOperationType.FLUSH); + callback.beforeOperation(LSMIOOperationType.FLUSH); //request to flush second component callback.updateLastLSN(2); - callback.beforeOperation(LSMOperationType.FLUSH); + callback.beforeOperation(LSMIOOperationType.FLUSH); Assert.assertEquals(1, callback.getComponentLSN(null)); - callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); + 
callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); Assert.assertEquals(2, callback.getComponentLSN(null)); - callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); + callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); } catch (Exception e) { Assert.fail(); } @@ -62,11 +62,11 @@ //request to flush first component callback.updateLastLSN(1); - callback.beforeOperation(LSMOperationType.FLUSH); + callback.beforeOperation(LSMIOOperationType.FLUSH); //request to flush second component callback.updateLastLSN(2); - callback.beforeOperation(LSMOperationType.FLUSH); + callback.beforeOperation(LSMIOOperationType.FLUSH); //request to flush first component again //this call should fail @@ -75,10 +75,10 @@ //the scheduleFlush request would fail this time Assert.assertEquals(1, callback.getComponentLSN(null)); - callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); + callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); Assert.assertEquals(2, callback.getComponentLSN(null)); - callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); + callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); } catch (Exception e) { Assert.fail(); } diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java index 1711bc2..b213da0 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java @@ -21,8 +21,8 @@ import 
org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; -import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import org.junit.Assert; import org.mockito.Mockito; @@ -38,17 +38,17 @@ //request to flush first component callback.updateLastLSN(1); - callback.beforeOperation(LSMOperationType.FLUSH); + callback.beforeOperation(LSMIOOperationType.FLUSH); //request to flush second component callback.updateLastLSN(2); - callback.beforeOperation(LSMOperationType.FLUSH); + callback.beforeOperation(LSMIOOperationType.FLUSH); Assert.assertEquals(1, callback.getComponentLSN(null)); - callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); + callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); Assert.assertEquals(2, callback.getComponentLSN(null)); - callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); + callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); } catch (Exception e) { Assert.fail(); } @@ -62,11 +62,11 @@ //request to flush first component callback.updateLastLSN(1); - callback.beforeOperation(LSMOperationType.FLUSH); + callback.beforeOperation(LSMIOOperationType.FLUSH); //request to flush second component callback.updateLastLSN(2); - callback.beforeOperation(LSMOperationType.FLUSH); + callback.beforeOperation(LSMIOOperationType.FLUSH); //request to flush first component again //this call should fail @@ -75,10 +75,10 @@ //the scheduleFlush request would fail this time Assert.assertEquals(1, callback.getComponentLSN(null)); - callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); + callback.afterFinalize(LSMIOOperationType.FLUSH, 
Mockito.mock(ILSMDiskComponent.class)); Assert.assertEquals(2, callback.getComponentLSN(null)); - callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); + callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); } catch (Exception e) { Assert.fail(); } diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java index bb33f3d..df26ef9 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java @@ -21,8 +21,8 @@ import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; -import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import org.junit.Assert; import org.mockito.Mockito; @@ -38,17 +38,17 @@ //request to flush first component callback.updateLastLSN(1); - callback.beforeOperation(LSMOperationType.FLUSH); + callback.beforeOperation(LSMIOOperationType.FLUSH); //request to flush second component callback.updateLastLSN(2); - callback.beforeOperation(LSMOperationType.FLUSH); + callback.beforeOperation(LSMIOOperationType.FLUSH); Assert.assertEquals(1, callback.getComponentLSN(null)); - callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); + callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); Assert.assertEquals(2, callback.getComponentLSN(null)); - callback.afterFinalize(LSMOperationType.FLUSH, 
Mockito.mock(ILSMDiskComponent.class)); + callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); } catch (Exception e) { Assert.fail(); } @@ -62,11 +62,11 @@ //request to flush first component callback.updateLastLSN(1); - callback.beforeOperation(LSMOperationType.FLUSH); + callback.beforeOperation(LSMIOOperationType.FLUSH); //request to flush second component callback.updateLastLSN(2); - callback.beforeOperation(LSMOperationType.FLUSH); + callback.beforeOperation(LSMIOOperationType.FLUSH); //request to flush first component again //this call should fail @@ -75,10 +75,10 @@ //the scheduleFlush request would fail this time Assert.assertEquals(1, callback.getComponentLSN(null)); - callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); + callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); Assert.assertEquals(2, callback.getComponentLSN(null)); - callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); + callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class)); } catch (Exception e) { Assert.fail(); } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java index cb81b64..cee20ce 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java @@ -34,8 +34,8 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId; +import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; -import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; /** @@ -225,7 +225,8 @@ for (int i = diskComponents.length - 1; i >= 0; i--) { // start from the oldest component to the newest component if (diskComponents[i] != null && diskComponents[i].getComponentSize() > 0) { - secondaryIndex.getIOOperationCallback().afterOperation(LSMOperationType.FLUSH, null, diskComponents[i]); + secondaryIndex.getIOOperationCallback().afterOperation(LSMIOOperationType.FLUSH, null, + diskComponents[i]); // setting component id has to be place between afterOperation and addBulkLoadedComponent, // since afterOperation would set a flush component id (but it's not invalid) diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java index 6a092e3..7dc5939 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java @@ -41,6 +41,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory; import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; @@ -50,7 +51,6 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; import org.apache.hyracks.storage.am.lsm.common.api.ITwoPCIndex; -import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import org.apache.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallbackWrapper; import org.apache.hyracks.storage.am.lsm.common.impls.ExternalIndexHarness; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences; @@ -288,7 +288,7 @@ } if (flushOnExit) { BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(ioOpCallback); - cb.afterFinalize(LSMOperationType.FLUSH, null); + cb.afterFinalize(LSMIOOperationType.FLUSH, null); } for (ILSMDiskComponent c : diskComponents) { c.deactivateAndPurge(); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java index 9740631..c495b69 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java @@ -44,6 +44,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; @@ -53,7 +54,6 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; import org.apache.hyracks.storage.am.lsm.common.api.ITwoPCIndex; -import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext; import org.apache.hyracks.storage.am.lsm.common.impls.ExternalIndexHarness; @@ -432,7 +432,7 @@ throw new HyracksDataException("Failed to deactivate the index since it is already deactivated."); } if (flushOnExit) { - ioOpCallback.afterFinalize(LSMOperationType.FLUSH, null); + ioOpCallback.afterFinalize(LSMIOOperationType.FLUSH, null); } // Even though, we deactivate the index, we don't exit components or // modify any of the lists to make sure they diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java index c2ae786..ff32628 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java @@ -29,9 +29,10 @@ /** * Represents the io operation type */ - enum LSMIOOpertionType { + enum LSMIOOperationType { FLUSH, - MERGE + MERGE, + LOAD } /** @@ -52,7 +53,7 @@ /** * @return the operation type */ - LSMIOOpertionType getIOOpertionType(); + LSMIOOperationType getIOOpertionType(); @Override Boolean call() throws 
HyracksDataException; diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java index 0323026..e122fd4 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java @@ -21,6 +21,7 @@ import java.util.List; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; public interface ILSMIOOperationCallback { @@ -29,7 +30,7 @@ * (i.e. IO operations could be flush or merge operations.) * For flush, this is called immediately before switching the current memory component pointer */ - void beforeOperation(LSMOperationType opType) throws HyracksDataException; + void beforeOperation(LSMIOOperationType opType) throws HyracksDataException; /** * This method is called on an IO operation sometime after the operation was completed. 
@@ -42,7 +43,7 @@ * @param newComponent * @throws HyracksDataException */ - void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, ILSMDiskComponent newComponent) + void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents, ILSMDiskComponent newComponent) throws HyracksDataException; /** @@ -53,7 +54,7 @@ * @param newComponent * @throws HyracksDataException */ - void afterFinalize(LSMOperationType opType, ILSMDiskComponent newComponent) throws HyracksDataException; + void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) throws HyracksDataException; /** * This method is called when a memory component is recycled diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java index 396cb77..2b2fe0d 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java @@ -48,6 +48,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; @@ -397,6 +398,7 @@ public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint) throws 
HyracksDataException { + ioOpCallback.beforeOperation(LSMIOOperationType.LOAD); return new LSMIndexDiskComponentBulkLoader(this, fillLevel, verifyInput, numElementsHint); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java index 438bb0b..a439ace 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java @@ -31,7 +31,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOpertionType; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; public class AsynchronousScheduler implements ILSMIOOperationScheduler { @@ -60,7 +60,7 @@ super.afterExecute(r, t); LSMIOOperationTask<Boolean> task = (LSMIOOperationTask<Boolean>) r; ILSMIOOperation executedOp = task.getOperation(); - if (executedOp.getIOOpertionType() == LSMIOOpertionType.FLUSH) { + if (executedOp.getIOOpertionType() == LSMIOOperationType.FLUSH) { String id = executedOp.getIndexIdentifier(); synchronized (this) { runningFlushOperations.remove(id); @@ -84,9 +84,9 @@ @Override public void scheduleOperation(ILSMIOOperation operation) throws HyracksDataException { - if (operation.getIOOpertionType() == LSMIOOpertionType.MERGE) { + if (operation.getIOOpertionType() == LSMIOOperationType.MERGE) { executor.submit(operation); - } else if 
(operation.getIOOpertionType() == LSMIOOpertionType.FLUSH) { + } else if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) { String id = operation.getIndexIdentifier(); synchronized (executor) { if (runningFlushOperations.containsKey(id)) { diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java index 8d0395f..12dbb46 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java @@ -23,9 +23,9 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; -import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; public class BlockingIOOperationCallbackWrapper implements ILSMIOOperationCallback { @@ -45,18 +45,18 @@ } @Override - public void beforeOperation(LSMOperationType opType) throws HyracksDataException { + public void beforeOperation(LSMIOOperationType opType) throws HyracksDataException { wrappedCallback.beforeOperation(opType); } @Override - public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, + public void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents, 
ILSMDiskComponent newComponent) throws HyracksDataException { wrappedCallback.afterOperation(opType, oldComponents, newComponent); } @Override - public synchronized void afterFinalize(LSMOperationType opType, ILSMDiskComponent newComponent) + public synchronized void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) throws HyracksDataException { wrappedCallback.afterFinalize(opType, newComponent); notifyAll(); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java index 2f4dcc2..20f9f6a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java @@ -30,6 +30,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; @@ -106,7 +107,7 @@ // Check if there is any action that is needed to be taken based on the operation type switch (opType) { case MERGE: - lsmIndex.getIOOperationCallback().beforeOperation(LSMOperationType.MERGE); + lsmIndex.getIOOperationCallback().beforeOperation(LSMIOOperationType.MERGE); default: break; } @@ -207,7 +208,7 @@ public void scheduleMerge(ILSMIndexOperationContext ctx, 
ILSMIOOperationCallback callback) throws HyracksDataException { if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) { - callback.afterFinalize(LSMOperationType.MERGE, null); + callback.afterFinalize(LSMIOOperationType.MERGE, null); return; } lsmIndex.scheduleMerge(ctx, callback); @@ -220,7 +221,7 @@ if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) { // If the merge cannot be scheduled because there is already an ongoing merge on subset/all of the components, then // whenever the current merge has finished, it will schedule the full merge again. - callback.afterFinalize(LSMOperationType.MERGE, null); + callback.afterFinalize(LSMIOOperationType.MERGE, null); return; } fullMergeIsRequested.set(false); @@ -236,11 +237,11 @@ ILSMDiskComponent newComponent = null; try { newComponent = lsmIndex.merge(operation); - operation.getCallback().afterOperation(LSMOperationType.MERGE, ctx.getComponentHolder(), newComponent); + operation.getCallback().afterOperation(LSMIOOperationType.MERGE, ctx.getComponentHolder(), newComponent); newComponent.markAsValid(lsmIndex.isDurable()); } finally { exitComponents(ctx, LSMOperationType.MERGE, newComponent, false); - operation.getCallback().afterFinalize(LSMOperationType.MERGE, newComponent); + operation.getCallback().afterFinalize(LSMIOOperationType.MERGE, newComponent); } if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Finished the merge operation for index: " + lsmIndex); @@ -300,7 +301,7 @@ @Override public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { - callback.afterFinalize(LSMOperationType.FLUSH, null); + callback.afterFinalize(LSMIOOperationType.FLUSH, null); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java index 7b7f950..e809925 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java @@ -60,8 +60,8 @@ } @Override - public LSMIOOpertionType getIOOpertionType() { - return LSMIOOpertionType.FLUSH; + public LSMIOOperationType getIOOpertionType() { + return LSMIOOperationType.FLUSH; } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java index 00a29bc..48b6d8f 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@ -44,6 +44,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; @@ -193,7 +194,7 @@ // Check if there is any action that is needed to be taken based on the operation type switch (opType) { case FLUSH: - lsmIndex.getIOOperationCallback().beforeOperation(LSMOperationType.FLUSH); + 
lsmIndex.getIOOperationCallback().beforeOperation(LSMIOOperationType.FLUSH); // Changing the flush status should *always* precede changing the mutable component. lsmIndex.changeFlushStatusForCurrentMutableCompoent(false); lsmIndex.changeMutableComponent(); @@ -202,7 +203,7 @@ opTracker.notifyAll(); break; case MERGE: - lsmIndex.getIOOperationCallback().beforeOperation(LSMOperationType.MERGE); + lsmIndex.getIOOperationCallback().beforeOperation(LSMIOOperationType.MERGE); break; default: break; @@ -504,7 +505,7 @@ public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { if (!getAndEnterComponents(ctx, LSMOperationType.FLUSH, true)) { - callback.afterFinalize(LSMOperationType.FLUSH, null); + callback.afterFinalize(LSMIOOperationType.FLUSH, null); return; } lsmIndex.scheduleFlush(ctx, callback); @@ -520,7 +521,7 @@ boolean failedOperation = false; try { newComponent = lsmIndex.flush(operation); - operation.getCallback().afterOperation(LSMOperationType.FLUSH, null, newComponent); + operation.getCallback().afterOperation(LSMIOOperationType.FLUSH, null, newComponent); newComponent.markAsValid(lsmIndex.isDurable()); } catch (Throwable e) { failedOperation = true; @@ -530,7 +531,7 @@ throw e; } finally { exitComponents(ctx, LSMOperationType.FLUSH, newComponent, failedOperation); - operation.getCallback().afterFinalize(LSMOperationType.FLUSH, newComponent); + operation.getCallback().afterFinalize(LSMIOOperationType.FLUSH, newComponent); } if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Finished the flush operation for index: " + lsmIndex); @@ -541,7 +542,7 @@ public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) { - callback.afterFinalize(LSMOperationType.MERGE, null); + callback.afterFinalize(LSMIOOperationType.MERGE, null); return; } lsmIndex.scheduleMerge(ctx, callback); @@ 
-554,7 +555,7 @@ if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) { // If the merge cannot be scheduled because there is already an ongoing merge on subset/all of the components, then // whenever the current merge has finished, it will schedule the full merge again. - callback.afterFinalize(LSMOperationType.MERGE, null); + callback.afterFinalize(LSMIOOperationType.MERGE, null); return; } fullMergeIsRequested.set(false); @@ -571,7 +572,7 @@ boolean failedOperation = false; try { newComponent = lsmIndex.merge(operation); - operation.getCallback().afterOperation(LSMOperationType.MERGE, ctx.getComponentHolder(), newComponent); + operation.getCallback().afterOperation(LSMIOOperationType.MERGE, ctx.getComponentHolder(), newComponent); newComponent.markAsValid(lsmIndex.isDurable()); } catch (Throwable e) { failedOperation = true; @@ -596,7 +597,7 @@ // 8. completeOperation (decrement the numOfIoOperations) opTracker.completeOperation(lsmIndex, LSMOperationType.MERGE, ctx.getSearchOperationCallback(), ctx.getModificationCallback()); - operation.getCallback().afterFinalize(LSMOperationType.MERGE, newComponent); + operation.getCallback().afterFinalize(LSMIOOperationType.MERGE, newComponent); } if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Finished the merge operation for index: " + lsmIndex); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java index 8befee1..08b8bb6 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java @@ 
-21,7 +21,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; -import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.common.IIndexBulkLoader; public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader { @@ -46,19 +46,28 @@ @Override public void end() throws HyracksDataException { - componentBulkLoader.end(); - if (component.getComponentSize() > 0) { - //TODO(amoudi): Ensure Bulk load follow the same lifecycle Other Operations (Flush, Merge, etc). - //then after operation should be called from harness as well - //https://issues.apache.org/jira/browse/ASTERIXDB-1764 - lsmIndex.getIOOperationCallback().afterOperation(LSMOperationType.FLUSH, null, component); - lsmIndex.getLsmHarness().addBulkLoadedComponent(component); + try { + componentBulkLoader.end(); + if (component.getComponentSize() > 0) { + //TODO(amoudi): Ensure Bulk load follow the same lifecycle Other Operations (Flush, Merge, etc). 
+ //then after operation should be called from harness as well + //https://issues.apache.org/jira/browse/ASTERIXDB-1764 + lsmIndex.getIOOperationCallback().afterOperation(LSMIOOperationType.LOAD, null, component); + lsmIndex.getLsmHarness().addBulkLoadedComponent(component); + } + } finally { + lsmIndex.getIOOperationCallback().afterFinalize(LSMIOOperationType.LOAD, component); } } @Override public void abort() throws HyracksDataException { - componentBulkLoader.abort(); + try { + componentBulkLoader.abort(); + lsmIndex.getIOOperationCallback().afterOperation(LSMIOOperationType.LOAD, null, null); + } finally { + lsmIndex.getIOOperationCallback().afterFinalize(LSMIOOperationType.LOAD, null); + } } } \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java index c83d534..e16da5b 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java @@ -47,8 +47,8 @@ } @Override - public LSMIOOpertionType getIOOpertionType() { - return LSMIOOpertionType.MERGE; + public LSMIOOperationType getIOOpertionType() { + return LSMIOOperationType.MERGE; } public IIndexCursor getCursor() { diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java index da8bc46..09ca553 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java @@ -23,11 +23,11 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; -import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; public enum NoOpIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory { INSTANCE; @@ -44,18 +44,19 @@ } @Override - public void beforeOperation(LSMOperationType opType) throws HyracksDataException { + public void beforeOperation(LSMIOOperationType opType) throws HyracksDataException { // Do nothing. } @Override - public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, + public void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents, ILSMDiskComponent newComponent) throws HyracksDataException { // Do nothing. } @Override - public void afterFinalize(LSMOperationType opType, ILSMDiskComponent newComponent) throws HyracksDataException { + public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) + throws HyracksDataException { // Do nothing. 
} diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java index 0f3c90f..238e915 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java @@ -23,9 +23,9 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; -import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; /** * This class is for testing. 
It's basically a way to get the new/old component info from the @@ -38,19 +38,19 @@ private ILSMDiskComponent newComponent = null; @Override - public void beforeOperation(LSMOperationType opType) throws HyracksDataException { + public void beforeOperation(LSMIOOperationType opType) throws HyracksDataException { // Not interested in this } @Override - public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, + public void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents, ILSMDiskComponent newComponent) throws HyracksDataException { this.oldComponents = oldComponents; this.newComponent = newComponent; } @Override - public synchronized void afterFinalize(LSMOperationType opType, ILSMDiskComponent newComponent) + public synchronized void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) throws HyracksDataException { // Redundant info from after } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java index 88f3231..4cfc3b6 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java @@ -35,7 +35,7 @@ static final Logger LOGGER = Logger.getLogger(TracedIOOperation.class.getName()); protected final ILSMIOOperation ioOp; - private final LSMIOOpertionType ioOpType; + private final LSMIOOperationType ioOpType; private final ITracer tracer; private final long traceCategory; @@ -80,7 +80,7 @@ } @Override - public LSMIOOpertionType getIOOpertionType() { + public LSMIOOperationType getIOOpertionType() { return ioOpType; } diff 
--git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java index 0f606ed..ca24b13 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java @@ -41,6 +41,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; @@ -50,7 +51,6 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; import org.apache.hyracks.storage.am.lsm.common.api.ITwoPCIndex; -import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import org.apache.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallbackWrapper; import org.apache.hyracks.storage.am.lsm.common.impls.ExternalIndexHarness; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences; @@ -328,7 +328,7 @@ if (flushOnExit) { BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(ioOpCallback); - cb.afterFinalize(LSMOperationType.FLUSH, null); + cb.afterFinalize(LSMIOOperationType.FLUSH, null); } for (ILSMDiskComponent c : diskComponents) { -- To view, visit 
https://asterix-gerrit.ics.uci.edu/2131 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ib9ecf7292c5dbaf8638d159decc6e6faf79de58b Gerrit-PatchSet: 4 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Luo Chen <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Luo Chen <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
