Repository: asterixdb Updated Branches: refs/heads/master 17ad87595 -> 517e1d27b
[NO ISSUE][STO] Improve the LSMIOOperationCallback interface. - user model changes: no - storage format changes: no - interface changes: yes + ILSMIndexOperationContext.getIoOperationType() + ILSMIndexOperationContext.getNewComponent() * before, after, and finalize calls of ILSMIOOperationCallback now take ILSMIndexOperationContext as a parameter Details: - Before, some calls to ILSMIOOperationCallback took just an enum LSMIOOperationType, and some of them took an enum and a component object. These sometimes didn't provide enough information to different implementations of the callback that might be interested in more than that. - Having the operation context object passed allows for better exchange of information between different callers and callees throughout the IO operation. Change-Id: Ib7120c40a1a2256ed528dfd2e5853db9dba247c6 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2455 Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: abdullah alamoudi <bamou...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/517e1d27 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/517e1d27 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/517e1d27 Branch: refs/heads/master Commit: 517e1d27b6baaddec4ad6b01d10ffc8d9ac370b0 Parents: 17ad875 Author: Abdullah Alamoudi <bamou...@gmail.com> Authored: Mon Mar 5 20:34:55 2018 -0800 Committer: abdullah alamoudi <bamou...@gmail.com> Committed: Mon Mar 5 23:58:10 2018 -0800 ---------------------------------------------------------------------- .../TestLsmBtreeIoOpCallbackFactory.java | 29 ++- .../AbstractLSMIOOperationCallback.java | 53 +++--- .../AbstractLSMIOOperationCallbackTest.java | 112 +++++++----- .../TestLSMIndexOperationContext.java | 177 
+++++++++++++++++++ .../am/lsm/btree/impls/ExternalBTree.java | 6 +- .../lsm/btree/impls/ExternalBTreeWithBuddy.java | 5 +- .../am/lsm/common/api/ILSMIOOperation.java | 3 +- .../lsm/common/api/ILSMIOOperationCallback.java | 15 +- .../common/api/ILSMIndexOperationContext.java | 25 +++ .../am/lsm/common/impls/AbstractLSMIndex.java | 7 +- .../impls/AbstractLSMIndexOperationContext.java | 23 +++ .../BlockingIOOperationCallbackWrapper.java | 20 +-- .../impls/ComponentReplacementContext.java | 21 +++ .../lsm/common/impls/ExternalIndexHarness.java | 18 +- .../storage/am/lsm/common/impls/LSMHarness.java | 29 +-- .../impls/LSMIndexDiskComponentBulkLoader.java | 28 +-- .../impls/NoOpIOOperationCallbackFactory.java | 14 +- .../common/impls/StubIOOperationCallback.java | 23 +-- .../am/lsm/rtree/impls/ExternalRTree.java | 8 +- 19 files changed, 442 insertions(+), 174 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/517e1d27/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java index c69ffe5..5852ad9 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java @@ -18,20 +18,17 @@ */ package org.apache.asterix.test.dataflow; -import java.util.List; - import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback; import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory; import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; 
import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; import org.apache.hyracks.storage.am.lsm.common.impls.EmptyComponent; @@ -108,42 +105,40 @@ public class TestLsmBtreeIoOpCallbackFactory extends LSMBTreeIOOperationCallback } @Override - public void beforeOperation(LSMIOOperationType opType) throws HyracksDataException { + public void beforeOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException { lsmBtree.beforeIoOperationCalled(); - super.beforeOperation(opType); + super.beforeOperation(opCtx); lsmBtree.beforeIoOperationReturned(); } @Override - public void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents, - ILSMDiskComponent newComponent) throws HyracksDataException { + public void afterOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException { lsmBtree.afterIoOperationCalled(); - super.afterOperation(opType, oldComponents, newComponent); + super.afterOperation(opCtx); lsmBtree.afterIoOperationReturned(); } @Override - public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) - throws HyracksDataException { + public void afterFinalize(ILSMIndexOperationContext opCtx) throws HyracksDataException { lsmBtree.afterIoFinalizeCalled(); - 
super.afterFinalize(opType, newComponent); + super.afterFinalize(opCtx); synchronized (TestLsmBtreeIoOpCallbackFactory.this) { - if (newComponent != null) { - if (newComponent == EmptyComponent.INSTANCE) { - if (opType == LSMIOOperationType.FLUSH) { + if (opCtx.getNewComponent() != null) { + if (opCtx.getNewComponent() == EmptyComponent.INSTANCE) { + if (opCtx.getIoOperationType() == LSMIOOperationType.FLUSH) { rollbackFlushes++; } else { rollbackMerges++; } } else { - if (opType == LSMIOOperationType.FLUSH) { + if (opCtx.getIoOperationType() == LSMIOOperationType.FLUSH) { completedFlushes++; } else { completedMerges++; } } } else { - recordFailure(opType); + recordFailure(opCtx.getIoOperationType()); } TestLsmBtreeIoOpCallbackFactory.this.notifyAll(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/517e1d27/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java index 412981c..b9f0cc7 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java @@ -37,6 +37,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager; import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata; @@ -90,8 +91,8 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC } @Override - public void beforeOperation(LSMIOOperationType opType) throws HyracksDataException { - if (opType == LSMIOOperationType.FLUSH) { + public void beforeOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException { + if (opCtx.getIoOperationType() == LSMIOOperationType.FLUSH) { /* * This method was called on the scheduleFlush operation. * We set the lastLSN to the last LSN for the index (the LSN for the flush log) @@ -111,25 +112,25 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC } @Override - public void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents, - ILSMDiskComponent newComponent) throws HyracksDataException { + public void afterOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException { //TODO: Copying Filters and all content of the metadata pages for flush operation should be done here - if (newComponent == null) { + if (opCtx.getNewComponent() == null) { // failed operation. Nothing to do. 
return; } - putLSNIntoMetadata(newComponent, oldComponents); - putComponentIdIntoMetadata(opType, newComponent, oldComponents); - componentLsnMap.put(newComponent.getId(), getComponentLSN(oldComponents)); - if (opType == LSMIOOperationType.MERGE) { - if (oldComponents == null) { + putLSNIntoMetadata(opCtx.getNewComponent(), opCtx.getComponentsToBeMerged()); + putComponentIdIntoMetadata(opCtx.getIoOperationType(), opCtx.getNewComponent(), + opCtx.getComponentsToBeMerged()); + componentLsnMap.put(opCtx.getNewComponent().getId(), getComponentLSN(opCtx.getComponentsToBeMerged())); + if (opCtx.getIoOperationType() == LSMIOOperationType.MERGE) { + if (opCtx.getComponentsToBeMerged().isEmpty()) { throw new IllegalStateException("Merge must have old components"); } - LongPointable markerLsn = - LongPointable.FACTORY.createPointable(ComponentUtils.getLong(oldComponents.get(0).getMetadata(), + LongPointable markerLsn = LongPointable.FACTORY + .createPointable(ComponentUtils.getLong(opCtx.getComponentsToBeMerged().get(0).getMetadata(), ComponentUtils.MARKER_LSN_KEY, ComponentUtils.NOT_FOUND)); - newComponent.getMetadata().put(ComponentUtils.MARKER_LSN_KEY, markerLsn); - } else if (opType == LSMIOOperationType.FLUSH) { + opCtx.getNewComponent().getMetadata().put(ComponentUtils.MARKER_LSN_KEY, markerLsn); + } else if (opCtx.getIoOperationType() == LSMIOOperationType.FLUSH) { // advance memory component indexes synchronized (this) { // we've already consumed the specified LSN/component id. 
@@ -146,17 +147,18 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC } @Override - public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) throws HyracksDataException { + public void afterFinalize(ILSMIndexOperationContext opCtx) throws HyracksDataException { // The operation was complete and the next I/O operation for the LSM index didn't start yet - if (opType == LSMIOOperationType.FLUSH) { + if (opCtx.getIoOperationType() == LSMIOOperationType.FLUSH) { hasFlushed = true; - if (newComponent != null) { - final Long lsn = componentLsnMap.remove(newComponent.getId()); + if (opCtx.getNewComponent() != null) { + final Long lsn = componentLsnMap.remove(opCtx.getNewComponent().getId()); if (lsn == null) { - throw new IllegalStateException("Unidentified flushed component: " + newComponent); + throw new IllegalStateException("Unidentified flushed component: " + opCtx.getNewComponent()); } // empty component doesn't have any files - final Optional<String> componentFile = newComponent.getLSMComponentPhysicalFiles().stream().findAny(); + final Optional<String> componentFile = + opCtx.getNewComponent().getLSMComponentPhysicalFiles().stream().findAny(); if (componentFile.isPresent()) { final ResourceReference ref = ResourceReference.of(componentFile.get()); final String componentEndTime = AbstractLSMIndexFileManager.getComponentEndTime(ref.getName()); @@ -166,7 +168,7 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC } } - private void putLSNIntoMetadata(ILSMDiskComponent newComponent, List<ILSMComponent> oldComponents) + private void putLSNIntoMetadata(ILSMDiskComponent newComponent, List<? 
extends ILSMComponent> oldComponents) throws HyracksDataException { newComponent.getMetadata().put(LSN_KEY, LongPointable.FACTORY.createPointable(getComponentLSN(oldComponents))); } @@ -178,8 +180,9 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC return pointable.getLength() == 0 ? INVALID : pointable.longValue(); } - private ILSMComponentId getMergedComponentId(List<ILSMComponent> mergedComponents) throws HyracksDataException { - if (mergedComponents == null || mergedComponents.isEmpty()) { + private ILSMComponentId getMergedComponentId(List<? extends ILSMComponent> mergedComponents) + throws HyracksDataException { + if (mergedComponents.isEmpty()) { return null; } return LSMComponentIdUtils.union(mergedComponents.get(0).getId(), @@ -188,7 +191,7 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC } private void putComponentIdIntoMetadata(LSMIOOperationType opType, ILSMDiskComponent newComponent, - List<ILSMComponent> oldComponents) throws HyracksDataException { + List<? extends ILSMComponent> oldComponents) throws HyracksDataException { // the id of flushed component is set when we copy the metadata of the memory component if (opType == LSMIOOperationType.MERGE) { ILSMComponentId componentId = getMergedComponentId(oldComponents); @@ -242,7 +245,7 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC } public long getComponentLSN(List<? extends ILSMComponent> diskComponents) throws HyracksDataException { - if (diskComponents == null) { + if (diskComponents.isEmpty()) { // Implies a flush IO operation. --> moves the flush pointer // Flush operation of an LSM index are executed sequentially. 
synchronized (this) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/517e1d27/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java index 2ab5b4e..c03af40 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java @@ -19,6 +19,8 @@ package org.apache.asterix.test.ioopcallbacks; +import java.util.Collections; + import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback; import org.apache.asterix.common.storage.IIndexCheckpointManager; @@ -29,6 +31,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMMemoryComponent; import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata; @@ -49,24 +52,30 @@ public abstract class AbstractLSMIOOperationCallbackTest extends TestCase { Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class)); LSMBTreeIOOperationCallback callback = new 
LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator(), mockIndexCheckpointManagerProvider()); + ILSMIndexOperationContext firstOpCtx = new TestLSMIndexOperationContext(mockIndex); //request to flush first component callback.updateLastLSN(1); - callback.beforeOperation(LSMIOOperationType.FLUSH); + firstOpCtx.setIoOperationType(LSMIOOperationType.FLUSH); + callback.beforeOperation(firstOpCtx); + ILSMIndexOperationContext secondOpCtx = new TestLSMIndexOperationContext(mockIndex); //request to flush second component callback.updateLastLSN(2); - callback.beforeOperation(LSMIOOperationType.FLUSH); + secondOpCtx.setIoOperationType(LSMIOOperationType.FLUSH); + callback.beforeOperation(secondOpCtx); - Assert.assertEquals(1, callback.getComponentLSN(null)); + Assert.assertEquals(1, callback.getComponentLSN(Collections.emptyList())); final ILSMDiskComponent diskComponent1 = mockDiskComponent(); - callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent1); - callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent1); + firstOpCtx.setNewComponent(diskComponent1); + callback.afterOperation(firstOpCtx); + callback.afterFinalize(firstOpCtx); - Assert.assertEquals(2, callback.getComponentLSN(null)); + Assert.assertEquals(2, callback.getComponentLSN(Collections.emptyList())); final ILSMDiskComponent diskComponent2 = mockDiskComponent(); - callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent2); - callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent2); + secondOpCtx.setNewComponent(diskComponent2); + callback.afterOperation(secondOpCtx); + callback.afterFinalize(secondOpCtx); } @Test @@ -74,17 +83,20 @@ public abstract class AbstractLSMIOOperationCallbackTest extends TestCase { ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2); Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class)); - 
LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator(), mockIndexCheckpointManagerProvider()); //request to flush first component + ILSMIndexOperationContext firstOpCtx = new TestLSMIndexOperationContext(mockIndex); callback.updateLastLSN(1); - callback.beforeOperation(LSMIOOperationType.FLUSH); + firstOpCtx.setIoOperationType(LSMIOOperationType.FLUSH); + callback.beforeOperation(firstOpCtx); //request to flush second component + ILSMIndexOperationContext secondOpCtx = new TestLSMIndexOperationContext(mockIndex); callback.updateLastLSN(2); - callback.beforeOperation(LSMIOOperationType.FLUSH); + secondOpCtx.setIoOperationType(LSMIOOperationType.FLUSH); + callback.beforeOperation(secondOpCtx); //request to flush first component again //this call should fail @@ -92,14 +104,16 @@ public abstract class AbstractLSMIOOperationCallbackTest extends TestCase { //there is no corresponding beforeOperation, since the first component is being flush //the scheduleFlush request would fail this time - Assert.assertEquals(1, callback.getComponentLSN(null)); + Assert.assertEquals(1, callback.getComponentLSN(Collections.emptyList())); final ILSMDiskComponent diskComponent1 = mockDiskComponent(); - callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent1); - callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent1); + firstOpCtx.setNewComponent(diskComponent1); + callback.afterOperation(firstOpCtx); + callback.afterFinalize(firstOpCtx); final ILSMDiskComponent diskComponent2 = mockDiskComponent(); - Assert.assertEquals(2, callback.getComponentLSN(null)); - callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent2); - callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent2); + secondOpCtx.setNewComponent(diskComponent2); + Assert.assertEquals(2, callback.getComponentLSN(Collections.emptyList())); + callback.afterOperation(secondOpCtx); + callback.afterFinalize(secondOpCtx); } @Test @@ 
-111,33 +125,39 @@ public abstract class AbstractLSMIOOperationCallbackTest extends TestCase { LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator(), mockIndexCheckpointManagerProvider()); //request to flush first component + ILSMIndexOperationContext firstOpCtx = new TestLSMIndexOperationContext(mockIndex); callback.updateLastLSN(1); - callback.beforeOperation(LSMIOOperationType.FLUSH); + firstOpCtx.setIoOperationType(LSMIOOperationType.FLUSH); + callback.beforeOperation(firstOpCtx); //request to flush second component + ILSMIndexOperationContext secondOpCtx = new TestLSMIndexOperationContext(mockIndex); callback.updateLastLSN(2); - callback.beforeOperation(LSMIOOperationType.FLUSH); + secondOpCtx.setIoOperationType(LSMIOOperationType.FLUSH); + callback.beforeOperation(secondOpCtx); - Assert.assertEquals(1, callback.getComponentLSN(null)); + Assert.assertEquals(1, callback.getComponentLSN(Collections.emptyList())); // the first flush is finished, but has not finalized yet (in codebase, these two calls // are not synchronized) - callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent()); + firstOpCtx.setNewComponent(mockDiskComponent()); + callback.afterOperation(firstOpCtx); //request to flush first component again callback.updateLastLSN(3); // the first flush is finalized (it may be called after afterOperation for a while) - callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent()); + callback.afterFinalize(firstOpCtx); // the second flush gets LSN 2 - Assert.assertEquals(2, callback.getComponentLSN(null)); + Assert.assertEquals(2, callback.getComponentLSN(Collections.emptyList())); // the second flush is finished - callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent()); - callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent()); + secondOpCtx.setNewComponent(mockDiskComponent()); + callback.afterOperation(secondOpCtx); + 
callback.afterFinalize(secondOpCtx); // it should get new LSN 3 - Assert.assertEquals(3, callback.getComponentLSN(null)); + Assert.assertEquals(3, callback.getComponentLSN(Collections.emptyList())); } @Test @@ -179,14 +199,14 @@ public abstract class AbstractLSMIOOperationCallbackTest extends TestCase { // schedule a flush idGenerator.refresh(); ILSMComponentId expectedId = idGenerator.getId(); - callback.updateLastLSN(0); - callback.beforeOperation(LSMIOOperationType.FLUSH); + ILSMIndexOperationContext opCtx = new TestLSMIndexOperationContext(mockIndex); + opCtx.setIoOperationType(LSMIOOperationType.FLUSH); + callback.beforeOperation(opCtx); callback.recycled(mockComponent, true); - - final ILSMDiskComponent diskComponent = mockDiskComponent(); - callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent); - callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent); + opCtx.setNewComponent(mockDiskComponent()); + callback.afterOperation(opCtx); + callback.afterFinalize(opCtx); checkMemoryComponent(expectedId, mockComponent); } } @@ -200,19 +220,19 @@ public abstract class AbstractLSMIOOperationCallbackTest extends TestCase { Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent); LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, idGenerator, mockIndexCheckpointManagerProvider()); - ILSMComponentId id = idGenerator.getId(); callback.allocated(mockComponent); checkMemoryComponent(id, mockComponent); - Mockito.when(mockIndex.isMemoryComponentsAllocated()).thenReturn(true); - for (int i = 0; i < 10; i++) { idGenerator.refresh(); id = idGenerator.getId(); callback.updateLastLSN(0); + // Huh! There is no beforeOperation? 
+ ILSMIndexOperationContext opCtx = new TestLSMIndexOperationContext(mockIndex); + opCtx.setIoOperationType(LSMIOOperationType.FLUSH); callback.recycled(mockComponent, false); - callback.afterFinalize(LSMIOOperationType.FLUSH, null); + callback.afterFinalize(opCtx); checkMemoryComponent(id, mockComponent); } } @@ -238,10 +258,12 @@ public abstract class AbstractLSMIOOperationCallbackTest extends TestCase { ILSMComponentId expectedId = idGenerator.getId(); callback.updateLastLSN(0); - callback.beforeOperation(LSMIOOperationType.FLUSH); - final ILSMDiskComponent diskComponent = mockDiskComponent(); - callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent); - callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent); + ILSMIndexOperationContext firstOpCtx = new TestLSMIndexOperationContext(mockIndex); + firstOpCtx.setIoOperationType(LSMIOOperationType.FLUSH); + callback.beforeOperation(firstOpCtx); + firstOpCtx.setNewComponent(mockDiskComponent()); + callback.afterOperation(firstOpCtx); + callback.afterFinalize(firstOpCtx); // another flush is to be scheduled before the component is recycled idGenerator.refresh(); @@ -253,10 +275,12 @@ public abstract class AbstractLSMIOOperationCallbackTest extends TestCase { // schedule the next flush callback.updateLastLSN(0); - callback.beforeOperation(LSMIOOperationType.FLUSH); - final ILSMDiskComponent diskComponent2 = mockDiskComponent(); - callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent2); - callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent2); + ILSMIndexOperationContext secondOpCtx = new TestLSMIndexOperationContext(mockIndex); + secondOpCtx.setIoOperationType(LSMIOOperationType.FLUSH); + callback.beforeOperation(secondOpCtx); + secondOpCtx.setNewComponent(mockDiskComponent()); + callback.afterOperation(secondOpCtx); + callback.afterFinalize(secondOpCtx); callback.recycled(mockComponent, true); checkMemoryComponent(nextId, mockComponent); } 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/517e1d27/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/TestLSMIndexOperationContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/TestLSMIndexOperationContext.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/TestLSMIndexOperationContext.java new file mode 100644 index 0000000..19536f6 --- /dev/null +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/TestLSMIndexOperationContext.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.test.ioopcallbacks; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; +import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; +import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; +import org.apache.hyracks.storage.common.IModificationOperationCallback; +import org.apache.hyracks.storage.common.ISearchOperationCallback; +import org.apache.hyracks.storage.common.ISearchPredicate; +import org.apache.hyracks.storage.common.MultiComparator; + +public class TestLSMIndexOperationContext implements ILSMIndexOperationContext { + + private final ILSMIndex index; + private final List<ILSMComponent> componentHolder = new ArrayList<>(); + private final List<ILSMDiskComponent> componentsToBeMerged = new ArrayList<>(); + private final List<ILSMDiskComponent> componentsToBeReplicated = new ArrayList<>(); + private boolean isAccessingComponents; + private IndexOperation op; + private LSMIOOperationType ioOperationType; + private ILSMDiskComponent newComponent; + + public TestLSMIndexOperationContext(ILSMIndex index) { + this.index = index; + } + + @Override + public void setOperation(IndexOperation newOp) throws HyracksDataException { + this.op = newOp; + } + + @Override + public IndexOperation getOperation() { + return op; + } + + @Override + public void reset() { + op = null; + componentHolder.clear(); + componentsToBeMerged.clear(); + componentsToBeReplicated.clear(); + isAccessingComponents = false; 
+ } + + @Override + public void destroy() throws HyracksDataException { + } + + @Override + public List<ILSMComponent> getComponentHolder() { + return componentHolder; + } + + @Override + public List<ILSMDiskComponent> getComponentsToBeMerged() { + return componentsToBeMerged; + } + + @Override + public ISearchOperationCallback getSearchOperationCallback() { + return NoOpOperationCallback.INSTANCE; + } + + @Override + public IModificationOperationCallback getModificationCallback() { + return NoOpOperationCallback.INSTANCE; + } + + @Override + public void setCurrentMutableComponentId(int currentMutableComponentId) { + } + + @Override + public void setSearchPredicate(ISearchPredicate searchPredicate) { + throw new UnsupportedOperationException(); + } + + @Override + public ISearchPredicate getSearchPredicate() { + throw new UnsupportedOperationException(); + } + + @Override + public List<ILSMDiskComponent> getComponentsToBeReplicated() { + return componentsToBeReplicated; + } + + @Override + public boolean isAccessingComponents() { + return isAccessingComponents; + } + + @Override + public void setAccessingComponents(boolean accessingComponents) { + this.isAccessingComponents = accessingComponents; + } + + @Override + public PermutingTupleReference getIndexTuple() { + throw new UnsupportedOperationException(); + } + + @Override + public PermutingTupleReference getFilterTuple() { + return null; + } + + @Override + public MultiComparator getFilterCmp() { + return null; + } + + @Override + public ILSMIndex getIndex() { + return index; + } + + @Override + public void logPerformanceCounters(int tupleCount) { + } + + @Override + public void incrementEnterExitTime(long increment) { + } + + @Override + public boolean isTracingEnabled() { + return false; + } + + @Override + public LSMIOOperationType getIoOperationType() { + return ioOperationType; + } + + @Override + public void setIoOperationType(LSMIOOperationType ioOpType) { + this.ioOperationType = ioOpType; + } + + 
@Override + public ILSMDiskComponent getNewComponent() { + return newComponent; + } + + @Override + public void setNewComponent(ILSMDiskComponent component) { + this.newComponent = component; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/517e1d27/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java index 92d74d9..c0f7571 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java @@ -50,7 +50,6 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; import org.apache.hyracks.storage.am.lsm.common.api.ITwoPCIndex; -import org.apache.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallbackWrapper; import org.apache.hyracks.storage.am.lsm.common.impls.ExternalIndexHarness; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences; import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor; @@ -288,8 +287,9 @@ public class ExternalBTree extends LSMBTree implements ITwoPCIndex { throw new HyracksDataException("Failed to deactivate the index since it is already deactivated."); } if (flushOnExit) { - BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(ioOpCallback); - 
cb.afterFinalize(LSMIOOperationType.FLUSH, null); + ExternalBTreeOpContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, version); + opCtx.setIoOperationType(LSMIOOperationType.FLUSH); + ioOpCallback.afterFinalize(opCtx); } for (ILSMDiskComponent c : diskComponents) { c.deactivateAndPurge(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/517e1d27/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java index 6e06d37..1ba55f7 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java @@ -35,6 +35,7 @@ import org.apache.hyracks.storage.am.common.api.IMetadataPageManager; import org.apache.hyracks.storage.am.common.api.ITreeIndex; import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory; import org.apache.hyracks.storage.am.common.api.ITwoPCIndexBulkLoader; +import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.lsm.common.api.AbstractLSMWithBloomFilterDiskComponent; @@ -430,7 +431,9 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd throw new HyracksDataException("Failed to deactivate the index since it is already 
deactivated."); } if (flushOnExit) { - ioOpCallback.afterFinalize(LSMIOOperationType.FLUSH, null); + AbstractLSMIndexOperationContext opCtx = createOpContext(NoOpIndexAccessParameters.INSTANCE); + opCtx.setIoOperationType(LSMIOOperationType.FLUSH); + ioOpCallback.afterFinalize(opCtx); } // Even though, we deactivate the index, we don't exit components or // modify any of the lists to make sure they http://git-wip-us.apache.org/repos/asf/asterixdb/blob/517e1d27/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java index f5ee23b..65e7f64 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java @@ -33,7 +33,8 @@ public interface ILSMIOOperation extends Callable<Boolean> { enum LSMIOOperationType { FLUSH, MERGE, - LOAD + LOAD, + NOOP } /** http://git-wip-us.apache.org/repos/asf/asterixdb/blob/517e1d27/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java index 8df872b..acc9e89 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java @@ -18,19 +18,16 @@ */ package org.apache.hyracks.storage.am.lsm.common.api; -import java.util.List; - import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; public interface ILSMIOOperationCallback { /** * This method is called on an IO operation before the operation starts. - * (i.e. IO operations could be flush or merge operations.) + * (i.e. IO operations could be flush, or merge operations.) * For flush, this is called immediately before switching the current memory component pointer */ - void beforeOperation(LSMIOOperationType opType) throws HyracksDataException; + void beforeOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException; /** * This method is called on an IO operation sometime after the operation was completed. 
@@ -39,22 +36,18 @@ public interface ILSMIOOperationCallback { * Copying content of metadata page from memory component to disk component should be done in this call * Merging content of metadata pages from disk components to new disk component should be done in this call * - * @param oldComponents - * @param newComponent * @throws HyracksDataException */ - void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents, ILSMDiskComponent newComponent) - throws HyracksDataException; + void afterOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException; /** * This method is called on an IO operation when the operation needs any cleanup works * regardless that the IO operation was executed or not. Once the IO operation is executed, * this method should be called after ILSMIOOperationCallback.afterOperation() was called. * - * @param newComponent * @throws HyracksDataException */ - void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) throws HyracksDataException; + void afterFinalize(ILSMIndexOperationContext opCtx) throws HyracksDataException; /** * This method is called when a memory component is recycled http://git-wip-us.apache.org/repos/asf/asterixdb/blob/517e1d27/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java index ec9124d..79b3262 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java +++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.hyracks.storage.am.common.api.IIndexOperationContext; import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.common.IModificationOperationCallback; import org.apache.hyracks.storage.common.ISearchOperationCallback; import org.apache.hyracks.storage.common.ISearchPredicate; @@ -82,4 +83,28 @@ public interface ILSMIndexOperationContext extends IIndexOperationContext { * @return true if performance tracing is enabled, false otherwise */ boolean isTracingEnabled(); + + /** + * @return the IO Operation type associated with this context + */ + LSMIOOperationType getIoOperationType(); + + /** + * Set the IO Operation type associated with this context + * + * @param ioOpType + */ + void setIoOperationType(LSMIOOperationType ioOpType); + + /** + * @return the new component produced by this operation if any, null otherwise + */ + ILSMDiskComponent getNewComponent(); + + /** + * Set the new component produced by this operation + * + * @param component + */ + void setNewComponent(ILSMDiskComponent component); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/517e1d27/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java index fef5516..0368a09 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java @@ -369,6 +369,7 @@ public abstract class AbstractLSMIndex implements ILSMIndex { AbstractLSMIndexOperationContext opCtx = createOpContext(NoOpIndexAccessParameters.INSTANCE); opCtx.setOperation(ctx.getOperation()); opCtx.getComponentHolder().addAll(mergingComponents); + opCtx.getComponentsToBeMerged().addAll(ctx.getComponentsToBeMerged()); ILSMDiskComponent firstComponent = (ILSMDiskComponent) mergingComponents.get(0); ILSMDiskComponent lastComponent = (ILSMDiskComponent) mergingComponents.get(mergingComponents.size() - 1); LSMComponentFileReferences mergeFileRefs = getMergeFileReferences(firstComponent, lastComponent); @@ -406,8 +407,10 @@ public abstract class AbstractLSMIndex implements ILSMIndex { public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint) throws HyracksDataException { - ioOpCallback.beforeOperation(LSMIOOperationType.LOAD); - return new LSMIndexDiskComponentBulkLoader(this, fillLevel, verifyInput, numElementsHint); + AbstractLSMIndexOperationContext opCtx = createOpContext(NoOpIndexAccessParameters.INSTANCE); + opCtx.setIoOperationType(LSMIOOperationType.LOAD); + ioOpCallback.beforeOperation(opCtx); + return new LSMIndexDiskComponentBulkLoader(this, opCtx, fillLevel, verifyInput, numElementsHint); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/517e1d27/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java index 1b540b7..72c2b07 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java @@ -27,6 +27,7 @@ import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; import org.apache.hyracks.storage.common.IModificationOperationCallback; @@ -54,6 +55,8 @@ public abstract class AbstractLSMIndexOperationContext implements ILSMIndexOpera protected final ITracer tracer; protected final long traceCategory; private long enterExitTime = 0L; + private LSMIOOperationType ioOpType = LSMIOOperationType.NOOP; + private ILSMDiskComponent newDiskComponent; public AbstractLSMIndexOperationContext(ILSMIndex index, int[] treeFields, int[] filterFields, IBinaryComparatorFactory[] filterCmpFactories, ISearchOperationCallback searchCallback, @@ -191,4 +194,24 @@ public abstract class AbstractLSMIndexOperationContext implements ILSMIndexOpera public ILSMIndex getIndex() { return index; } + + @Override + public LSMIOOperationType getIoOperationType() { + return ioOpType; + 
} + + @Override + public void setIoOperationType(LSMIOOperationType ioOpType) { + this.ioOpType = ioOpType; + } + + @Override + public ILSMDiskComponent getNewComponent() { + return newDiskComponent; + } + + @Override + public void setNewComponent(ILSMDiskComponent component) { + this.newDiskComponent = component; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/517e1d27/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java index e464231..042720c 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java @@ -18,13 +18,9 @@ */ package org.apache.hyracks.storage.am.lsm.common.impls; -import java.util.List; - import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; public class BlockingIOOperationCallbackWrapper implements 
ILSMIOOperationCallback { @@ -45,20 +41,18 @@ public class BlockingIOOperationCallbackWrapper implements ILSMIOOperationCallba } @Override - public void beforeOperation(LSMIOOperationType opType) throws HyracksDataException { - wrappedCallback.beforeOperation(opType); + public void beforeOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException { + wrappedCallback.beforeOperation(opCtx); } @Override - public void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents, - ILSMDiskComponent newComponent) throws HyracksDataException { - wrappedCallback.afterOperation(opType, oldComponents, newComponent); + public void afterOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException { + wrappedCallback.afterOperation(opCtx); } @Override - public synchronized void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) - throws HyracksDataException { - wrappedCallback.afterFinalize(opType, newComponent); + public synchronized void afterFinalize(ILSMIndexOperationContext opCtx) throws HyracksDataException { + wrappedCallback.afterFinalize(opCtx); notifyAll(); notified = true; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/517e1d27/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java index dcac219..a992c5e 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java +++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java @@ -29,6 +29,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; import org.apache.hyracks.storage.common.IModificationOperationCallback; @@ -218,4 +219,24 @@ public class ComponentReplacementContext implements ILSMIndexOperationContext { public void destroy() throws HyracksDataException { // No Op.. Nothing to destroy } + + @Override + public LSMIOOperationType getIoOperationType() { + throw new UnsupportedOperationException(); + } + + @Override + public void setIoOperationType(LSMIOOperationType ioOpType) { + throw new UnsupportedOperationException(); + } + + @Override + public ILSMDiskComponent getNewComponent() { + throw new UnsupportedOperationException(); + } + + @Override + public void setNewComponent(ILSMDiskComponent component) { + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/517e1d27/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java index d9d3a07..aa54127 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java @@ -107,7 +107,8 @@ public class ExternalIndexHarness extends LSMHarness { // Check if there is any action that is needed to be taken based on the operation type switch (opType) { case MERGE: - lsmIndex.getIOOperationCallback().beforeOperation(LSMIOOperationType.MERGE); + ctx.setIoOperationType(LSMIOOperationType.MERGE); + lsmIndex.getIOOperationCallback().beforeOperation(ctx); default: break; } @@ -208,7 +209,8 @@ public class ExternalIndexHarness extends LSMHarness { public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) { - callback.afterFinalize(LSMIOOperationType.MERGE, null); + ctx.setIoOperationType(LSMIOOperationType.MERGE); + callback.afterFinalize(ctx); return; } lsmIndex.scheduleMerge(ctx, callback); @@ -221,7 +223,8 @@ public class ExternalIndexHarness extends LSMHarness { if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) { // If the merge cannot be scheduled because there is already an ongoing merge on subset/all of the components, then // whenever the current merge has finished, it will schedule the full merge again. 
- callback.afterFinalize(LSMIOOperationType.MERGE, null); + ctx.setIoOperationType(LSMIOOperationType.MERGE); + callback.afterFinalize(ctx); return; } fullMergeIsRequested.set(false); @@ -237,11 +240,13 @@ public class ExternalIndexHarness extends LSMHarness { ILSMDiskComponent newComponent = null; try { newComponent = lsmIndex.merge(operation); - operation.getCallback().afterOperation(LSMIOOperationType.MERGE, ctx.getComponentHolder(), newComponent); + ctx.setNewComponent(newComponent); + ctx.setIoOperationType(LSMIOOperationType.MERGE); + operation.getCallback().afterOperation(ctx); newComponent.markAsValid(lsmIndex.isDurable()); } finally { exitComponents(ctx, LSMOperationType.MERGE, newComponent, false); - operation.getCallback().afterFinalize(LSMIOOperationType.MERGE, newComponent); + operation.getCallback().afterFinalize(ctx); } if (LOGGER.isInfoEnabled()) { LOGGER.info("Finished the merge operation for index: " + lsmIndex); @@ -301,7 +306,8 @@ public class ExternalIndexHarness extends LSMHarness { @Override public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { - callback.afterFinalize(LSMIOOperationType.FLUSH, null); + ctx.setIoOperationType(LSMIOOperationType.FLUSH); + callback.afterFinalize(ctx); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/517e1d27/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java index fa3093c..eed8f6e 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@ -221,7 +221,8 @@ public class LSMHarness implements ILSMHarness { // Check if there is any action that is needed to be taken based on the operation type switch (opType) { case FLUSH: - lsmIndex.getIOOperationCallback().beforeOperation(LSMIOOperationType.FLUSH); + ctx.setIoOperationType(LSMIOOperationType.FLUSH); + lsmIndex.getIOOperationCallback().beforeOperation(ctx); // Changing the flush status should *always* precede changing the mutable component. lsmIndex.changeFlushStatusForCurrentMutableCompoent(false); lsmIndex.changeMutableComponent(); @@ -230,7 +231,8 @@ public class LSMHarness implements ILSMHarness { opTracker.notifyAll(); // NOSONAR: Always called from a synchronized block break; case MERGE: - lsmIndex.getIOOperationCallback().beforeOperation(LSMIOOperationType.MERGE); + ctx.setIoOperationType(LSMIOOperationType.MERGE); + lsmIndex.getIOOperationCallback().beforeOperation(ctx); break; default: break; @@ -549,7 +551,8 @@ public class LSMHarness implements ILSMHarness { public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { if (!getAndEnterComponents(ctx, LSMOperationType.FLUSH, true)) { - callback.afterFinalize(LSMIOOperationType.FLUSH, null); + ctx.setIoOperationType(LSMIOOperationType.FLUSH); + callback.afterFinalize(ctx); return; } lsmIndex.scheduleFlush(ctx, callback); @@ -565,7 +568,9 @@ public class LSMHarness implements ILSMHarness { boolean failedOperation = false; try { newComponent = lsmIndex.flush(operation); - operation.getCallback().afterOperation(LSMIOOperationType.FLUSH, null, newComponent); + ctx.setNewComponent(newComponent); + ctx.setIoOperationType(LSMIOOperationType.FLUSH); + 
operation.getCallback().afterOperation(ctx); newComponent.markAsValid(lsmIndex.isDurable()); } catch (Throwable e) { // NOSONAR Log and re-throw failedOperation = true; @@ -575,7 +580,8 @@ public class LSMHarness implements ILSMHarness { throw e; } finally { exitComponents(ctx, LSMOperationType.FLUSH, newComponent, failedOperation); - operation.getCallback().afterFinalize(LSMIOOperationType.FLUSH, newComponent); + ctx.setIoOperationType(LSMIOOperationType.FLUSH); + operation.getCallback().afterFinalize(ctx); } } finally { @@ -595,7 +601,8 @@ public class LSMHarness implements ILSMHarness { public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) { - callback.afterFinalize(LSMIOOperationType.MERGE, null); + ctx.setIoOperationType(LSMIOOperationType.MERGE); + callback.afterFinalize(ctx); return; } lsmIndex.scheduleMerge(ctx, callback); @@ -609,7 +616,8 @@ public class LSMHarness implements ILSMHarness { // If the merge cannot be scheduled because there is already an ongoing merge on // subset/all of the components, then whenever the current merge has finished, // it will schedule the full merge again. 
- callback.afterFinalize(LSMIOOperationType.MERGE, null); + ctx.setIoOperationType(LSMIOOperationType.MERGE); + callback.afterFinalize(ctx); return; } fullMergeIsRequested.set(false); @@ -626,8 +634,9 @@ public class LSMHarness implements ILSMHarness { boolean failedOperation = false; try { newComponent = lsmIndex.merge(operation); - operation.getCallback().afterOperation(LSMIOOperationType.MERGE, ctx.getComponentHolder(), - newComponent); + ctx.setNewComponent(newComponent); + ctx.setIoOperationType(LSMIOOperationType.MERGE); + operation.getCallback().afterOperation(ctx); newComponent.markAsValid(lsmIndex.isDurable()); } catch (Throwable e) { // NOSONAR: Log and re-throw failedOperation = true; @@ -637,7 +646,7 @@ public class LSMHarness implements ILSMHarness { throw e; } finally { exitComponents(ctx, LSMOperationType.MERGE, newComponent, failedOperation); - operation.getCallback().afterFinalize(LSMIOOperationType.MERGE, newComponent); + operation.getCallback().afterFinalize(ctx); } } finally { /* http://git-wip-us.apache.org/repos/asf/asterixdb/blob/517e1d27/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java index 7bc0660..5e105a4 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java @@ -22,26 +22,27 @@ 
import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; import org.apache.hyracks.storage.common.IIndexBulkLoader; public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader { private final AbstractLSMIndex lsmIndex; - private final ILSMDiskComponent component; private final ILSMDiskComponentBulkLoader componentBulkLoader; + private ILSMIndexOperationContext opCtx; - public LSMIndexDiskComponentBulkLoader(AbstractLSMIndex lsmIndex, float fillFactor, boolean verifyInput, - long numElementsHint) throws HyracksDataException { + public LSMIndexDiskComponentBulkLoader(AbstractLSMIndex lsmIndex, ILSMIndexOperationContext opCtx, float fillFactor, + boolean verifyInput, long numElementsHint) throws HyracksDataException { this.lsmIndex = lsmIndex; + this.opCtx = opCtx; // Note that by using a flush target file name, we state that the // new bulk loaded component is "newer" than any other merged component. 
- this.component = lsmIndex.createBulkLoadTarget(); + opCtx.setNewComponent(lsmIndex.createBulkLoadTarget()); this.componentBulkLoader = - component.createBulkLoader(fillFactor, verifyInput, numElementsHint, false, true, true); + opCtx.getNewComponent().createBulkLoader(fillFactor, verifyInput, numElementsHint, false, true, true); } public ILSMDiskComponent getComponent() { - return component; + return opCtx.getNewComponent(); } @Override @@ -57,15 +58,15 @@ public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader { public void end() throws HyracksDataException { try { componentBulkLoader.end(); - if (component.getComponentSize() > 0) { + if (opCtx.getNewComponent().getComponentSize() > 0) { //TODO(amoudi): Ensure Bulk load follow the same lifecycle Other Operations (Flush, Merge, etc). //then after operation should be called from harness as well //https://issues.apache.org/jira/browse/ASTERIXDB-1764 - lsmIndex.getIOOperationCallback().afterOperation(LSMIOOperationType.LOAD, null, component); - lsmIndex.getHarness().addBulkLoadedComponent(component); + lsmIndex.getIOOperationCallback().afterOperation(opCtx); + lsmIndex.getHarness().addBulkLoadedComponent(opCtx.getNewComponent()); } } finally { - lsmIndex.getIOOperationCallback().afterFinalize(LSMIOOperationType.LOAD, component); + lsmIndex.getIOOperationCallback().afterFinalize(opCtx); } } @@ -73,9 +74,10 @@ public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader { public void abort() throws HyracksDataException { try { componentBulkLoader.abort(); - lsmIndex.getIOOperationCallback().afterOperation(LSMIOOperationType.LOAD, null, null); + opCtx.setNewComponent(null); + lsmIndex.getIOOperationCallback().afterOperation(opCtx); } finally { - lsmIndex.getIOOperationCallback().afterFinalize(LSMIOOperationType.LOAD, null); + lsmIndex.getIOOperationCallback().afterFinalize(opCtx); } } 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/517e1d27/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java index eec2dca..3432624 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java @@ -18,16 +18,12 @@ */ package org.apache.hyracks.storage.am.lsm.common.impls; -import java.util.List; - import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; import org.apache.hyracks.storage.common.IResource; @@ -51,19 +47,17 @@ public enum NoOpIOOperationCallbackFactory implements ILSMIOOperationCallbackFac } @Override - public void 
beforeOperation(LSMIOOperationType opType) throws HyracksDataException { + public void beforeOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException { // Do nothing. } @Override - public void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents, - ILSMDiskComponent newComponent) throws HyracksDataException { + public void afterOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException { // Do nothing. } @Override - public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) - throws HyracksDataException { + public void afterFinalize(ILSMIndexOperationContext opCtx) throws HyracksDataException { // Do nothing. } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/517e1d27/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java index 88def5e..2c16be0 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java @@ -21,10 +21,9 @@ package org.apache.hyracks.storage.am.lsm.common.impls; import java.util.List; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; -import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; /** @@ -34,33 +33,29 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; public class StubIOOperationCallback implements ILSMIOOperationCallback { - private List<ILSMComponent> oldComponents = null; - private ILSMDiskComponent newComponent = null; + private ILSMIndexOperationContext opCtx = null; @Override - public void beforeOperation(LSMIOOperationType opType) throws HyracksDataException { + public void beforeOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException { // Not interested in this } @Override - public void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents, - ILSMDiskComponent newComponent) throws HyracksDataException { - this.oldComponents = oldComponents; - this.newComponent = newComponent; + public void afterOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException { + this.opCtx = opCtx; } @Override - public synchronized void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) - throws HyracksDataException { + public void afterFinalize(ILSMIndexOperationContext opCtx) throws HyracksDataException { // Redundant info from after } - public List<ILSMComponent> getLastOldComponents() { - return oldComponents; + public List<ILSMDiskComponent> getLastOldComponents() { + return opCtx.getComponentsToBeMerged(); } public ILSMDiskComponent getLastNewComponent() { - return newComponent; + return opCtx.getNewComponent(); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/517e1d27/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java 
---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java index f29bffc..7b12250 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java @@ -50,7 +50,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; import org.apache.hyracks.storage.am.lsm.common.api.ITwoPCIndex; -import org.apache.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallbackWrapper; +import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext; import org.apache.hyracks.storage.am.lsm.common.impls.ExternalIndexHarness; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences; import org.apache.hyracks.storage.am.rtree.impls.SearchPredicate; @@ -323,10 +323,10 @@ public class ExternalRTree extends LSMRTree implements ITwoPCIndex { if (!isActive) { throw new HyracksDataException("Failed to deactivate the index since it is already deactivated."); } - if (flushOnExit) { - BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(ioOpCallback); - cb.afterFinalize(LSMIOOperationType.FLUSH, null); + AbstractLSMIndexOperationContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, version); + opCtx.setIoOperationType(LSMIOOperationType.FLUSH); + ioOpCallback.afterFinalize(opCtx); } for (ILSMDiskComponent c : diskComponents) {